1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sync/notifier/non_blocking_invalidator.h"
6
7#include <cstddef>
8
9#include "base/location.h"
10#include "base/logging.h"
11#include "base/memory/scoped_ptr.h"
12#include "base/single_thread_task_runner.h"
13#include "base/thread_task_runner_handle.h"
14#include "base/threading/thread.h"
15#include "jingle/notifier/listener/push_client.h"
16#include "sync/notifier/invalidation_notifier.h"
17
18namespace syncer {
19
20class NonBlockingInvalidator::Core
21    : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>,
22      // InvalidationHandler to observe the InvalidationNotifier we create.
23      public InvalidationHandler {
24 public:
25  // Called on parent thread.  |delegate_observer| should be
26  // initialized.
27  explicit Core(
28      const WeakHandle<InvalidationHandler>& delegate_observer);
29
30  // Helpers called on I/O thread.
31  void Initialize(
32      const notifier::NotifierOptions& notifier_options,
33      const std::string& invalidator_client_id,
34      const InvalidationStateMap& initial_invalidation_state_map,
35      const std::string& invalidation_bootstrap_data,
36      const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
37      const std::string& client_info);
38  void Teardown();
39  void UpdateRegisteredIds(const ObjectIdSet& ids);
40  void Acknowledge(const invalidation::ObjectId& id,
41                   const AckHandle& ack_handle);
42  void UpdateCredentials(const std::string& email, const std::string& token);
43
44  // InvalidationHandler implementation (all called on I/O thread by
45  // InvalidationNotifier).
46  virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE;
47  virtual void OnIncomingInvalidation(
48      const ObjectIdInvalidationMap& invalidation_map) OVERRIDE;
49
50 private:
51  friend class
52      base::RefCountedThreadSafe<NonBlockingInvalidator::Core>;
53  // Called on parent or I/O thread.
54  virtual ~Core();
55
56  // The variables below should be used only on the I/O thread.
57  const WeakHandle<InvalidationHandler> delegate_observer_;
58  scoped_ptr<InvalidationNotifier> invalidation_notifier_;
59  scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_;
60
61  DISALLOW_COPY_AND_ASSIGN(Core);
62};
63
64NonBlockingInvalidator::Core::Core(
65    const WeakHandle<InvalidationHandler>& delegate_observer)
66    : delegate_observer_(delegate_observer) {
67  DCHECK(delegate_observer_.IsInitialized());
68}
69
70NonBlockingInvalidator::Core::~Core() {
71}
72
73void NonBlockingInvalidator::Core::Initialize(
74    const notifier::NotifierOptions& notifier_options,
75    const std::string& invalidator_client_id,
76    const InvalidationStateMap& initial_invalidation_state_map,
77    const std::string& invalidation_bootstrap_data,
78    const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
79    const std::string& client_info) {
80  DCHECK(notifier_options.request_context_getter.get());
81  DCHECK_EQ(notifier::NOTIFICATION_SERVER,
82            notifier_options.notification_method);
83  network_task_runner_ = notifier_options.request_context_getter->
84      GetNetworkTaskRunner();
85  DCHECK(network_task_runner_->BelongsToCurrentThread());
86  invalidation_notifier_.reset(
87      new InvalidationNotifier(
88          notifier::PushClient::CreateDefaultOnIOThread(notifier_options),
89          invalidator_client_id,
90          initial_invalidation_state_map,
91          invalidation_bootstrap_data,
92          invalidation_state_tracker,
93          client_info));
94  invalidation_notifier_->RegisterHandler(this);
95}
96
97void NonBlockingInvalidator::Core::Teardown() {
98  DCHECK(network_task_runner_->BelongsToCurrentThread());
99  invalidation_notifier_->UnregisterHandler(this);
100  invalidation_notifier_.reset();
101  network_task_runner_ = NULL;
102}
103
104void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) {
105  DCHECK(network_task_runner_->BelongsToCurrentThread());
106  invalidation_notifier_->UpdateRegisteredIds(this, ids);
107}
108
109void NonBlockingInvalidator::Core::Acknowledge(const invalidation::ObjectId& id,
110                                               const AckHandle& ack_handle) {
111  DCHECK(network_task_runner_->BelongsToCurrentThread());
112  invalidation_notifier_->Acknowledge(id, ack_handle);
113}
114
115void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email,
116                                                     const std::string& token) {
117  DCHECK(network_task_runner_->BelongsToCurrentThread());
118  invalidation_notifier_->UpdateCredentials(email, token);
119}
120
121void NonBlockingInvalidator::Core::OnInvalidatorStateChange(
122    InvalidatorState reason) {
123  DCHECK(network_task_runner_->BelongsToCurrentThread());
124  delegate_observer_.Call(
125      FROM_HERE, &InvalidationHandler::OnInvalidatorStateChange, reason);
126}
127
128void NonBlockingInvalidator::Core::OnIncomingInvalidation(
129    const ObjectIdInvalidationMap& invalidation_map) {
130  DCHECK(network_task_runner_->BelongsToCurrentThread());
131  delegate_observer_.Call(FROM_HERE,
132                          &InvalidationHandler::OnIncomingInvalidation,
133                          invalidation_map);
134}
135
136NonBlockingInvalidator::NonBlockingInvalidator(
137    const notifier::NotifierOptions& notifier_options,
138    const std::string& invalidator_client_id,
139    const InvalidationStateMap& initial_invalidation_state_map,
140    const std::string& invalidation_bootstrap_data,
141    const WeakHandle<InvalidationStateTracker>&
142        invalidation_state_tracker,
143    const std::string& client_info)
144        : weak_ptr_factory_(this),
145          core_(
146              new Core(MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()))),
147          parent_task_runner_(
148              base::ThreadTaskRunnerHandle::Get()),
149          network_task_runner_(notifier_options.request_context_getter->
150              GetNetworkTaskRunner()) {
151  if (!network_task_runner_->PostTask(
152          FROM_HERE,
153          base::Bind(
154              &NonBlockingInvalidator::Core::Initialize,
155              core_.get(),
156              notifier_options,
157              invalidator_client_id,
158              initial_invalidation_state_map,
159              invalidation_bootstrap_data,
160              invalidation_state_tracker,
161              client_info))) {
162    NOTREACHED();
163  }
164}
165
166NonBlockingInvalidator::~NonBlockingInvalidator() {
167  DCHECK(parent_task_runner_->BelongsToCurrentThread());
168  if (!network_task_runner_->PostTask(
169          FROM_HERE,
170          base::Bind(&NonBlockingInvalidator::Core::Teardown,
171                     core_.get()))) {
172    DVLOG(1) << "Network thread stopped before invalidator is destroyed.";
173  }
174}
175
176void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) {
177  DCHECK(parent_task_runner_->BelongsToCurrentThread());
178  registrar_.RegisterHandler(handler);
179}
180
181void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler,
182                                                 const ObjectIdSet& ids) {
183  DCHECK(parent_task_runner_->BelongsToCurrentThread());
184  registrar_.UpdateRegisteredIds(handler, ids);
185  if (!network_task_runner_->PostTask(
186          FROM_HERE,
187          base::Bind(
188              &NonBlockingInvalidator::Core::UpdateRegisteredIds,
189              core_.get(),
190              registrar_.GetAllRegisteredIds()))) {
191    NOTREACHED();
192  }
193}
194
195void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) {
196  DCHECK(parent_task_runner_->BelongsToCurrentThread());
197  registrar_.UnregisterHandler(handler);
198}
199
200void NonBlockingInvalidator::Acknowledge(const invalidation::ObjectId& id,
201                                         const AckHandle& ack_handle) {
202  DCHECK(parent_task_runner_->BelongsToCurrentThread());
203  if (!network_task_runner_->PostTask(
204          FROM_HERE,
205          base::Bind(
206              &NonBlockingInvalidator::Core::Acknowledge,
207              core_.get(),
208              id,
209              ack_handle))) {
210    NOTREACHED();
211  }
212}
213
214InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const {
215  DCHECK(parent_task_runner_->BelongsToCurrentThread());
216  return registrar_.GetInvalidatorState();
217}
218
219void NonBlockingInvalidator::UpdateCredentials(const std::string& email,
220                                               const std::string& token) {
221  DCHECK(parent_task_runner_->BelongsToCurrentThread());
222  if (!network_task_runner_->PostTask(
223          FROM_HERE,
224          base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials,
225                     core_.get(), email, token))) {
226    NOTREACHED();
227  }
228}
229
230void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) {
231  DCHECK(parent_task_runner_->BelongsToCurrentThread());
232  registrar_.UpdateInvalidatorState(state);
233}
234
235void NonBlockingInvalidator::OnIncomingInvalidation(
236        const ObjectIdInvalidationMap& invalidation_map) {
237  DCHECK(parent_task_runner_->BelongsToCurrentThread());
238  registrar_.DispatchInvalidationsToHandlers(invalidation_map);
239}
240
241}  // namespace syncer
242