xmpp_channel.cc revision 5a1f600e9d7d26c36b3e22ff0dc0ae9e3b2425fc
1// Copyright 2015 The Weave Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "src/notification/xmpp_channel.h"
6
7#include <string>
8
9#include <base/bind.h>
10#include <weave/provider/network.h>
11#include <weave/provider/task_runner.h>
12
13#include "src/backoff_entry.h"
14#include "src/data_encoding.h"
15#include "src/notification/notification_delegate.h"
16#include "src/notification/notification_parser.h"
17#include "src/notification/xml_node.h"
18#include "src/privet/openssl_utils.h"
19#include "src/utils.h"
20
21namespace weave {
22
23namespace {
24
25std::string BuildXmppStartStreamCommand() {
26  return "<stream:stream to='clouddevices.gserviceaccount.com' "
27         "xmlns:stream='http://etherx.jabber.org/streams' "
28         "xml:lang='*' version='1.0' xmlns='jabber:client'>";
29}
30
31std::string BuildXmppAuthenticateCommand(const std::string& account,
32                                         const std::string& token) {
33  std::vector<uint8_t> credentials;
34  credentials.push_back(0);
35  credentials.insert(credentials.end(), account.begin(), account.end());
36  credentials.push_back(0);
37  credentials.insert(credentials.end(), token.begin(), token.end());
38  std::string msg =
39      "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
40      "mechanism='X-OAUTH2' auth:service='oauth2' "
41      "auth:allow-non-google-login='true' "
42      "auth:client-uses-full-bind-result='true' "
43      "xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
44      Base64Encode(credentials) + "</auth>";
45  return msg;
46}
47
48// Backoff policy.
49// Note: In order to ensure a minimum of 20 seconds between server errors,
50// we have a 30s +- 10s (33%) jitter initial backoff.
51const BackoffEntry::Policy kDefaultBackoffPolicy = {
52    // Number of initial errors (in sequence) to ignore before applying
53    // exponential back-off rules.
54    0,
55
56    // Initial delay for exponential back-off in ms.
57    30 * 1000,  // 30 seconds.
58
59    // Factor by which the waiting time will be multiplied.
60    2,
61
62    // Fuzzing percentage. ex: 10% will spread requests randomly
63    // between 90%-100% of the calculated time.
64    0.33,  // 33%.
65
66    // Maximum amount of time we are willing to delay our request in ms.
67    10 * 60 * 1000,  // 10 minutes.
68
69    // Time to keep an entry from being discarded even when it
70    // has no significant state, -1 to never discard.
71    -1,
72
73    // Don't use initial delay unless the last request was an error.
74    false,
75};
76
77const char kDefaultXmppHost[] = "talk.google.com";
78const uint16_t kDefaultXmppPort = 5223;
79
80// Used for keeping connection alive.
81const int kRegularPingIntervalSeconds = 60;
82const int kRegularPingTimeoutSeconds = 30;
83
84// Used for diagnostic when connectivity changed.
85const int kAgressivePingIntervalSeconds = 5;
86const int kAgressivePingTimeoutSeconds = 10;
87
88const int kConnectingTimeoutAfterNetChangeSeconds = 30;
89
90}  // namespace
91
92XmppChannel::XmppChannel(const std::string& account,
93                         const std::string& access_token,
94                         provider::TaskRunner* task_runner,
95                         provider::Network* network)
96    : account_{account},
97      access_token_{access_token},
98      network_{network},
99      backoff_entry_{&kDefaultBackoffPolicy},
100      task_runner_{task_runner},
101      iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
102  read_socket_data_.resize(4096);
103  if (network) {
104    network->AddConnectionChangedCallback(base::Bind(
105        &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr()));
106  }
107}
108
109void XmppChannel::OnMessageRead(size_t size, ErrorPtr error) {
110  read_pending_ = false;
111  if (error)
112    return Restart();
113  std::string msg(read_socket_data_.data(), size);
114  VLOG(2) << "Received XMPP packet: '" << msg << "'";
115
116  if (!size)
117    return Restart();
118
119  stream_parser_.ParseData(msg);
120  WaitForMessage();
121}
122
123void XmppChannel::OnStreamStart(const std::string& node_name,
124                                std::map<std::string, std::string> attributes) {
125  VLOG(2) << "XMPP stream start: " << node_name;
126}
127
128void XmppChannel::OnStreamEnd(const std::string& node_name) {
129  VLOG(2) << "XMPP stream ended: " << node_name;
130  Stop();
131  if (IsConnected()) {
132    // If we had a fully-established connection, restart it now.
133    // However, if the connection has never been established yet (e.g.
134    // authorization failed), do not restart right now. Wait till we get
135    // new credentials.
136    task_runner_->PostDelayedTask(
137        FROM_HERE,
138        base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()), {});
139  } else if (delegate_) {
140    delegate_->OnPermanentFailure();
141  }
142}
143
144void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
145  // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
146  // from expat XML parser and some stanza could cause the XMPP stream to be
147  // reset and the parser to be re-initialized. We don't want to destroy the
148  // parser while it is performing a callback invocation.
149  task_runner_->PostDelayedTask(
150      FROM_HERE,
151      base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
152                 base::Passed(std::move(stanza))),
153      {});
154}
155
156void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
157  VLOG(2) << "XMPP stanza received: " << stanza->ToString();
158
159  switch (state_) {
160    case XmppState::kConnected:
161      if (stanza->name() == "stream:features") {
162        auto children = stanza->FindChildren("mechanisms/mechanism", false);
163        for (const auto& child : children) {
164          if (child->text() == "X-OAUTH2") {
165            state_ = XmppState::kAuthenticationStarted;
166            SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
167            return;
168          }
169        }
170      }
171      break;
172    case XmppState::kAuthenticationStarted:
173      if (stanza->name() == "success") {
174        state_ = XmppState::kStreamRestartedPostAuthentication;
175        RestartXmppStream();
176        return;
177      } else if (stanza->name() == "failure") {
178        if (stanza->FindFirstChild("not-authorized", false)) {
179          state_ = XmppState::kAuthenticationFailed;
180          return;
181        }
182      }
183      break;
184    case XmppState::kStreamRestartedPostAuthentication:
185      if (stanza->name() == "stream:features" &&
186          stanza->FindFirstChild("bind", false)) {
187        state_ = XmppState::kBindSent;
188        iq_stanza_handler_->SendRequest(
189            "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>",
190            base::Bind(&XmppChannel::OnBindCompleted,
191                       task_ptr_factory_.GetWeakPtr()),
192            base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
193        return;
194      }
195      break;
196    default:
197      if (stanza->name() == "message") {
198        HandleMessageStanza(std::move(stanza));
199        return;
200      } else if (stanza->name() == "iq") {
201        if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) {
202          LOG(ERROR) << "Failed to handle IQ stanza";
203          CloseStream();
204        }
205        return;
206      }
207      LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
208      return;
209  }
210  // Something bad happened. Close the stream and start over.
211  LOG(ERROR) << "Error condition occurred handling stanza: "
212             << stanza->ToString() << " in state: " << static_cast<int>(state_);
213  CloseStream();
214}
215
216void XmppChannel::CloseStream() {
217  SendMessage("</stream:stream>");
218}
219
220void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) {
221  if (reply->GetAttributeOrEmpty("type") != "result") {
222    CloseStream();
223    return;
224  }
225  const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false);
226  if (!jid_node) {
227    LOG(ERROR) << "XMPP Bind response is missing JID";
228    CloseStream();
229    return;
230  }
231
232  jid_ = jid_node->text();
233  state_ = XmppState::kSessionStarted;
234  iq_stanza_handler_->SendRequest(
235      "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
236      base::Bind(&XmppChannel::OnSessionEstablished,
237                 task_ptr_factory_.GetWeakPtr()),
238      base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
239}
240
241void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) {
242  if (reply->GetAttributeOrEmpty("type") != "result") {
243    CloseStream();
244    return;
245  }
246  state_ = XmppState::kSubscribeStarted;
247  std::string body =
248      "<subscribe xmlns='google:push'>"
249      "<item channel='cloud_devices' from=''/></subscribe>";
250  iq_stanza_handler_->SendRequest(
251      "set", "", account_, body,
252      base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()),
253      base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
254}
255
256void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) {
257  if (reply->GetAttributeOrEmpty("type") != "result") {
258    CloseStream();
259    return;
260  }
261  state_ = XmppState::kSubscribed;
262  if (delegate_)
263    delegate_->OnConnected(GetName());
264}
265
266void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) {
267  const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true);
268  if (!node) {
269    LOG(WARNING) << "XMPP message stanza is missing <push:data> element";
270    return;
271  }
272  std::string data = node->text();
273  std::string json_data;
274  if (!Base64Decode(data, &json_data)) {
275    LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data;
276    return;
277  }
278
279  VLOG(2) << "XMPP push notification data: " << json_data;
280  auto json_dict = LoadJsonDict(json_data, nullptr);
281  if (json_dict && delegate_)
282    ParseNotificationJson(*json_dict, delegate_, GetName());
283}
284
285void XmppChannel::CreateSslSocket() {
286  CHECK(!stream_);
287  state_ = XmppState::kConnecting;
288  LOG(INFO) << "Starting XMPP connection to " << kDefaultXmppHost << ":"
289            << kDefaultXmppPort;
290
291  network_->OpenSslSocket(kDefaultXmppHost, kDefaultXmppPort,
292                          base::Bind(&XmppChannel::OnSslSocketReady,
293                                     task_ptr_factory_.GetWeakPtr()));
294}
295
296void XmppChannel::OnSslSocketReady(std::unique_ptr<Stream> stream,
297                                   ErrorPtr error) {
298  if (error) {
299    LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection";
300    backoff_entry_.InformOfRequest(false);
301
302    LOG(INFO) << "Delaying connection to XMPP server for "
303              << backoff_entry_.GetTimeUntilRelease();
304    return task_runner_->PostDelayedTask(
305        FROM_HERE, base::Bind(&XmppChannel::CreateSslSocket,
306                              task_ptr_factory_.GetWeakPtr()),
307        backoff_entry_.GetTimeUntilRelease());
308  }
309  CHECK(XmppState::kConnecting == state_);
310  backoff_entry_.InformOfRequest(true);
311  stream_ = std::move(stream);
312  state_ = XmppState::kConnected;
313  RestartXmppStream();
314  ScheduleRegularPing();
315}
316
317void XmppChannel::SendMessage(const std::string& message) {
318  CHECK(stream_) << "No XMPP socket stream available";
319  if (write_pending_) {
320    queued_write_data_ += message;
321    return;
322  }
323  write_socket_data_ = queued_write_data_ + message;
324  queued_write_data_.clear();
325  VLOG(2) << "Sending XMPP message: " << message;
326
327  write_pending_ = true;
328  stream_->Write(
329      write_socket_data_.data(), write_socket_data_.size(),
330      base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()));
331}
332
333void XmppChannel::OnMessageSent(ErrorPtr error) {
334  write_pending_ = false;
335  if (error)
336    return Restart();
337  if (queued_write_data_.empty()) {
338    WaitForMessage();
339  } else {
340    SendMessage(std::string{});
341  }
342}
343
344void XmppChannel::WaitForMessage() {
345  if (read_pending_ || !stream_)
346    return;
347
348  read_pending_ = true;
349  stream_->Read(
350      read_socket_data_.data(), read_socket_data_.size(),
351      base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()));
352}
353
354std::string XmppChannel::GetName() const {
355  return "xmpp";
356}
357
358bool XmppChannel::IsConnected() const {
359  return state_ == XmppState::kSubscribed;
360}
361
362void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
363  // No extra parameters needed for XMPP.
364}
365
366void XmppChannel::Restart() {
367  LOG(INFO) << "Restarting XMPP";
368  Stop();
369  Start(delegate_);
370}
371
372void XmppChannel::Start(NotificationDelegate* delegate) {
373  CHECK(state_ == XmppState::kNotStarted);
374  delegate_ = delegate;
375
376  CreateSslSocket();
377}
378
379void XmppChannel::Stop() {
380  if (IsConnected() && delegate_)
381    delegate_->OnDisconnected();
382
383  task_ptr_factory_.InvalidateWeakPtrs();
384  ping_ptr_factory_.InvalidateWeakPtrs();
385
386  stream_.reset();
387  state_ = XmppState::kNotStarted;
388}
389
390void XmppChannel::RestartXmppStream() {
391  stream_parser_.Reset();
392  stream_->CancelPendingOperations();
393  read_pending_ = false;
394  write_pending_ = false;
395  SendMessage(BuildXmppStartStreamCommand());
396}
397
398void XmppChannel::SchedulePing(base::TimeDelta interval,
399                               base::TimeDelta timeout) {
400  VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
401  ping_ptr_factory_.InvalidateWeakPtrs();
402  task_runner_->PostDelayedTask(
403      FROM_HERE, base::Bind(&XmppChannel::PingServer,
404                            ping_ptr_factory_.GetWeakPtr(), timeout),
405      interval);
406}
407
408void XmppChannel::ScheduleRegularPing() {
409  SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds),
410               base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds));
411}
412
413void XmppChannel::ScheduleFastPing() {
414  SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds),
415               base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds));
416}
417
418void XmppChannel::PingServer(base::TimeDelta timeout) {
419  VLOG(1) << "Sending XMPP ping";
420  if (!IsConnected()) {
421    LOG(WARNING) << "XMPP channel is not connected";
422    Restart();
423    return;
424  }
425
426  // Send an XMPP Ping request as defined in XEP-0199 extension:
427  // http://xmpp.org/extensions/xep-0199.html
428  iq_stanza_handler_->SendRequestWithCustomTimeout(
429      "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout,
430      base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(),
431                 base::Time::Now()),
432      base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(),
433                 base::Time::Now()));
434}
435
436void XmppChannel::OnPingResponse(base::Time sent_time,
437                                 std::unique_ptr<XmlNode> reply) {
438  VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time);
439  // Ping response received from server. Everything seems to be in order.
440  // Reschedule with default intervals.
441  ScheduleRegularPing();
442}
443
444void XmppChannel::OnPingTimeout(base::Time sent_time) {
445  LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after "
446               << (base::Time::Now() - sent_time);
447  Restart();
448}
449
450void XmppChannel::OnConnectivityChanged() {
451  if (state_ == XmppState::kNotStarted)
452    return;
453
454  if (state_ == XmppState::kConnecting &&
455      backoff_entry_.GetTimeUntilRelease() <
456          base::TimeDelta::FromSeconds(
457              kConnectingTimeoutAfterNetChangeSeconds)) {
458    VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease();
459    return;
460  }
461
462  ScheduleFastPing();
463}
464
465}  // namespace weave
466