1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "sync/notifier/non_blocking_invalidator.h" 6 7#include <cstddef> 8 9#include "base/location.h" 10#include "base/logging.h" 11#include "base/memory/scoped_ptr.h" 12#include "base/single_thread_task_runner.h" 13#include "base/thread_task_runner_handle.h" 14#include "base/threading/thread.h" 15#include "jingle/notifier/listener/push_client.h" 16#include "sync/notifier/invalidation_notifier.h" 17 18namespace syncer { 19 20class NonBlockingInvalidator::Core 21 : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>, 22 // InvalidationHandler to observe the InvalidationNotifier we create. 23 public InvalidationHandler { 24 public: 25 // Called on parent thread. |delegate_observer| should be 26 // initialized. 27 explicit Core( 28 const WeakHandle<InvalidationHandler>& delegate_observer); 29 30 // Helpers called on I/O thread. 31 void Initialize( 32 const notifier::NotifierOptions& notifier_options, 33 const std::string& invalidator_client_id, 34 const InvalidationStateMap& initial_invalidation_state_map, 35 const std::string& invalidation_bootstrap_data, 36 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, 37 const std::string& client_info); 38 void Teardown(); 39 void UpdateRegisteredIds(const ObjectIdSet& ids); 40 void Acknowledge(const invalidation::ObjectId& id, 41 const AckHandle& ack_handle); 42 void UpdateCredentials(const std::string& email, const std::string& token); 43 44 // InvalidationHandler implementation (all called on I/O thread by 45 // InvalidationNotifier). 46 virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE; 47 virtual void OnIncomingInvalidation( 48 const ObjectIdInvalidationMap& invalidation_map) OVERRIDE; 49 50 private: 51 friend class 52 base::RefCountedThreadSafe<NonBlockingInvalidator::Core>; 53 // Called on parent or I/O thread. 54 virtual ~Core(); 55 56 // The variables below should be used only on the I/O thread. 57 const WeakHandle<InvalidationHandler> delegate_observer_; 58 scoped_ptr<InvalidationNotifier> invalidation_notifier_; 59 scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_; 60 61 DISALLOW_COPY_AND_ASSIGN(Core); 62}; 63 64NonBlockingInvalidator::Core::Core( 65 const WeakHandle<InvalidationHandler>& delegate_observer) 66 : delegate_observer_(delegate_observer) { 67 DCHECK(delegate_observer_.IsInitialized()); 68} 69 70NonBlockingInvalidator::Core::~Core() { 71} 72 73void NonBlockingInvalidator::Core::Initialize( 74 const notifier::NotifierOptions& notifier_options, 75 const std::string& invalidator_client_id, 76 const InvalidationStateMap& initial_invalidation_state_map, 77 const std::string& invalidation_bootstrap_data, 78 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, 79 const std::string& client_info) { 80 DCHECK(notifier_options.request_context_getter.get()); 81 DCHECK_EQ(notifier::NOTIFICATION_SERVER, 82 notifier_options.notification_method); 83 network_task_runner_ = notifier_options.request_context_getter-> 84 GetNetworkTaskRunner(); 85 DCHECK(network_task_runner_->BelongsToCurrentThread()); 86 invalidation_notifier_.reset( 87 new InvalidationNotifier( 88 notifier::PushClient::CreateDefaultOnIOThread(notifier_options), 89 invalidator_client_id, 90 initial_invalidation_state_map, 91 invalidation_bootstrap_data, 92 invalidation_state_tracker, 93 client_info)); 94 invalidation_notifier_->RegisterHandler(this); 95} 96 97void NonBlockingInvalidator::Core::Teardown() { 98 DCHECK(network_task_runner_->BelongsToCurrentThread()); 99 invalidation_notifier_->UnregisterHandler(this); 100 invalidation_notifier_.reset(); 101 network_task_runner_ = NULL; 102} 103 104void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) { 105 DCHECK(network_task_runner_->BelongsToCurrentThread()); 106 invalidation_notifier_->UpdateRegisteredIds(this, ids); 107} 108 109void NonBlockingInvalidator::Core::Acknowledge(const invalidation::ObjectId& id, 110 const AckHandle& ack_handle) { 111 DCHECK(network_task_runner_->BelongsToCurrentThread()); 112 invalidation_notifier_->Acknowledge(id, ack_handle); 113} 114 115void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email, 116 const std::string& token) { 117 DCHECK(network_task_runner_->BelongsToCurrentThread()); 118 invalidation_notifier_->UpdateCredentials(email, token); 119} 120 121void NonBlockingInvalidator::Core::OnInvalidatorStateChange( 122 InvalidatorState reason) { 123 DCHECK(network_task_runner_->BelongsToCurrentThread()); 124 delegate_observer_.Call( 125 FROM_HERE, &InvalidationHandler::OnInvalidatorStateChange, reason); 126} 127 128void NonBlockingInvalidator::Core::OnIncomingInvalidation( 129 const ObjectIdInvalidationMap& invalidation_map) { 130 DCHECK(network_task_runner_->BelongsToCurrentThread()); 131 delegate_observer_.Call(FROM_HERE, 132 &InvalidationHandler::OnIncomingInvalidation, 133 invalidation_map); 134} 135 136NonBlockingInvalidator::NonBlockingInvalidator( 137 const notifier::NotifierOptions& notifier_options, 138 const std::string& invalidator_client_id, 139 const InvalidationStateMap& initial_invalidation_state_map, 140 const std::string& invalidation_bootstrap_data, 141 const WeakHandle<InvalidationStateTracker>& 142 invalidation_state_tracker, 143 const std::string& client_info) 144 : weak_ptr_factory_(this), 145 core_( 146 new Core(MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()))), 147 parent_task_runner_( 148 base::ThreadTaskRunnerHandle::Get()), 149 network_task_runner_(notifier_options.request_context_getter-> 150 GetNetworkTaskRunner()) { 151 if (!network_task_runner_->PostTask( 152 FROM_HERE, 153 base::Bind( 154 &NonBlockingInvalidator::Core::Initialize, 155 core_.get(), 156 notifier_options, 157 invalidator_client_id, 158 initial_invalidation_state_map, 159 invalidation_bootstrap_data, 160 invalidation_state_tracker, 161 client_info))) { 162 NOTREACHED(); 163 } 164} 165 166NonBlockingInvalidator::~NonBlockingInvalidator() { 167 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 168 if (!network_task_runner_->PostTask( 169 FROM_HERE, 170 base::Bind(&NonBlockingInvalidator::Core::Teardown, 171 core_.get()))) { 172 DVLOG(1) << "Network thread stopped before invalidator is destroyed."; 173 } 174} 175 176void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) { 177 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 178 registrar_.RegisterHandler(handler); 179} 180 181void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler, 182 const ObjectIdSet& ids) { 183 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 184 registrar_.UpdateRegisteredIds(handler, ids); 185 if (!network_task_runner_->PostTask( 186 FROM_HERE, 187 base::Bind( 188 &NonBlockingInvalidator::Core::UpdateRegisteredIds, 189 core_.get(), 190 registrar_.GetAllRegisteredIds()))) { 191 NOTREACHED(); 192 } 193} 194 195void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) { 196 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 197 registrar_.UnregisterHandler(handler); 198} 199 200void NonBlockingInvalidator::Acknowledge(const invalidation::ObjectId& id, 201 const AckHandle& ack_handle) { 202 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 203 if (!network_task_runner_->PostTask( 204 FROM_HERE, 205 base::Bind( 206 &NonBlockingInvalidator::Core::Acknowledge, 207 core_.get(), 208 id, 209 ack_handle))) { 210 NOTREACHED(); 211 } 212} 213 214InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const { 215 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 216 return registrar_.GetInvalidatorState(); 217} 218 219void NonBlockingInvalidator::UpdateCredentials(const std::string& email, 220 const std::string& token) { 221 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 222 if (!network_task_runner_->PostTask( 223 FROM_HERE, 224 base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials, 225 core_.get(), email, token))) { 226 NOTREACHED(); 227 } 228} 229 230void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) { 231 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 232 registrar_.UpdateInvalidatorState(state); 233} 234 235void NonBlockingInvalidator::OnIncomingInvalidation( 236 const ObjectIdInvalidationMap& invalidation_map) { 237 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 238 registrar_.DispatchInvalidationsToHandlers(invalidation_map); 239} 240 241} // namespace syncer 242