cache_invalidation_packet_handler.cc revision 3345a6884c488ff3a535c2c9acdd33d74b37e311
1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
6
7#include <string>
8
9#include "base/base64.h"
10#include "base/callback.h"
11#include "base/compiler_specific.h"
12#include "base/logging.h"
13#include "base/rand_util.h"
14#include "base/string_number_conversions.h"
15#include "chrome/browser/sync/sync_constants.h"
16#include "google/cacheinvalidation/invalidation-client.h"
17#include "jingle/notifier/listener/xml_element_util.h"
18#include "talk/xmpp/constants.h"
19#include "talk/xmpp/jid.h"
20#include "talk/xmpp/xmppclient.h"
21#include "talk/xmpp/xmpptask.h"
22
23namespace sync_notifier {
24
25namespace {
26
27const char kBotJid[] = "tango@bot.talk.google.com";
28
29const buzz::QName kQnData("google:notifier", "data");
30const buzz::QName kQnSeq("", "seq");
31const buzz::QName kQnSid("", "sid");
32const buzz::QName kQnServiceUrl("", "serviceUrl");
33
34// TODO(akalin): Move these task classes out so that they can be
35// unit-tested.  This'll probably be done easier once we consolidate
36// all the packet sending/receiving classes.
37
38// A task that listens for ClientInvalidation messages and calls the
39// given callback on them.
40class CacheInvalidationListenTask : public buzz::XmppTask {
41 public:
42  // Takes ownership of callback.
43  CacheInvalidationListenTask(Task* parent,
44                              Callback1<const std::string&>::Type* callback)
45      : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
46  virtual ~CacheInvalidationListenTask() {}
47
48  virtual int ProcessStart() {
49    VLOG(2) << "CacheInvalidationListenTask started";
50    return STATE_RESPONSE;
51  }
52
53  virtual int ProcessResponse() {
54    const buzz::XmlElement* stanza = NextStanza();
55    if (stanza == NULL) {
56      VLOG(2) << "CacheInvalidationListenTask blocked";
57      return STATE_BLOCKED;
58    }
59    VLOG(2) << "CacheInvalidationListenTask response received";
60    std::string data;
61    if (GetCacheInvalidationIqPacketData(stanza, &data)) {
62      callback_->Run(data);
63    } else {
64      LOG(ERROR) << "Could not get packet data";
65    }
66    // Acknowledge receipt of the iq to the buzz server.
67    // TODO(akalin): Send an error response for malformed packets.
68    scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
69    SendStanza(response_stanza.get());
70    return STATE_RESPONSE;
71  }
72
73  virtual bool HandleStanza(const buzz::XmlElement* stanza) {
74    VLOG(1) << "Stanza received: "
75              << notifier::XmlElementToString(*stanza);
76    if (IsValidCacheInvalidationIqPacket(stanza)) {
77      VLOG(2) << "Queueing stanza";
78      QueueStanza(stanza);
79      return true;
80    }
81    VLOG(2) << "Stanza skipped";
82    return false;
83  }
84
85 private:
86  bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
87    // We make sure to compare jids (which are normalized) instead of
88    // just strings -- server may use non-normalized jids in
89    // attributes.
90    //
91    // TODO(akalin): Add unit tests for this.
92    buzz::Jid to(stanza->Attr(buzz::QN_TO));
93    return
94        (MatchRequestIq(stanza, buzz::STR_SET, kQnData) &&
95         (to == GetClient()->jid()));
96  }
97
98  bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
99                            std::string* data) {
100    DCHECK(IsValidCacheInvalidationIqPacket(stanza));
101    const buzz::XmlElement* cache_invalidation_iq_packet =
102        stanza->FirstNamed(kQnData);
103    if (!cache_invalidation_iq_packet) {
104      LOG(ERROR) << "Could not find cache invalidation IQ packet element";
105      return false;
106    }
107    *data = cache_invalidation_iq_packet->BodyText();
108    return true;
109  }
110
111  scoped_ptr<Callback1<const std::string&>::Type> callback_;
112  DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
113};
114
115// A task that sends a single outbound ClientInvalidation message.
116class CacheInvalidationSendMessageTask : public buzz::XmppTask {
117 public:
118  CacheInvalidationSendMessageTask(Task* parent,
119                                   const buzz::Jid& to_jid,
120                                   const std::string& msg,
121                                   int seq,
122                                   const std::string& sid)
123      : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
124        to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid) {}
125  virtual ~CacheInvalidationSendMessageTask() {}
126
127  virtual int ProcessStart() {
128    scoped_ptr<buzz::XmlElement> stanza(
129        MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
130                                      seq_, sid_));
131    VLOG(1) << "Sending message: "
132              << notifier::XmlElementToString(*stanza.get());
133    if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
134      VLOG(2) << "Error when sending message";
135      return STATE_ERROR;
136    }
137    return STATE_RESPONSE;
138  }
139
140  virtual int ProcessResponse() {
141    const buzz::XmlElement* stanza = NextStanza();
142    if (stanza == NULL) {
143      VLOG(2) << "CacheInvalidationSendMessageTask blocked...";
144      return STATE_BLOCKED;
145    }
146    VLOG(2) << "CacheInvalidationSendMessageTask response received: "
147              << notifier::XmlElementToString(*stanza);
148    // TODO(akalin): Handle errors here.
149    return STATE_DONE;
150  }
151
152  virtual bool HandleStanza(const buzz::XmlElement* stanza) {
153    VLOG(1) << "Stanza received: "
154              << notifier::XmlElementToString(*stanza);
155    if (!MatchResponseIq(stanza, to_jid_, task_id())) {
156      VLOG(2) << "Stanza skipped";
157      return false;
158    }
159    VLOG(2) << "Queueing stanza";
160    QueueStanza(stanza);
161    return true;
162  }
163
164 private:
165  static buzz::XmlElement* MakeCacheInvalidationIqPacket(
166      const buzz::Jid& to_jid,
167      const std::string& task_id,
168      const std::string& msg,
169      int seq, const std::string& sid) {
170    buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
171    buzz::XmlElement* cache_invalidation_iq_packet =
172        new buzz::XmlElement(kQnData, true);
173    iq->AddElement(cache_invalidation_iq_packet);
174    cache_invalidation_iq_packet->SetAttr(kQnSeq, base::IntToString(seq));
175    cache_invalidation_iq_packet->SetAttr(kQnSid, sid);
176    cache_invalidation_iq_packet->SetAttr(kQnServiceUrl,
177                                          browser_sync::kSyncServiceUrl);
178    cache_invalidation_iq_packet->SetBodyText(msg);
179    return iq;
180  }
181
182  const buzz::Jid to_jid_;
183  std::string msg_;
184  int seq_;
185  std::string sid_;
186
187  DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
188};
189
190std::string MakeSid() {
191  uint64 sid = base::RandUint64();
192  return std::string("chrome-sync-") + base::Uint64ToString(sid);
193}
194
195}  // namespace
196
197CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
198    talk_base::Task* base_task,
199    invalidation::InvalidationClient* invalidation_client)
200    : scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
201      base_task_(base_task),
202      invalidation_client_(invalidation_client),
203      seq_(0),
204      sid_(MakeSid()) {
205  CHECK(base_task_);
206  CHECK(invalidation_client_);
207  invalidation::NetworkEndpoint* network_endpoint =
208      invalidation_client_->network_endpoint();
209  CHECK(network_endpoint);
210  network_endpoint->RegisterOutboundListener(
211      scoped_callback_factory_.NewCallback(
212          &CacheInvalidationPacketHandler::HandleOutboundPacket));
213  // Owned by base_task.
214  CacheInvalidationListenTask* listen_task =
215      new CacheInvalidationListenTask(
216          base_task, scoped_callback_factory_.NewCallback(
217              &CacheInvalidationPacketHandler::HandleInboundPacket));
218  listen_task->Start();
219}
220
221CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
222  invalidation::NetworkEndpoint* network_endpoint =
223      invalidation_client_->network_endpoint();
224  CHECK(network_endpoint);
225  network_endpoint->RegisterOutboundListener(NULL);
226}
227
228void CacheInvalidationPacketHandler::HandleOutboundPacket(
229    invalidation::NetworkEndpoint* const& network_endpoint) {
230  CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint());
231  invalidation::string message;
232  network_endpoint->TakeOutboundMessage(&message);
233  std::string encoded_message;
234  if (!base::Base64Encode(message, &encoded_message)) {
235    LOG(ERROR) << "Could not base64-encode message to send: "
236               << message;
237    return;
238  }
239  // Owned by base_task.
240  CacheInvalidationSendMessageTask* send_message_task =
241      new CacheInvalidationSendMessageTask(base_task_,
242                                           buzz::Jid(kBotJid),
243                                           encoded_message,
244                                           seq_, sid_);
245  send_message_task->Start();
246  ++seq_;
247}
248
249void CacheInvalidationPacketHandler::HandleInboundPacket(
250    const std::string& packet) {
251  invalidation::NetworkEndpoint* network_endpoint =
252      invalidation_client_->network_endpoint();
253  std::string decoded_message;
254  if (!base::Base64Decode(packet, &decoded_message)) {
255    LOG(ERROR) << "Could not base64-decode received message: "
256               << packet;
257    return;
258  }
259  network_endpoint->HandleInboundMessage(decoded_message);
260}
261
262}  // namespace sync_notifier
263