1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "components/invalidation/sync_invalidation_listener.h"
6
7#include <vector>
8
9#include "base/bind.h"
10#include "base/callback.h"
11#include "base/compiler_specific.h"
12#include "base/logging.h"
13#include "base/tracked_objects.h"
14#include "components/invalidation/invalidation_util.h"
15#include "components/invalidation/object_id_invalidation_map.h"
16#include "components/invalidation/registration_manager.h"
17#include "google/cacheinvalidation/include/invalidation-client.h"
18#include "google/cacheinvalidation/include/types.h"
19#include "jingle/notifier/listener/push_client.h"
20
namespace {

// Application name handed to the invalidation client factory callback when
// the client is created in SyncInvalidationListener::Start().
const char kApplicationName[] = "chrome-sync";

}  // namespace
26
27namespace syncer {
28
29SyncInvalidationListener::Delegate::~Delegate() {}
30
31SyncInvalidationListener::SyncInvalidationListener(
32    scoped_ptr<SyncNetworkChannel> network_channel)
33    : sync_network_channel_(network_channel.Pass()),
34      sync_system_resources_(sync_network_channel_.get(), this),
35      delegate_(NULL),
36      ticl_state_(DEFAULT_INVALIDATION_ERROR),
37      push_client_state_(DEFAULT_INVALIDATION_ERROR),
38      weak_ptr_factory_(this) {
39  DCHECK(CalledOnValidThread());
40  sync_network_channel_->AddObserver(this);
41}
42
43SyncInvalidationListener::~SyncInvalidationListener() {
44  DCHECK(CalledOnValidThread());
45  sync_network_channel_->RemoveObserver(this);
46  Stop();
47  DCHECK(!delegate_);
48}
49
50void SyncInvalidationListener::Start(
51    const CreateInvalidationClientCallback& create_invalidation_client_callback,
52    const std::string& client_id,
53    const std::string& client_info,
54    const std::string& invalidation_bootstrap_data,
55    const UnackedInvalidationsMap& initial_unacked_invalidations,
56    const base::WeakPtr<InvalidationStateTracker>& invalidation_state_tracker,
57    const scoped_refptr<base::SequencedTaskRunner>&
58        invalidation_state_tracker_task_runner,
59    Delegate* delegate) {
60  DCHECK(CalledOnValidThread());
61  Stop();
62
63  sync_system_resources_.set_platform(client_info);
64  sync_system_resources_.Start();
65
66  // The Storage resource is implemented as a write-through cache.  We populate
67  // it with the initial state on startup, so subsequent writes go to disk and
68  // update the in-memory cache, while reads just return the cached state.
69  sync_system_resources_.storage()->SetInitialState(
70      invalidation_bootstrap_data);
71
72  unacked_invalidations_map_ = initial_unacked_invalidations;
73  invalidation_state_tracker_ = invalidation_state_tracker;
74  invalidation_state_tracker_task_runner_ =
75      invalidation_state_tracker_task_runner;
76  DCHECK(invalidation_state_tracker_task_runner_.get());
77
78  DCHECK(!delegate_);
79  DCHECK(delegate);
80  delegate_ = delegate;
81
82  invalidation_client_.reset(create_invalidation_client_callback.Run(
83      &sync_system_resources_,
84      sync_network_channel_->GetInvalidationClientType(),
85      client_id,
86      kApplicationName,
87      this));
88  invalidation_client_->Start();
89
90  registration_manager_.reset(
91      new RegistrationManager(invalidation_client_.get()));
92}
93
94void SyncInvalidationListener::UpdateCredentials(
95    const std::string& email, const std::string& token) {
96  DCHECK(CalledOnValidThread());
97  sync_network_channel_->UpdateCredentials(email, token);
98}
99
100void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
101  DCHECK(CalledOnValidThread());
102  registered_ids_ = ids;
103  // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
104  // working XMPP connection (as observed by us), so check it instead
105  // of GetState() (see http://crbug.com/139424).
106  if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) {
107    DoRegistrationUpdate();
108  }
109}
110
111void SyncInvalidationListener::Ready(
112    invalidation::InvalidationClient* client) {
113  DCHECK(CalledOnValidThread());
114  DCHECK_EQ(client, invalidation_client_.get());
115  ticl_state_ = INVALIDATIONS_ENABLED;
116  EmitStateChange();
117  DoRegistrationUpdate();
118}
119
120void SyncInvalidationListener::Invalidate(
121    invalidation::InvalidationClient* client,
122    const invalidation::Invalidation& invalidation,
123    const invalidation::AckHandle& ack_handle) {
124  DCHECK(CalledOnValidThread());
125  DCHECK_EQ(client, invalidation_client_.get());
126  client->Acknowledge(ack_handle);
127
128  const invalidation::ObjectId& id = invalidation.object_id();
129
130  std::string payload;
131  // payload() CHECK()'s has_payload(), so we must check it ourselves first.
132  if (invalidation.has_payload())
133    payload = invalidation.payload();
134
135  DVLOG(2) << "Received invalidation with version " << invalidation.version()
136           << " for " << ObjectIdToString(id);
137
138  ObjectIdInvalidationMap invalidations;
139  Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
140  inv.SetAckHandler(AsWeakPtr(), base::MessageLoopProxy::current());
141  invalidations.Insert(inv);
142
143  DispatchInvalidations(invalidations);
144}
145
146void SyncInvalidationListener::InvalidateUnknownVersion(
147    invalidation::InvalidationClient* client,
148    const invalidation::ObjectId& object_id,
149    const invalidation::AckHandle& ack_handle) {
150  DCHECK(CalledOnValidThread());
151  DCHECK_EQ(client, invalidation_client_.get());
152  DVLOG(1) << "InvalidateUnknownVersion";
153  client->Acknowledge(ack_handle);
154
155  ObjectIdInvalidationMap invalidations;
156  Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
157  unknown_version.SetAckHandler(AsWeakPtr(), base::MessageLoopProxy::current());
158  invalidations.Insert(unknown_version);
159
160  DispatchInvalidations(invalidations);
161}
162
163// This should behave as if we got an invalidation with version
164// UNKNOWN_OBJECT_VERSION for all known data types.
165void SyncInvalidationListener::InvalidateAll(
166    invalidation::InvalidationClient* client,
167    const invalidation::AckHandle& ack_handle) {
168  DCHECK(CalledOnValidThread());
169  DCHECK_EQ(client, invalidation_client_.get());
170  DVLOG(1) << "InvalidateAll";
171  client->Acknowledge(ack_handle);
172
173  ObjectIdInvalidationMap invalidations;
174  for (ObjectIdSet::iterator it = registered_ids_.begin();
175       it != registered_ids_.end(); ++it) {
176    Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
177    unknown_version.SetAckHandler(AsWeakPtr(),
178                                  base::MessageLoopProxy::current());
179    invalidations.Insert(unknown_version);
180  }
181
182  DispatchInvalidations(invalidations);
183}
184
185// If a handler is registered, emit right away.  Otherwise, save it for later.
186void SyncInvalidationListener::DispatchInvalidations(
187    const ObjectIdInvalidationMap& invalidations) {
188  DCHECK(CalledOnValidThread());
189
190  ObjectIdInvalidationMap to_save = invalidations;
191  ObjectIdInvalidationMap to_emit =
192      invalidations.GetSubsetWithObjectIds(registered_ids_);
193
194  SaveInvalidations(to_save);
195  EmitSavedInvalidations(to_emit);
196}
197
198void SyncInvalidationListener::SaveInvalidations(
199    const ObjectIdInvalidationMap& to_save) {
200  ObjectIdSet objects_to_save = to_save.GetObjectIds();
201  for (ObjectIdSet::const_iterator it = objects_to_save.begin();
202       it != objects_to_save.end(); ++it) {
203    UnackedInvalidationsMap::iterator lookup =
204        unacked_invalidations_map_.find(*it);
205    if (lookup == unacked_invalidations_map_.end()) {
206      lookup = unacked_invalidations_map_.insert(
207          std::make_pair(*it, UnackedInvalidationSet(*it))).first;
208    }
209    lookup->second.AddSet(to_save.ForObject(*it));
210  }
211
212  invalidation_state_tracker_task_runner_->PostTask(
213      FROM_HERE,
214      base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
215                 invalidation_state_tracker_,
216                 unacked_invalidations_map_));
217}
218
219void SyncInvalidationListener::EmitSavedInvalidations(
220    const ObjectIdInvalidationMap& to_emit) {
221  DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
222  delegate_->OnInvalidate(to_emit);
223}
224
225void SyncInvalidationListener::InformRegistrationStatus(
226      invalidation::InvalidationClient* client,
227      const invalidation::ObjectId& object_id,
228      InvalidationListener::RegistrationState new_state) {
229  DCHECK(CalledOnValidThread());
230  DCHECK_EQ(client, invalidation_client_.get());
231  DVLOG(1) << "InformRegistrationStatus: "
232           << ObjectIdToString(object_id) << " " << new_state;
233
234  if (new_state != InvalidationListener::REGISTERED) {
235    // Let |registration_manager_| handle the registration backoff policy.
236    registration_manager_->MarkRegistrationLost(object_id);
237  }
238}
239
240void SyncInvalidationListener::InformRegistrationFailure(
241    invalidation::InvalidationClient* client,
242    const invalidation::ObjectId& object_id,
243    bool is_transient,
244    const std::string& error_message) {
245  DCHECK(CalledOnValidThread());
246  DCHECK_EQ(client, invalidation_client_.get());
247  DVLOG(1) << "InformRegistrationFailure: "
248           << ObjectIdToString(object_id)
249           << "is_transient=" << is_transient
250           << ", message=" << error_message;
251
252  if (is_transient) {
253    // We don't care about |unknown_hint|; we let
254    // |registration_manager_| handle the registration backoff policy.
255    registration_manager_->MarkRegistrationLost(object_id);
256  } else {
257    // Non-transient failures require an action to resolve. This could happen
258    // because:
259    // - the server doesn't yet recognize the data type, which could happen for
260    //   brand-new data types.
261    // - the user has changed his password and hasn't updated it yet locally.
262    // Either way, block future registration attempts for |object_id|. However,
263    // we don't forget any saved invalidation state since we may use it once the
264    // error is addressed.
265    registration_manager_->DisableId(object_id);
266  }
267}
268
269void SyncInvalidationListener::ReissueRegistrations(
270    invalidation::InvalidationClient* client,
271    const std::string& prefix,
272    int prefix_length) {
273  DCHECK(CalledOnValidThread());
274  DCHECK_EQ(client, invalidation_client_.get());
275  DVLOG(1) << "AllRegistrationsLost";
276  registration_manager_->MarkAllRegistrationsLost();
277}
278
279void SyncInvalidationListener::InformError(
280    invalidation::InvalidationClient* client,
281    const invalidation::ErrorInfo& error_info) {
282  DCHECK(CalledOnValidThread());
283  DCHECK_EQ(client, invalidation_client_.get());
284  LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": "
285             << error_info.error_message()
286             << " (transient = " << error_info.is_transient() << ")";
287  if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) {
288    ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED;
289  } else {
290    ticl_state_ = TRANSIENT_INVALIDATION_ERROR;
291  }
292  EmitStateChange();
293}
294
295void SyncInvalidationListener::Acknowledge(
296  const invalidation::ObjectId& id,
297  const syncer::AckHandle& handle) {
298  UnackedInvalidationsMap::iterator lookup =
299      unacked_invalidations_map_.find(id);
300  if (lookup == unacked_invalidations_map_.end()) {
301    DLOG(WARNING) << "Received acknowledgement for untracked object ID";
302    return;
303  }
304  lookup->second.Acknowledge(handle);
305  invalidation_state_tracker_task_runner_->PostTask(
306      FROM_HERE,
307      base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
308                 invalidation_state_tracker_,
309                 unacked_invalidations_map_));
310}
311
312void SyncInvalidationListener::Drop(
313    const invalidation::ObjectId& id,
314    const syncer::AckHandle& handle) {
315  UnackedInvalidationsMap::iterator lookup =
316      unacked_invalidations_map_.find(id);
317  if (lookup == unacked_invalidations_map_.end()) {
318    DLOG(WARNING) << "Received drop for untracked object ID";
319    return;
320  }
321  lookup->second.Drop(handle);
322  invalidation_state_tracker_task_runner_->PostTask(
323      FROM_HERE,
324      base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
325                 invalidation_state_tracker_,
326                 unacked_invalidations_map_));
327}
328
329void SyncInvalidationListener::WriteState(const std::string& state) {
330  DCHECK(CalledOnValidThread());
331  DVLOG(1) << "WriteState";
332  invalidation_state_tracker_task_runner_->PostTask(
333      FROM_HERE,
334      base::Bind(&InvalidationStateTracker::SetBootstrapData,
335                 invalidation_state_tracker_,
336                 state));
337}
338
339void SyncInvalidationListener::DoRegistrationUpdate() {
340  DCHECK(CalledOnValidThread());
341  const ObjectIdSet& unregistered_ids =
342      registration_manager_->UpdateRegisteredIds(registered_ids_);
343  for (ObjectIdSet::iterator it = unregistered_ids.begin();
344       it != unregistered_ids.end(); ++it) {
345    unacked_invalidations_map_.erase(*it);
346  }
347  invalidation_state_tracker_task_runner_->PostTask(
348      FROM_HERE,
349      base::Bind(&InvalidationStateTracker::SetSavedInvalidations,
350                 invalidation_state_tracker_,
351                 unacked_invalidations_map_));
352
353  ObjectIdInvalidationMap object_id_invalidation_map;
354  for (UnackedInvalidationsMap::iterator map_it =
355       unacked_invalidations_map_.begin();
356       map_it != unacked_invalidations_map_.end(); ++map_it) {
357    if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
358      continue;
359    }
360    map_it->second.ExportInvalidations(AsWeakPtr(),
361                                       base::MessageLoopProxy::current(),
362                                       &object_id_invalidation_map);
363  }
364
365  // There's no need to run these through DispatchInvalidations(); they've
366  // already been saved to storage (that's where we found them) so all we need
367  // to do now is emit them.
368  EmitSavedInvalidations(object_id_invalidation_map);
369}
370
371void SyncInvalidationListener::RequestDetailedStatus(
372    base::Callback<void(const base::DictionaryValue&)> callback) const {
373  DCHECK(CalledOnValidThread());
374  sync_network_channel_->RequestDetailedStatus(callback);
375  callback.Run(*CollectDebugData());
376}
377
378scoped_ptr<base::DictionaryValue>
379SyncInvalidationListener::CollectDebugData() const {
380  scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue());
381  return_value->SetString(
382      "SyncInvalidationListener.PushClientState",
383      std::string(InvalidatorStateToString(push_client_state_)));
384  return_value->SetString("SyncInvalidationListener.TiclState",
385                          std::string(InvalidatorStateToString(ticl_state_)));
386  scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue());
387  for (UnackedInvalidationsMap::const_iterator it =
388           unacked_invalidations_map_.begin();
389       it != unacked_invalidations_map_.end();
390       ++it) {
391    unacked_map->Set((it->first).name(), (it->second).ToValue().release());
392  }
393  return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap",
394                    unacked_map.release());
395  return return_value.Pass();
396}
397
398void SyncInvalidationListener::StopForTest() {
399  DCHECK(CalledOnValidThread());
400  Stop();
401}
402
403void SyncInvalidationListener::Stop() {
404  DCHECK(CalledOnValidThread());
405  if (!invalidation_client_) {
406    return;
407  }
408
409  registration_manager_.reset();
410  sync_system_resources_.Stop();
411  invalidation_client_->Stop();
412
413  invalidation_client_.reset();
414  delegate_ = NULL;
415
416  ticl_state_ = DEFAULT_INVALIDATION_ERROR;
417  push_client_state_ = DEFAULT_INVALIDATION_ERROR;
418}
419
420InvalidatorState SyncInvalidationListener::GetState() const {
421  DCHECK(CalledOnValidThread());
422  if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
423      push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
424    // If either the ticl or the push client rejected our credentials,
425    // return INVALIDATION_CREDENTIALS_REJECTED.
426    return INVALIDATION_CREDENTIALS_REJECTED;
427  }
428  if (ticl_state_ == INVALIDATIONS_ENABLED &&
429      push_client_state_ == INVALIDATIONS_ENABLED) {
430    // If the ticl is ready and the push client notifications are
431    // enabled, return INVALIDATIONS_ENABLED.
432    return INVALIDATIONS_ENABLED;
433  }
434  // Otherwise, we have a transient error.
435  return TRANSIENT_INVALIDATION_ERROR;
436}
437
438void SyncInvalidationListener::EmitStateChange() {
439  DCHECK(CalledOnValidThread());
440  delegate_->OnInvalidatorStateChange(GetState());
441}
442
443base::WeakPtr<AckHandler> SyncInvalidationListener::AsWeakPtr() {
444  DCHECK(CalledOnValidThread());
445  base::WeakPtr<AckHandler> weak_ptr = weak_ptr_factory_.GetWeakPtr();
446  weak_ptr.get();  // Binds the pointer to this thread.
447  return weak_ptr;
448}
449
450void SyncInvalidationListener::OnNetworkChannelStateChanged(
451    InvalidatorState invalidator_state) {
452  DCHECK(CalledOnValidThread());
453  push_client_state_ = invalidator_state;
454  EmitStateChange();
455}
456
457}  // namespace syncer
458