1// Copyright 2015 The Weave Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "src/notification/xmpp_channel.h"
6
7#include <string>
8
9#include <base/bind.h>
10#include <base/strings/string_number_conversions.h>
11#include <weave/provider/network.h>
12#include <weave/provider/task_runner.h>
13
14#include "src/backoff_entry.h"
15#include "src/data_encoding.h"
16#include "src/notification/notification_delegate.h"
17#include "src/notification/notification_parser.h"
18#include "src/notification/xml_node.h"
19#include "src/privet/openssl_utils.h"
20#include "src/string_utils.h"
21#include "src/utils.h"
22
23namespace weave {
24
25namespace {
26
// Returns the opening <stream:stream> tag used to begin an XMPP stream.
// The returned text contains no matching end tag; the end tag is sent
// separately by XmppChannel::CloseStream().
std::string BuildXmppStartStreamCommand() {
  static const char kStreamHeader[] =
      "<stream:stream to='clouddevices.gserviceaccount.com' "
      "xmlns:stream='http://etherx.jabber.org/streams' "
      "xml:lang='*' version='1.0' xmlns='jabber:client'>";
  return kStreamHeader;
}
32
33std::string BuildXmppAuthenticateCommand(const std::string& account,
34                                         const std::string& token) {
35  std::vector<uint8_t> credentials;
36  credentials.push_back(0);
37  credentials.insert(credentials.end(), account.begin(), account.end());
38  credentials.push_back(0);
39  credentials.insert(credentials.end(), token.begin(), token.end());
40  std::string msg =
41      "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
42      "mechanism='X-OAUTH2' auth:service='oauth2' "
43      "auth:allow-non-google-login='true' "
44      "auth:client-uses-full-bind-result='true' "
45      "xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
46      Base64Encode(credentials) + "</auth>";
47  return msg;
48}
49
// Backoff policy handed to |backoff_entry_| in the XmppChannel constructor.
// It controls how long XmppChannel::OnSslSocketReady() waits before trying to
// open the SSL socket again after a failed attempt.
//
// Note: In order to ensure a minimum of 20 seconds between server errors,
// the initial backoff is 30s with a 33% jitter.
// NOTE(review): the original note described this as "30s +- 10s", but the
// fuzzing comment below describes jitter that only shortens the delay (i.e.
// roughly 20s-30s for the first retry) -- confirm against BackoffEntry.
//
// The initializer is positional; the values must stay in the order of the
// fields of BackoffEntry::Policy (declared in src/backoff_entry.h).
const BackoffEntry::Policy kDefaultBackoffPolicy = {
    // Number of initial errors (in sequence) to ignore before applying
    // exponential back-off rules. Zero means the very first error is already
    // subject to back-off.
    0,

    // Initial delay for exponential back-off in ms.
    30 * 1000,  // 30 seconds.

    // Factor by which the waiting time will be multiplied.
    2,

    // Fuzzing percentage. ex: 10% will spread requests randomly
    // between 90%-100% of the calculated time.
    0.33,  // 33%.

    // Maximum amount of time we are willing to delay our request in ms.
    10 * 60 * 1000,  // 10 minutes.

    // Time to keep an entry from being discarded even when it
    // has no significant state, -1 to never discard.
    -1,

    // Don't use initial delay unless the last request was an error.
    false,
};
78
// Regular keep-alive ping: delay before the next ping is sent, and how long
// to wait for the reply to it. Used by XmppChannel::ScheduleRegularPing().
constexpr int kRegularPingIntervalSeconds = 60;
constexpr int kRegularPingTimeoutSeconds = 30;

// Diagnostic ping used after a connectivity change: a much shorter delay
// before the ping is sent. Used by XmppChannel::ScheduleFastPing().
constexpr int kAgressivePingIntervalSeconds = 5;
constexpr int kAgressivePingTimeoutSeconds = 10;

// While still connecting, a connectivity change is ignored if the next
// reconnect attempt is due in less than this many seconds (see
// XmppChannel::OnConnectivityChanged()).
constexpr int kConnectingTimeoutAfterNetChangeSeconds = 30;
88
89}  // namespace
90
91XmppChannel::XmppChannel(const std::string& account,
92                         const std::string& access_token,
93                         const std::string& xmpp_endpoint,
94                         provider::TaskRunner* task_runner,
95                         provider::Network* network)
96    : account_{account},
97      access_token_{access_token},
98      xmpp_endpoint_{xmpp_endpoint},
99      network_{network},
100      backoff_entry_{&kDefaultBackoffPolicy},
101      task_runner_{task_runner},
102      iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
103  read_socket_data_.resize(4096);
104  if (network) {
105    network->AddConnectionChangedCallback(base::Bind(
106        &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr()));
107  }
108}
109
110void XmppChannel::OnMessageRead(size_t size, ErrorPtr error) {
111  read_pending_ = false;
112  if (error)
113    return Restart();
114  std::string msg(read_socket_data_.data(), size);
115  VLOG(2) << "Received XMPP packet: '" << msg << "'";
116
117  if (!size)
118    return Restart();
119
120  stream_parser_.ParseData(msg);
121  WaitForMessage();
122}
123
// Called when the start of an XMPP stream is reported (presumably by
// |stream_parser_|; the caller is not visible in this file).
//
// |node_name|  - name of the stream element; only written to the verbose log.
// |attributes| - attributes of the stream element; currently unused.
void XmppChannel::OnStreamStart(const std::string& node_name,
                                std::map<std::string, std::string> attributes) {
  VLOG(2) << "XMPP stream start: " << node_name;
}
128
129void XmppChannel::OnStreamEnd(const std::string& node_name) {
130  VLOG(2) << "XMPP stream ended: " << node_name;
131  Stop();
132  if (IsConnected()) {
133    // If we had a fully-established connection, restart it now.
134    // However, if the connection has never been established yet (e.g.
135    // authorization failed), do not restart right now. Wait till we get
136    // new credentials.
137    task_runner_->PostDelayedTask(
138        FROM_HERE,
139        base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()), {});
140  } else if (delegate_) {
141    delegate_->OnPermanentFailure();
142  }
143}
144
145void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
146  // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
147  // from expat XML parser and some stanza could cause the XMPP stream to be
148  // reset and the parser to be re-initialized. We don't want to destroy the
149  // parser while it is performing a callback invocation.
150  task_runner_->PostDelayedTask(
151      FROM_HERE,
152      base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
153                 base::Passed(std::move(stanza))),
154      {});
155}
156
157void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
158  VLOG(2) << "XMPP stanza received: " << stanza->ToString();
159
160  switch (state_) {
161    case XmppState::kConnected:
162      if (stanza->name() == "stream:features") {
163        auto children = stanza->FindChildren("mechanisms/mechanism", false);
164        for (const auto& child : children) {
165          if (child->text() == "X-OAUTH2") {
166            state_ = XmppState::kAuthenticationStarted;
167            SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
168            return;
169          }
170        }
171      }
172      break;
173    case XmppState::kAuthenticationStarted:
174      if (stanza->name() == "success") {
175        state_ = XmppState::kStreamRestartedPostAuthentication;
176        RestartXmppStream();
177        return;
178      } else if (stanza->name() == "failure") {
179        if (stanza->FindFirstChild("not-authorized", false)) {
180          state_ = XmppState::kAuthenticationFailed;
181          return;
182        }
183      }
184      break;
185    case XmppState::kStreamRestartedPostAuthentication:
186      if (stanza->name() == "stream:features" &&
187          stanza->FindFirstChild("bind", false)) {
188        state_ = XmppState::kBindSent;
189        iq_stanza_handler_->SendRequest(
190            "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>",
191            base::Bind(&XmppChannel::OnBindCompleted,
192                       task_ptr_factory_.GetWeakPtr()),
193            base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
194        return;
195      }
196      break;
197    default:
198      if (stanza->name() == "message") {
199        HandleMessageStanza(std::move(stanza));
200        return;
201      } else if (stanza->name() == "iq") {
202        if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) {
203          LOG(ERROR) << "Failed to handle IQ stanza";
204          CloseStream();
205        }
206        return;
207      }
208      LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
209      return;
210  }
211  // Something bad happened. Close the stream and start over.
212  LOG(ERROR) << "Error condition occurred handling stanza: "
213             << stanza->ToString() << " in state: " << static_cast<int>(state_);
214  CloseStream();
215}
216
217void XmppChannel::CloseStream() {
218  SendMessage("</stream:stream>");
219}
220
221void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) {
222  if (reply->GetAttributeOrEmpty("type") != "result") {
223    CloseStream();
224    return;
225  }
226  const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false);
227  if (!jid_node) {
228    LOG(ERROR) << "XMPP Bind response is missing JID";
229    CloseStream();
230    return;
231  }
232
233  jid_ = jid_node->text();
234  state_ = XmppState::kSessionStarted;
235  iq_stanza_handler_->SendRequest(
236      "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
237      base::Bind(&XmppChannel::OnSessionEstablished,
238                 task_ptr_factory_.GetWeakPtr()),
239      base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
240}
241
242void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) {
243  if (reply->GetAttributeOrEmpty("type") != "result") {
244    CloseStream();
245    return;
246  }
247  state_ = XmppState::kSubscribeStarted;
248  std::string body =
249      "<subscribe xmlns='google:push'>"
250      "<item channel='cloud_devices' from=''/></subscribe>";
251  iq_stanza_handler_->SendRequest(
252      "set", "", account_, body,
253      base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()),
254      base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
255}
256
257void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) {
258  if (reply->GetAttributeOrEmpty("type") != "result") {
259    CloseStream();
260    return;
261  }
262  state_ = XmppState::kSubscribed;
263  if (delegate_)
264    delegate_->OnConnected(GetName());
265}
266
267void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) {
268  const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true);
269  if (!node) {
270    LOG(WARNING) << "XMPP message stanza is missing <push:data> element";
271    return;
272  }
273  std::string data = node->text();
274  std::string json_data;
275  if (!Base64Decode(data, &json_data)) {
276    LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data;
277    return;
278  }
279
280  VLOG(2) << "XMPP push notification data: " << json_data;
281  auto json_dict = LoadJsonDict(json_data, nullptr);
282  if (json_dict && delegate_)
283    ParseNotificationJson(*json_dict, delegate_, GetName());
284}
285
286void XmppChannel::CreateSslSocket() {
287  CHECK(!stream_);
288  state_ = XmppState::kConnecting;
289  LOG(INFO) << "Starting XMPP connection to: " << xmpp_endpoint_;
290
291  std::pair<std::string, std::string> host_port =
292      SplitAtFirst(xmpp_endpoint_, ":", true);
293  CHECK(!host_port.first.empty());
294  CHECK(!host_port.second.empty());
295  uint32_t port = 0;
296  CHECK(base::StringToUint(host_port.second, &port)) << xmpp_endpoint_;
297
298  network_->OpenSslSocket(host_port.first, port,
299                          base::Bind(&XmppChannel::OnSslSocketReady,
300                                     task_ptr_factory_.GetWeakPtr()));
301}
302
303void XmppChannel::OnSslSocketReady(std::unique_ptr<Stream> stream,
304                                   ErrorPtr error) {
305  if (error) {
306    LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection";
307    backoff_entry_.InformOfRequest(false);
308
309    LOG(INFO) << "Delaying connection to XMPP server for "
310              << backoff_entry_.GetTimeUntilRelease();
311    return task_runner_->PostDelayedTask(
312        FROM_HERE, base::Bind(&XmppChannel::CreateSslSocket,
313                              task_ptr_factory_.GetWeakPtr()),
314        backoff_entry_.GetTimeUntilRelease());
315  }
316  CHECK(XmppState::kConnecting == state_);
317  backoff_entry_.InformOfRequest(true);
318  stream_ = std::move(stream);
319  state_ = XmppState::kConnected;
320  RestartXmppStream();
321  ScheduleRegularPing();
322}
323
324void XmppChannel::SendMessage(const std::string& message) {
325  CHECK(stream_) << "No XMPP socket stream available";
326  if (write_pending_) {
327    queued_write_data_ += message;
328    return;
329  }
330  write_socket_data_ = queued_write_data_ + message;
331  queued_write_data_.clear();
332  VLOG(2) << "Sending XMPP message: " << message;
333
334  write_pending_ = true;
335  stream_->Write(
336      write_socket_data_.data(), write_socket_data_.size(),
337      base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()));
338}
339
340void XmppChannel::OnMessageSent(ErrorPtr error) {
341  write_pending_ = false;
342  if (error)
343    return Restart();
344  if (queued_write_data_.empty()) {
345    WaitForMessage();
346  } else {
347    SendMessage(std::string{});
348  }
349}
350
351void XmppChannel::WaitForMessage() {
352  if (read_pending_ || !stream_)
353    return;
354
355  read_pending_ = true;
356  stream_->Read(
357      read_socket_data_.data(), read_socket_data_.size(),
358      base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()));
359}
360
361std::string XmppChannel::GetName() const {
362  return "xmpp";
363}
364
365bool XmppChannel::IsConnected() const {
366  return state_ == XmppState::kSubscribed;
367}
368
// Intentionally a no-op: this channel adds nothing to |channel_json|.
//
// |channel_json| - dictionary describing the channel; left untouched.
void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
  // No extra parameters needed for XMPP.
}
372
// Tears the channel down and immediately starts it again with the delegate
// that is currently registered.
void XmppChannel::Restart() {
  LOG(INFO) << "Restarting XMPP";
  // Stop() puts |state_| back to kNotStarted, which Start() requires.
  Stop();
  Start(delegate_);
}
378
// Starts the channel: remembers |delegate| and begins opening the SSL socket.
//
// Must only be called while the channel is in the kNotStarted state; this is
// enforced with a CHECK.
//
// |delegate| - receiver of channel notifications; the other methods in this
//              file null-check it before use.
void XmppChannel::Start(NotificationDelegate* delegate) {
  CHECK(state_ == XmppState::kNotStarted);
  delegate_ = delegate;

  CreateSslSocket();
}
385
386void XmppChannel::Stop() {
387  if (IsConnected() && delegate_)
388    delegate_->OnDisconnected();
389
390  task_ptr_factory_.InvalidateWeakPtrs();
391  ping_ptr_factory_.InvalidateWeakPtrs();
392
393  stream_.reset();
394  state_ = XmppState::kNotStarted;
395}
396
397void XmppChannel::RestartXmppStream() {
398  stream_parser_.Reset();
399  stream_->CancelPendingOperations();
400  read_pending_ = false;
401  write_pending_ = false;
402  SendMessage(BuildXmppStartStreamCommand());
403}
404
405void XmppChannel::SchedulePing(base::TimeDelta interval,
406                               base::TimeDelta timeout) {
407  VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
408  ping_ptr_factory_.InvalidateWeakPtrs();
409  task_runner_->PostDelayedTask(
410      FROM_HERE, base::Bind(&XmppChannel::PingServer,
411                            ping_ptr_factory_.GetWeakPtr(), timeout),
412      interval);
413}
414
415void XmppChannel::ScheduleRegularPing() {
416  SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds),
417               base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds));
418}
419
420void XmppChannel::ScheduleFastPing() {
421  SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds),
422               base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds));
423}
424
425void XmppChannel::PingServer(base::TimeDelta timeout) {
426  VLOG(1) << "Sending XMPP ping";
427  if (!IsConnected()) {
428    LOG(WARNING) << "XMPP channel is not connected";
429    Restart();
430    return;
431  }
432
433  // Send an XMPP Ping request as defined in XEP-0199 extension:
434  // http://xmpp.org/extensions/xep-0199.html
435  iq_stanza_handler_->SendRequestWithCustomTimeout(
436      "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout,
437      base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(),
438                 base::Time::Now()),
439      base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(),
440                 base::Time::Now()));
441}
442
// Reply handler for the ping sent from PingServer().
//
// |sent_time| - time captured when the ping was sent; used only to log the
//               round-trip time.
// |reply|     - the IQ reply stanza; its content is not inspected.
void XmppChannel::OnPingResponse(base::Time sent_time,
                                 std::unique_ptr<XmlNode> reply) {
  VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time);
  // Ping response received from server. Everything seems to be in order.
  // Reschedule with default intervals.
  ScheduleRegularPing();
}
450
// Timeout handler for the ping sent from PingServer(): no reply arrived in
// time, so the connection is treated as lost and the channel is restarted.
//
// |sent_time| - time captured when the ping was sent; used only to log how
//               long the channel waited.
void XmppChannel::OnPingTimeout(base::Time sent_time) {
  LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after "
               << (base::Time::Now() - sent_time);
  Restart();
}
456
457void XmppChannel::OnConnectivityChanged() {
458  if (state_ == XmppState::kNotStarted)
459    return;
460
461  if (state_ == XmppState::kConnecting &&
462      backoff_entry_.GetTimeUntilRelease() <
463          base::TimeDelta::FromSeconds(
464              kConnectingTimeoutAfterNetChangeSeconds)) {
465    VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease();
466    return;
467  }
468
469  ScheduleFastPing();
470}
471
472}  // namespace weave
473