cache_invalidation_packet_handler.cc revision ddb351dbec246cf1fab5ec20d2d5520909041de1
1// Copyright (c) 2011 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
6
7#include <string>
8
9#include "base/base64.h"
10#include "base/callback.h"
11#include "base/compiler_specific.h"
12#include "base/logging.h"
13#include "base/rand_util.h"
14#include "base/string_number_conversions.h"
15#include "google/cacheinvalidation/invalidation-client.h"
16#include "jingle/notifier/listener/xml_element_util.h"
17#include "talk/xmpp/constants.h"
18#include "talk/xmpp/jid.h"
19#include "talk/xmpp/xmppclient.h"
20#include "talk/xmpp/xmpptask.h"
21
22namespace sync_notifier {
23
24namespace {
25
26const char kBotJid[] = "tango@bot.talk.google.com";
27const char kServiceUrl[] = "http://www.google.com/chrome/sync";
28
29const buzz::QName kQnData("google:notifier", "data");
30const buzz::QName kQnSeq("", "seq");
31const buzz::QName kQnSid("", "sid");
32const buzz::QName kQnServiceUrl("", "serviceUrl");
33
34// TODO(akalin): Move these task classes out so that they can be
35// unit-tested.  This'll probably be done easier once we consolidate
36// all the packet sending/receiving classes.
37
38// A task that listens for ClientInvalidation messages and calls the
39// given callback on them.
40class CacheInvalidationListenTask : public buzz::XmppTask {
41 public:
42  // Takes ownership of callback.
43  CacheInvalidationListenTask(Task* parent,
44                              Callback1<const std::string&>::Type* callback)
45      : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
46  virtual ~CacheInvalidationListenTask() {}
47
48  virtual int ProcessStart() {
49    VLOG(2) << "CacheInvalidationListenTask started";
50    return STATE_RESPONSE;
51  }
52
53  virtual int ProcessResponse() {
54    const buzz::XmlElement* stanza = NextStanza();
55    if (stanza == NULL) {
56      VLOG(2) << "CacheInvalidationListenTask blocked";
57      return STATE_BLOCKED;
58    }
59    VLOG(2) << "CacheInvalidationListenTask response received";
60    std::string data;
61    if (GetCacheInvalidationIqPacketData(stanza, &data)) {
62      callback_->Run(data);
63    } else {
64      LOG(ERROR) << "Could not get packet data";
65    }
66    // Acknowledge receipt of the iq to the buzz server.
67    // TODO(akalin): Send an error response for malformed packets.
68    scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
69    SendStanza(response_stanza.get());
70    return STATE_RESPONSE;
71  }
72
73  virtual bool HandleStanza(const buzz::XmlElement* stanza) {
74    VLOG(1) << "Stanza received: "
75              << notifier::XmlElementToString(*stanza);
76    if (IsValidCacheInvalidationIqPacket(stanza)) {
77      VLOG(2) << "Queueing stanza";
78      QueueStanza(stanza);
79      return true;
80    }
81    VLOG(2) << "Stanza skipped";
82    return false;
83  }
84
85 private:
86  bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
87    // We deliberately minimize the verification we do here: see
88    // http://crbug.com/71285 .
89    return MatchRequestIq(stanza, buzz::STR_SET, kQnData);
90  }
91
92  bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
93                            std::string* data) {
94    DCHECK(IsValidCacheInvalidationIqPacket(stanza));
95    const buzz::XmlElement* cache_invalidation_iq_packet =
96        stanza->FirstNamed(kQnData);
97    if (!cache_invalidation_iq_packet) {
98      LOG(ERROR) << "Could not find cache invalidation IQ packet element";
99      return false;
100    }
101    *data = cache_invalidation_iq_packet->BodyText();
102    return true;
103  }
104
105  scoped_ptr<Callback1<const std::string&>::Type> callback_;
106  DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
107};
108
109// A task that sends a single outbound ClientInvalidation message.
110class CacheInvalidationSendMessageTask : public buzz::XmppTask {
111 public:
112  CacheInvalidationSendMessageTask(Task* parent,
113                                   const buzz::Jid& to_jid,
114                                   const std::string& msg,
115                                   int seq,
116                                   const std::string& sid)
117      : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
118        to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid) {}
119  virtual ~CacheInvalidationSendMessageTask() {}
120
121  virtual int ProcessStart() {
122    scoped_ptr<buzz::XmlElement> stanza(
123        MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
124                                      seq_, sid_));
125    VLOG(1) << "Sending message: "
126              << notifier::XmlElementToString(*stanza.get());
127    if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
128      VLOG(2) << "Error when sending message";
129      return STATE_ERROR;
130    }
131    return STATE_RESPONSE;
132  }
133
134  virtual int ProcessResponse() {
135    const buzz::XmlElement* stanza = NextStanza();
136    if (stanza == NULL) {
137      VLOG(2) << "CacheInvalidationSendMessageTask blocked...";
138      return STATE_BLOCKED;
139    }
140    VLOG(2) << "CacheInvalidationSendMessageTask response received: "
141              << notifier::XmlElementToString(*stanza);
142    // TODO(akalin): Handle errors here.
143    return STATE_DONE;
144  }
145
146  virtual bool HandleStanza(const buzz::XmlElement* stanza) {
147    VLOG(1) << "Stanza received: "
148              << notifier::XmlElementToString(*stanza);
149    if (!MatchResponseIq(stanza, to_jid_, task_id())) {
150      VLOG(2) << "Stanza skipped";
151      return false;
152    }
153    VLOG(2) << "Queueing stanza";
154    QueueStanza(stanza);
155    return true;
156  }
157
158 private:
159  static buzz::XmlElement* MakeCacheInvalidationIqPacket(
160      const buzz::Jid& to_jid,
161      const std::string& task_id,
162      const std::string& msg,
163      int seq, const std::string& sid) {
164    buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
165    buzz::XmlElement* cache_invalidation_iq_packet =
166        new buzz::XmlElement(kQnData, true);
167    iq->AddElement(cache_invalidation_iq_packet);
168    cache_invalidation_iq_packet->SetAttr(kQnSeq, base::IntToString(seq));
169    cache_invalidation_iq_packet->SetAttr(kQnSid, sid);
170    cache_invalidation_iq_packet->SetAttr(kQnServiceUrl, kServiceUrl);
171    cache_invalidation_iq_packet->SetBodyText(msg);
172    return iq;
173  }
174
175  const buzz::Jid to_jid_;
176  std::string msg_;
177  int seq_;
178  std::string sid_;
179
180  DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
181};
182
183std::string MakeSid() {
184  uint64 sid = base::RandUint64();
185  return std::string("chrome-sync-") + base::Uint64ToString(sid);
186}
187
188}  // namespace
189
190CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
191    base::WeakPtr<talk_base::Task> base_task,
192    invalidation::InvalidationClient* invalidation_client)
193    : scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
194      base_task_(base_task),
195      invalidation_client_(invalidation_client),
196      seq_(0),
197      sid_(MakeSid()) {
198  CHECK(base_task_.get());
199  // Owned by base_task.  Takes ownership of the callback.
200  CacheInvalidationListenTask* listen_task =
201      new CacheInvalidationListenTask(
202          base_task_, scoped_callback_factory_.NewCallback(
203              &CacheInvalidationPacketHandler::HandleInboundPacket));
204  listen_task->Start();
205}
206
207CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
208  DCHECK(non_thread_safe_.CalledOnValidThread());
209}
210
211void CacheInvalidationPacketHandler::HandleOutboundPacket(
212    invalidation::NetworkEndpoint* network_endpoint) {
213  DCHECK(non_thread_safe_.CalledOnValidThread());
214  if (!base_task_.get()) {
215    return;
216  }
217  CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint());
218  invalidation::string message;
219  network_endpoint->TakeOutboundMessage(&message);
220  std::string encoded_message;
221  if (!base::Base64Encode(message, &encoded_message)) {
222    LOG(ERROR) << "Could not base64-encode message to send: "
223               << message;
224    return;
225  }
226  // Owned by base_task_.
227  CacheInvalidationSendMessageTask* send_message_task =
228      new CacheInvalidationSendMessageTask(base_task_,
229                                           buzz::Jid(kBotJid),
230                                           encoded_message,
231                                           seq_, sid_);
232  send_message_task->Start();
233  ++seq_;
234}
235
236void CacheInvalidationPacketHandler::HandleInboundPacket(
237    const std::string& packet) {
238  DCHECK(non_thread_safe_.CalledOnValidThread());
239  invalidation::NetworkEndpoint* network_endpoint =
240      invalidation_client_->network_endpoint();
241  std::string decoded_message;
242  if (!base::Base64Decode(packet, &decoded_message)) {
243    LOG(ERROR) << "Could not base64-decode received message: "
244               << packet;
245    return;
246  }
247  network_endpoint->HandleInboundMessage(decoded_message);
248}
249
250}  // namespace sync_notifier
251