1// Copyright 2015 The Weave Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "src/notification/xmpp_channel.h" 6 7#include <string> 8 9#include <base/bind.h> 10#include <base/strings/string_number_conversions.h> 11#include <weave/provider/network.h> 12#include <weave/provider/task_runner.h> 13 14#include "src/backoff_entry.h" 15#include "src/data_encoding.h" 16#include "src/notification/notification_delegate.h" 17#include "src/notification/notification_parser.h" 18#include "src/notification/xml_node.h" 19#include "src/privet/openssl_utils.h" 20#include "src/string_utils.h" 21#include "src/utils.h" 22 23namespace weave { 24 25namespace { 26 27std::string BuildXmppStartStreamCommand() { 28 return "<stream:stream to='clouddevices.gserviceaccount.com' " 29 "xmlns:stream='http://etherx.jabber.org/streams' " 30 "xml:lang='*' version='1.0' xmlns='jabber:client'>"; 31} 32 33std::string BuildXmppAuthenticateCommand(const std::string& account, 34 const std::string& token) { 35 std::vector<uint8_t> credentials; 36 credentials.push_back(0); 37 credentials.insert(credentials.end(), account.begin(), account.end()); 38 credentials.push_back(0); 39 credentials.insert(credentials.end(), token.begin(), token.end()); 40 std::string msg = 41 "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' " 42 "mechanism='X-OAUTH2' auth:service='oauth2' " 43 "auth:allow-non-google-login='true' " 44 "auth:client-uses-full-bind-result='true' " 45 "xmlns:auth='http://www.google.com/talk/protocol/auth'>" + 46 Base64Encode(credentials) + "</auth>"; 47 return msg; 48} 49 50// Backoff policy. 51// Note: In order to ensure a minimum of 20 seconds between server errors, 52// we have a 30s +- 10s (33%) jitter initial backoff. 53const BackoffEntry::Policy kDefaultBackoffPolicy = { 54 // Number of initial errors (in sequence) to ignore before applying 55 // exponential back-off rules. 56 0, 57 58 // Initial delay for exponential back-off in ms. 59 30 * 1000, // 30 seconds. 60 61 // Factor by which the waiting time will be multiplied. 62 2, 63 64 // Fuzzing percentage. ex: 10% will spread requests randomly 65 // between 90%-100% of the calculated time. 66 0.33, // 33%. 67 68 // Maximum amount of time we are willing to delay our request in ms. 69 10 * 60 * 1000, // 10 minutes. 70 71 // Time to keep an entry from being discarded even when it 72 // has no significant state, -1 to never discard. 73 -1, 74 75 // Don't use initial delay unless the last request was an error. 76 false, 77}; 78 79// Used for keeping connection alive. 80const int kRegularPingIntervalSeconds = 60; 81const int kRegularPingTimeoutSeconds = 30; 82 83// Used for diagnostic when connectivity changed. 84const int kAgressivePingIntervalSeconds = 5; 85const int kAgressivePingTimeoutSeconds = 10; 86 87const int kConnectingTimeoutAfterNetChangeSeconds = 30; 88 89} // namespace 90 91XmppChannel::XmppChannel(const std::string& account, 92 const std::string& access_token, 93 const std::string& xmpp_endpoint, 94 provider::TaskRunner* task_runner, 95 provider::Network* network) 96 : account_{account}, 97 access_token_{access_token}, 98 xmpp_endpoint_{xmpp_endpoint}, 99 network_{network}, 100 backoff_entry_{&kDefaultBackoffPolicy}, 101 task_runner_{task_runner}, 102 iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} { 103 read_socket_data_.resize(4096); 104 if (network) { 105 network->AddConnectionChangedCallback(base::Bind( 106 &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr())); 107 } 108} 109 110void XmppChannel::OnMessageRead(size_t size, ErrorPtr error) { 111 read_pending_ = false; 112 if (error) 113 return Restart(); 114 std::string msg(read_socket_data_.data(), size); 115 VLOG(2) << "Received XMPP packet: '" << msg << "'"; 116 117 if (!size) 118 return Restart(); 119 120 stream_parser_.ParseData(msg); 121 WaitForMessage(); 122} 123 124void XmppChannel::OnStreamStart(const std::string& node_name, 125 std::map<std::string, std::string> attributes) { 126 VLOG(2) << "XMPP stream start: " << node_name; 127} 128 129void XmppChannel::OnStreamEnd(const std::string& node_name) { 130 VLOG(2) << "XMPP stream ended: " << node_name; 131 Stop(); 132 if (IsConnected()) { 133 // If we had a fully-established connection, restart it now. 134 // However, if the connection has never been established yet (e.g. 135 // authorization failed), do not restart right now. Wait till we get 136 // new credentials. 137 task_runner_->PostDelayedTask( 138 FROM_HERE, 139 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()), {}); 140 } else if (delegate_) { 141 delegate_->OnPermanentFailure(); 142 } 143} 144 145void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) { 146 // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback 147 // from expat XML parser and some stanza could cause the XMPP stream to be 148 // reset and the parser to be re-initialized. We don't want to destroy the 149 // parser while it is performing a callback invocation. 150 task_runner_->PostDelayedTask( 151 FROM_HERE, 152 base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(), 153 base::Passed(std::move(stanza))), 154 {}); 155} 156 157void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) { 158 VLOG(2) << "XMPP stanza received: " << stanza->ToString(); 159 160 switch (state_) { 161 case XmppState::kConnected: 162 if (stanza->name() == "stream:features") { 163 auto children = stanza->FindChildren("mechanisms/mechanism", false); 164 for (const auto& child : children) { 165 if (child->text() == "X-OAUTH2") { 166 state_ = XmppState::kAuthenticationStarted; 167 SendMessage(BuildXmppAuthenticateCommand(account_, access_token_)); 168 return; 169 } 170 } 171 } 172 break; 173 case XmppState::kAuthenticationStarted: 174 if (stanza->name() == "success") { 175 state_ = XmppState::kStreamRestartedPostAuthentication; 176 RestartXmppStream(); 177 return; 178 } else if (stanza->name() == "failure") { 179 if (stanza->FindFirstChild("not-authorized", false)) { 180 state_ = XmppState::kAuthenticationFailed; 181 return; 182 } 183 } 184 break; 185 case XmppState::kStreamRestartedPostAuthentication: 186 if (stanza->name() == "stream:features" && 187 stanza->FindFirstChild("bind", false)) { 188 state_ = XmppState::kBindSent; 189 iq_stanza_handler_->SendRequest( 190 "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>", 191 base::Bind(&XmppChannel::OnBindCompleted, 192 task_ptr_factory_.GetWeakPtr()), 193 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); 194 return; 195 } 196 break; 197 default: 198 if (stanza->name() == "message") { 199 HandleMessageStanza(std::move(stanza)); 200 return; 201 } else if (stanza->name() == "iq") { 202 if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) { 203 LOG(ERROR) << "Failed to handle IQ stanza"; 204 CloseStream(); 205 } 206 return; 207 } 208 LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString(); 209 return; 210 } 211 // Something bad happened. Close the stream and start over. 212 LOG(ERROR) << "Error condition occurred handling stanza: " 213 << stanza->ToString() << " in state: " << static_cast<int>(state_); 214 CloseStream(); 215} 216 217void XmppChannel::CloseStream() { 218 SendMessage("</stream:stream>"); 219} 220 221void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) { 222 if (reply->GetAttributeOrEmpty("type") != "result") { 223 CloseStream(); 224 return; 225 } 226 const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false); 227 if (!jid_node) { 228 LOG(ERROR) << "XMPP Bind response is missing JID"; 229 CloseStream(); 230 return; 231 } 232 233 jid_ = jid_node->text(); 234 state_ = XmppState::kSessionStarted; 235 iq_stanza_handler_->SendRequest( 236 "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>", 237 base::Bind(&XmppChannel::OnSessionEstablished, 238 task_ptr_factory_.GetWeakPtr()), 239 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); 240} 241 242void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) { 243 if (reply->GetAttributeOrEmpty("type") != "result") { 244 CloseStream(); 245 return; 246 } 247 state_ = XmppState::kSubscribeStarted; 248 std::string body = 249 "<subscribe xmlns='google:push'>" 250 "<item channel='cloud_devices' from=''/></subscribe>"; 251 iq_stanza_handler_->SendRequest( 252 "set", "", account_, body, 253 base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()), 254 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); 255} 256 257void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) { 258 if (reply->GetAttributeOrEmpty("type") != "result") { 259 CloseStream(); 260 return; 261 } 262 state_ = XmppState::kSubscribed; 263 if (delegate_) 264 delegate_->OnConnected(GetName()); 265} 266 267void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) { 268 const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true); 269 if (!node) { 270 LOG(WARNING) << "XMPP message stanza is missing <push:data> element"; 271 return; 272 } 273 std::string data = node->text(); 274 std::string json_data; 275 if (!Base64Decode(data, &json_data)) { 276 LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data; 277 return; 278 } 279 280 VLOG(2) << "XMPP push notification data: " << json_data; 281 auto json_dict = LoadJsonDict(json_data, nullptr); 282 if (json_dict && delegate_) 283 ParseNotificationJson(*json_dict, delegate_, GetName()); 284} 285 286void XmppChannel::CreateSslSocket() { 287 CHECK(!stream_); 288 state_ = XmppState::kConnecting; 289 LOG(INFO) << "Starting XMPP connection to: " << xmpp_endpoint_; 290 291 std::pair<std::string, std::string> host_port = 292 SplitAtFirst(xmpp_endpoint_, ":", true); 293 CHECK(!host_port.first.empty()); 294 CHECK(!host_port.second.empty()); 295 uint32_t port = 0; 296 CHECK(base::StringToUint(host_port.second, &port)) << xmpp_endpoint_; 297 298 network_->OpenSslSocket(host_port.first, port, 299 base::Bind(&XmppChannel::OnSslSocketReady, 300 task_ptr_factory_.GetWeakPtr())); 301} 302 303void XmppChannel::OnSslSocketReady(std::unique_ptr<Stream> stream, 304 ErrorPtr error) { 305 if (error) { 306 LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection"; 307 backoff_entry_.InformOfRequest(false); 308 309 LOG(INFO) << "Delaying connection to XMPP server for " 310 << backoff_entry_.GetTimeUntilRelease(); 311 return task_runner_->PostDelayedTask( 312 FROM_HERE, base::Bind(&XmppChannel::CreateSslSocket, 313 task_ptr_factory_.GetWeakPtr()), 314 backoff_entry_.GetTimeUntilRelease()); 315 } 316 CHECK(XmppState::kConnecting == state_); 317 backoff_entry_.InformOfRequest(true); 318 stream_ = std::move(stream); 319 state_ = XmppState::kConnected; 320 RestartXmppStream(); 321 ScheduleRegularPing(); 322} 323 324void XmppChannel::SendMessage(const std::string& message) { 325 CHECK(stream_) << "No XMPP socket stream available"; 326 if (write_pending_) { 327 queued_write_data_ += message; 328 return; 329 } 330 write_socket_data_ = queued_write_data_ + message; 331 queued_write_data_.clear(); 332 VLOG(2) << "Sending XMPP message: " << message; 333 334 write_pending_ = true; 335 stream_->Write( 336 write_socket_data_.data(), write_socket_data_.size(), 337 base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr())); 338} 339 340void XmppChannel::OnMessageSent(ErrorPtr error) { 341 write_pending_ = false; 342 if (error) 343 return Restart(); 344 if (queued_write_data_.empty()) { 345 WaitForMessage(); 346 } else { 347 SendMessage(std::string{}); 348 } 349} 350 351void XmppChannel::WaitForMessage() { 352 if (read_pending_ || !stream_) 353 return; 354 355 read_pending_ = true; 356 stream_->Read( 357 read_socket_data_.data(), read_socket_data_.size(), 358 base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr())); 359} 360 361std::string XmppChannel::GetName() const { 362 return "xmpp"; 363} 364 365bool XmppChannel::IsConnected() const { 366 return state_ == XmppState::kSubscribed; 367} 368 369void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) { 370 // No extra parameters needed for XMPP. 371} 372 373void XmppChannel::Restart() { 374 LOG(INFO) << "Restarting XMPP"; 375 Stop(); 376 Start(delegate_); 377} 378 379void XmppChannel::Start(NotificationDelegate* delegate) { 380 CHECK(state_ == XmppState::kNotStarted); 381 delegate_ = delegate; 382 383 CreateSslSocket(); 384} 385 386void XmppChannel::Stop() { 387 if (IsConnected() && delegate_) 388 delegate_->OnDisconnected(); 389 390 task_ptr_factory_.InvalidateWeakPtrs(); 391 ping_ptr_factory_.InvalidateWeakPtrs(); 392 393 stream_.reset(); 394 state_ = XmppState::kNotStarted; 395} 396 397void XmppChannel::RestartXmppStream() { 398 stream_parser_.Reset(); 399 stream_->CancelPendingOperations(); 400 read_pending_ = false; 401 write_pending_ = false; 402 SendMessage(BuildXmppStartStreamCommand()); 403} 404 405void XmppChannel::SchedulePing(base::TimeDelta interval, 406 base::TimeDelta timeout) { 407 VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout; 408 ping_ptr_factory_.InvalidateWeakPtrs(); 409 task_runner_->PostDelayedTask( 410 FROM_HERE, base::Bind(&XmppChannel::PingServer, 411 ping_ptr_factory_.GetWeakPtr(), timeout), 412 interval); 413} 414 415void XmppChannel::ScheduleRegularPing() { 416 SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds), 417 base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds)); 418} 419 420void XmppChannel::ScheduleFastPing() { 421 SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds), 422 base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds)); 423} 424 425void XmppChannel::PingServer(base::TimeDelta timeout) { 426 VLOG(1) << "Sending XMPP ping"; 427 if (!IsConnected()) { 428 LOG(WARNING) << "XMPP channel is not connected"; 429 Restart(); 430 return; 431 } 432 433 // Send an XMPP Ping request as defined in XEP-0199 extension: 434 // http://xmpp.org/extensions/xep-0199.html 435 iq_stanza_handler_->SendRequestWithCustomTimeout( 436 "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout, 437 base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(), 438 base::Time::Now()), 439 base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(), 440 base::Time::Now())); 441} 442 443void XmppChannel::OnPingResponse(base::Time sent_time, 444 std::unique_ptr<XmlNode> reply) { 445 VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time); 446 // Ping response received from server. Everything seems to be in order. 447 // Reschedule with default intervals. 448 ScheduleRegularPing(); 449} 450 451void XmppChannel::OnPingTimeout(base::Time sent_time) { 452 LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after " 453 << (base::Time::Now() - sent_time); 454 Restart(); 455} 456 457void XmppChannel::OnConnectivityChanged() { 458 if (state_ == XmppState::kNotStarted) 459 return; 460 461 if (state_ == XmppState::kConnecting && 462 backoff_entry_.GetTimeUntilRelease() < 463 base::TimeDelta::FromSeconds( 464 kConnectingTimeoutAfterNetChangeSeconds)) { 465 VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease(); 466 return; 467 } 468 469 ScheduleFastPing(); 470} 471 472} // namespace weave 473