cache_invalidation_packet_handler.cc revision 3345a6884c488ff3a535c2c9acdd33d74b37e311
1// Copyright (c) 2010 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" 6 7#include <string> 8 9#include "base/base64.h" 10#include "base/callback.h" 11#include "base/compiler_specific.h" 12#include "base/logging.h" 13#include "base/rand_util.h" 14#include "base/string_number_conversions.h" 15#include "chrome/browser/sync/sync_constants.h" 16#include "google/cacheinvalidation/invalidation-client.h" 17#include "jingle/notifier/listener/xml_element_util.h" 18#include "talk/xmpp/constants.h" 19#include "talk/xmpp/jid.h" 20#include "talk/xmpp/xmppclient.h" 21#include "talk/xmpp/xmpptask.h" 22 23namespace sync_notifier { 24 25namespace { 26 27const char kBotJid[] = "tango@bot.talk.google.com"; 28 29const buzz::QName kQnData("google:notifier", "data"); 30const buzz::QName kQnSeq("", "seq"); 31const buzz::QName kQnSid("", "sid"); 32const buzz::QName kQnServiceUrl("", "serviceUrl"); 33 34// TODO(akalin): Move these task classes out so that they can be 35// unit-tested. This'll probably be done easier once we consolidate 36// all the packet sending/receiving classes. 37 38// A task that listens for ClientInvalidation messages and calls the 39// given callback on them. 40class CacheInvalidationListenTask : public buzz::XmppTask { 41 public: 42 // Takes ownership of callback. 43 CacheInvalidationListenTask(Task* parent, 44 Callback1<const std::string&>::Type* callback) 45 : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {} 46 virtual ~CacheInvalidationListenTask() {} 47 48 virtual int ProcessStart() { 49 VLOG(2) << "CacheInvalidationListenTask started"; 50 return STATE_RESPONSE; 51 } 52 53 virtual int ProcessResponse() { 54 const buzz::XmlElement* stanza = NextStanza(); 55 if (stanza == NULL) { 56 VLOG(2) << "CacheInvalidationListenTask blocked"; 57 return STATE_BLOCKED; 58 } 59 VLOG(2) << "CacheInvalidationListenTask response received"; 60 std::string data; 61 if (GetCacheInvalidationIqPacketData(stanza, &data)) { 62 callback_->Run(data); 63 } else { 64 LOG(ERROR) << "Could not get packet data"; 65 } 66 // Acknowledge receipt of the iq to the buzz server. 67 // TODO(akalin): Send an error response for malformed packets. 68 scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); 69 SendStanza(response_stanza.get()); 70 return STATE_RESPONSE; 71 } 72 73 virtual bool HandleStanza(const buzz::XmlElement* stanza) { 74 VLOG(1) << "Stanza received: " 75 << notifier::XmlElementToString(*stanza); 76 if (IsValidCacheInvalidationIqPacket(stanza)) { 77 VLOG(2) << "Queueing stanza"; 78 QueueStanza(stanza); 79 return true; 80 } 81 VLOG(2) << "Stanza skipped"; 82 return false; 83 } 84 85 private: 86 bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) { 87 // We make sure to compare jids (which are normalized) instead of 88 // just strings -- server may use non-normalized jids in 89 // attributes. 90 // 91 // TODO(akalin): Add unit tests for this. 92 buzz::Jid to(stanza->Attr(buzz::QN_TO)); 93 return 94 (MatchRequestIq(stanza, buzz::STR_SET, kQnData) && 95 (to == GetClient()->jid())); 96 } 97 98 bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza, 99 std::string* data) { 100 DCHECK(IsValidCacheInvalidationIqPacket(stanza)); 101 const buzz::XmlElement* cache_invalidation_iq_packet = 102 stanza->FirstNamed(kQnData); 103 if (!cache_invalidation_iq_packet) { 104 LOG(ERROR) << "Could not find cache invalidation IQ packet element"; 105 return false; 106 } 107 *data = cache_invalidation_iq_packet->BodyText(); 108 return true; 109 } 110 111 scoped_ptr<Callback1<const std::string&>::Type> callback_; 112 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); 113}; 114 115// A task that sends a single outbound ClientInvalidation message. 116class CacheInvalidationSendMessageTask : public buzz::XmppTask { 117 public: 118 CacheInvalidationSendMessageTask(Task* parent, 119 const buzz::Jid& to_jid, 120 const std::string& msg, 121 int seq, 122 const std::string& sid) 123 : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), 124 to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid) {} 125 virtual ~CacheInvalidationSendMessageTask() {} 126 127 virtual int ProcessStart() { 128 scoped_ptr<buzz::XmlElement> stanza( 129 MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_, 130 seq_, sid_)); 131 VLOG(1) << "Sending message: " 132 << notifier::XmlElementToString(*stanza.get()); 133 if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { 134 VLOG(2) << "Error when sending message"; 135 return STATE_ERROR; 136 } 137 return STATE_RESPONSE; 138 } 139 140 virtual int ProcessResponse() { 141 const buzz::XmlElement* stanza = NextStanza(); 142 if (stanza == NULL) { 143 VLOG(2) << "CacheInvalidationSendMessageTask blocked..."; 144 return STATE_BLOCKED; 145 } 146 VLOG(2) << "CacheInvalidationSendMessageTask response received: " 147 << notifier::XmlElementToString(*stanza); 148 // TODO(akalin): Handle errors here. 149 return STATE_DONE; 150 } 151 152 virtual bool HandleStanza(const buzz::XmlElement* stanza) { 153 VLOG(1) << "Stanza received: " 154 << notifier::XmlElementToString(*stanza); 155 if (!MatchResponseIq(stanza, to_jid_, task_id())) { 156 VLOG(2) << "Stanza skipped"; 157 return false; 158 } 159 VLOG(2) << "Queueing stanza"; 160 QueueStanza(stanza); 161 return true; 162 } 163 164 private: 165 static buzz::XmlElement* MakeCacheInvalidationIqPacket( 166 const buzz::Jid& to_jid, 167 const std::string& task_id, 168 const std::string& msg, 169 int seq, const std::string& sid) { 170 buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); 171 buzz::XmlElement* cache_invalidation_iq_packet = 172 new buzz::XmlElement(kQnData, true); 173 iq->AddElement(cache_invalidation_iq_packet); 174 cache_invalidation_iq_packet->SetAttr(kQnSeq, base::IntToString(seq)); 175 cache_invalidation_iq_packet->SetAttr(kQnSid, sid); 176 cache_invalidation_iq_packet->SetAttr(kQnServiceUrl, 177 browser_sync::kSyncServiceUrl); 178 cache_invalidation_iq_packet->SetBodyText(msg); 179 return iq; 180 } 181 182 const buzz::Jid to_jid_; 183 std::string msg_; 184 int seq_; 185 std::string sid_; 186 187 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); 188}; 189 190std::string MakeSid() { 191 uint64 sid = base::RandUint64(); 192 return std::string("chrome-sync-") + base::Uint64ToString(sid); 193} 194 195} // namespace 196 197CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( 198 talk_base::Task* base_task, 199 invalidation::InvalidationClient* invalidation_client) 200 : scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 201 base_task_(base_task), 202 invalidation_client_(invalidation_client), 203 seq_(0), 204 sid_(MakeSid()) { 205 CHECK(base_task_); 206 CHECK(invalidation_client_); 207 invalidation::NetworkEndpoint* network_endpoint = 208 invalidation_client_->network_endpoint(); 209 CHECK(network_endpoint); 210 network_endpoint->RegisterOutboundListener( 211 scoped_callback_factory_.NewCallback( 212 &CacheInvalidationPacketHandler::HandleOutboundPacket)); 213 // Owned by base_task. 214 CacheInvalidationListenTask* listen_task = 215 new CacheInvalidationListenTask( 216 base_task, scoped_callback_factory_.NewCallback( 217 &CacheInvalidationPacketHandler::HandleInboundPacket)); 218 listen_task->Start(); 219} 220 221CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { 222 invalidation::NetworkEndpoint* network_endpoint = 223 invalidation_client_->network_endpoint(); 224 CHECK(network_endpoint); 225 network_endpoint->RegisterOutboundListener(NULL); 226} 227 228void CacheInvalidationPacketHandler::HandleOutboundPacket( 229 invalidation::NetworkEndpoint* const& network_endpoint) { 230 CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint()); 231 invalidation::string message; 232 network_endpoint->TakeOutboundMessage(&message); 233 std::string encoded_message; 234 if (!base::Base64Encode(message, &encoded_message)) { 235 LOG(ERROR) << "Could not base64-encode message to send: " 236 << message; 237 return; 238 } 239 // Owned by base_task. 240 CacheInvalidationSendMessageTask* send_message_task = 241 new CacheInvalidationSendMessageTask(base_task_, 242 buzz::Jid(kBotJid), 243 encoded_message, 244 seq_, sid_); 245 send_message_task->Start(); 246 ++seq_; 247} 248 249void CacheInvalidationPacketHandler::HandleInboundPacket( 250 const std::string& packet) { 251 invalidation::NetworkEndpoint* network_endpoint = 252 invalidation_client_->network_endpoint(); 253 std::string decoded_message; 254 if (!base::Base64Decode(packet, &decoded_message)) { 255 LOG(ERROR) << "Could not base64-decode received message: " 256 << packet; 257 return; 258 } 259 network_endpoint->HandleInboundMessage(decoded_message); 260} 261 262} // namespace sync_notifier 263