xmpp_channel.cc revision 5a1f600e9d7d26c36b3e22ff0dc0ae9e3b2425fc
1// Copyright 2015 The Weave Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "src/notification/xmpp_channel.h" 6 7#include <string> 8 9#include <base/bind.h> 10#include <weave/provider/network.h> 11#include <weave/provider/task_runner.h> 12 13#include "src/backoff_entry.h" 14#include "src/data_encoding.h" 15#include "src/notification/notification_delegate.h" 16#include "src/notification/notification_parser.h" 17#include "src/notification/xml_node.h" 18#include "src/privet/openssl_utils.h" 19#include "src/utils.h" 20 21namespace weave { 22 23namespace { 24 25std::string BuildXmppStartStreamCommand() { 26 return "<stream:stream to='clouddevices.gserviceaccount.com' " 27 "xmlns:stream='http://etherx.jabber.org/streams' " 28 "xml:lang='*' version='1.0' xmlns='jabber:client'>"; 29} 30 31std::string BuildXmppAuthenticateCommand(const std::string& account, 32 const std::string& token) { 33 std::vector<uint8_t> credentials; 34 credentials.push_back(0); 35 credentials.insert(credentials.end(), account.begin(), account.end()); 36 credentials.push_back(0); 37 credentials.insert(credentials.end(), token.begin(), token.end()); 38 std::string msg = 39 "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' " 40 "mechanism='X-OAUTH2' auth:service='oauth2' " 41 "auth:allow-non-google-login='true' " 42 "auth:client-uses-full-bind-result='true' " 43 "xmlns:auth='http://www.google.com/talk/protocol/auth'>" + 44 Base64Encode(credentials) + "</auth>"; 45 return msg; 46} 47 48// Backoff policy. 49// Note: In order to ensure a minimum of 20 seconds between server errors, 50// we have a 30s +- 10s (33%) jitter initial backoff. 51const BackoffEntry::Policy kDefaultBackoffPolicy = { 52 // Number of initial errors (in sequence) to ignore before applying 53 // exponential back-off rules. 54 0, 55 56 // Initial delay for exponential back-off in ms. 57 30 * 1000, // 30 seconds. 58 59 // Factor by which the waiting time will be multiplied. 60 2, 61 62 // Fuzzing percentage. ex: 10% will spread requests randomly 63 // between 90%-100% of the calculated time. 64 0.33, // 33%. 65 66 // Maximum amount of time we are willing to delay our request in ms. 67 10 * 60 * 1000, // 10 minutes. 68 69 // Time to keep an entry from being discarded even when it 70 // has no significant state, -1 to never discard. 71 -1, 72 73 // Don't use initial delay unless the last request was an error. 74 false, 75}; 76 77const char kDefaultXmppHost[] = "talk.google.com"; 78const uint16_t kDefaultXmppPort = 5223; 79 80// Used for keeping connection alive. 81const int kRegularPingIntervalSeconds = 60; 82const int kRegularPingTimeoutSeconds = 30; 83 84// Used for diagnostic when connectivity changed. 85const int kAgressivePingIntervalSeconds = 5; 86const int kAgressivePingTimeoutSeconds = 10; 87 88const int kConnectingTimeoutAfterNetChangeSeconds = 30; 89 90} // namespace 91 92XmppChannel::XmppChannel(const std::string& account, 93 const std::string& access_token, 94 provider::TaskRunner* task_runner, 95 provider::Network* network) 96 : account_{account}, 97 access_token_{access_token}, 98 network_{network}, 99 backoff_entry_{&kDefaultBackoffPolicy}, 100 task_runner_{task_runner}, 101 iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} { 102 read_socket_data_.resize(4096); 103 if (network) { 104 network->AddConnectionChangedCallback(base::Bind( 105 &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr())); 106 } 107} 108 109void XmppChannel::OnMessageRead(size_t size, ErrorPtr error) { 110 read_pending_ = false; 111 if (error) 112 return Restart(); 113 std::string msg(read_socket_data_.data(), size); 114 VLOG(2) << "Received XMPP packet: '" << msg << "'"; 115 116 if (!size) 117 return Restart(); 118 119 stream_parser_.ParseData(msg); 120 WaitForMessage(); 121} 122 123void XmppChannel::OnStreamStart(const std::string& node_name, 124 std::map<std::string, std::string> attributes) { 125 VLOG(2) << "XMPP stream start: " << node_name; 126} 127 128void XmppChannel::OnStreamEnd(const std::string& node_name) { 129 VLOG(2) << "XMPP stream ended: " << node_name; 130 Stop(); 131 if (IsConnected()) { 132 // If we had a fully-established connection, restart it now. 133 // However, if the connection has never been established yet (e.g. 134 // authorization failed), do not restart right now. Wait till we get 135 // new credentials. 136 task_runner_->PostDelayedTask( 137 FROM_HERE, 138 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()), {}); 139 } else if (delegate_) { 140 delegate_->OnPermanentFailure(); 141 } 142} 143 144void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) { 145 // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback 146 // from expat XML parser and some stanza could cause the XMPP stream to be 147 // reset and the parser to be re-initialized. We don't want to destroy the 148 // parser while it is performing a callback invocation. 149 task_runner_->PostDelayedTask( 150 FROM_HERE, 151 base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(), 152 base::Passed(std::move(stanza))), 153 {}); 154} 155 156void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) { 157 VLOG(2) << "XMPP stanza received: " << stanza->ToString(); 158 159 switch (state_) { 160 case XmppState::kConnected: 161 if (stanza->name() == "stream:features") { 162 auto children = stanza->FindChildren("mechanisms/mechanism", false); 163 for (const auto& child : children) { 164 if (child->text() == "X-OAUTH2") { 165 state_ = XmppState::kAuthenticationStarted; 166 SendMessage(BuildXmppAuthenticateCommand(account_, access_token_)); 167 return; 168 } 169 } 170 } 171 break; 172 case XmppState::kAuthenticationStarted: 173 if (stanza->name() == "success") { 174 state_ = XmppState::kStreamRestartedPostAuthentication; 175 RestartXmppStream(); 176 return; 177 } else if (stanza->name() == "failure") { 178 if (stanza->FindFirstChild("not-authorized", false)) { 179 state_ = XmppState::kAuthenticationFailed; 180 return; 181 } 182 } 183 break; 184 case XmppState::kStreamRestartedPostAuthentication: 185 if (stanza->name() == "stream:features" && 186 stanza->FindFirstChild("bind", false)) { 187 state_ = XmppState::kBindSent; 188 iq_stanza_handler_->SendRequest( 189 "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>", 190 base::Bind(&XmppChannel::OnBindCompleted, 191 task_ptr_factory_.GetWeakPtr()), 192 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); 193 return; 194 } 195 break; 196 default: 197 if (stanza->name() == "message") { 198 HandleMessageStanza(std::move(stanza)); 199 return; 200 } else if (stanza->name() == "iq") { 201 if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) { 202 LOG(ERROR) << "Failed to handle IQ stanza"; 203 CloseStream(); 204 } 205 return; 206 } 207 LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString(); 208 return; 209 } 210 // Something bad happened. Close the stream and start over. 211 LOG(ERROR) << "Error condition occurred handling stanza: " 212 << stanza->ToString() << " in state: " << static_cast<int>(state_); 213 CloseStream(); 214} 215 216void XmppChannel::CloseStream() { 217 SendMessage("</stream:stream>"); 218} 219 220void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) { 221 if (reply->GetAttributeOrEmpty("type") != "result") { 222 CloseStream(); 223 return; 224 } 225 const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false); 226 if (!jid_node) { 227 LOG(ERROR) << "XMPP Bind response is missing JID"; 228 CloseStream(); 229 return; 230 } 231 232 jid_ = jid_node->text(); 233 state_ = XmppState::kSessionStarted; 234 iq_stanza_handler_->SendRequest( 235 "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>", 236 base::Bind(&XmppChannel::OnSessionEstablished, 237 task_ptr_factory_.GetWeakPtr()), 238 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); 239} 240 241void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) { 242 if (reply->GetAttributeOrEmpty("type") != "result") { 243 CloseStream(); 244 return; 245 } 246 state_ = XmppState::kSubscribeStarted; 247 std::string body = 248 "<subscribe xmlns='google:push'>" 249 "<item channel='cloud_devices' from=''/></subscribe>"; 250 iq_stanza_handler_->SendRequest( 251 "set", "", account_, body, 252 base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()), 253 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); 254} 255 256void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) { 257 if (reply->GetAttributeOrEmpty("type") != "result") { 258 CloseStream(); 259 return; 260 } 261 state_ = XmppState::kSubscribed; 262 if (delegate_) 263 delegate_->OnConnected(GetName()); 264} 265 266void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) { 267 const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true); 268 if (!node) { 269 LOG(WARNING) << "XMPP message stanza is missing <push:data> element"; 270 return; 271 } 272 std::string data = node->text(); 273 std::string json_data; 274 if (!Base64Decode(data, &json_data)) { 275 LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data; 276 return; 277 } 278 279 VLOG(2) << "XMPP push notification data: " << json_data; 280 auto json_dict = LoadJsonDict(json_data, nullptr); 281 if (json_dict && delegate_) 282 ParseNotificationJson(*json_dict, delegate_, GetName()); 283} 284 285void XmppChannel::CreateSslSocket() { 286 CHECK(!stream_); 287 state_ = XmppState::kConnecting; 288 LOG(INFO) << "Starting XMPP connection to " << kDefaultXmppHost << ":" 289 << kDefaultXmppPort; 290 291 network_->OpenSslSocket(kDefaultXmppHost, kDefaultXmppPort, 292 base::Bind(&XmppChannel::OnSslSocketReady, 293 task_ptr_factory_.GetWeakPtr())); 294} 295 296void XmppChannel::OnSslSocketReady(std::unique_ptr<Stream> stream, 297 ErrorPtr error) { 298 if (error) { 299 LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection"; 300 backoff_entry_.InformOfRequest(false); 301 302 LOG(INFO) << "Delaying connection to XMPP server for " 303 << backoff_entry_.GetTimeUntilRelease(); 304 return task_runner_->PostDelayedTask( 305 FROM_HERE, base::Bind(&XmppChannel::CreateSslSocket, 306 task_ptr_factory_.GetWeakPtr()), 307 backoff_entry_.GetTimeUntilRelease()); 308 } 309 CHECK(XmppState::kConnecting == state_); 310 backoff_entry_.InformOfRequest(true); 311 stream_ = std::move(stream); 312 state_ = XmppState::kConnected; 313 RestartXmppStream(); 314 ScheduleRegularPing(); 315} 316 317void XmppChannel::SendMessage(const std::string& message) { 318 CHECK(stream_) << "No XMPP socket stream available"; 319 if (write_pending_) { 320 queued_write_data_ += message; 321 return; 322 } 323 write_socket_data_ = queued_write_data_ + message; 324 queued_write_data_.clear(); 325 VLOG(2) << "Sending XMPP message: " << message; 326 327 write_pending_ = true; 328 stream_->Write( 329 write_socket_data_.data(), write_socket_data_.size(), 330 base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr())); 331} 332 333void XmppChannel::OnMessageSent(ErrorPtr error) { 334 write_pending_ = false; 335 if (error) 336 return Restart(); 337 if (queued_write_data_.empty()) { 338 WaitForMessage(); 339 } else { 340 SendMessage(std::string{}); 341 } 342} 343 344void XmppChannel::WaitForMessage() { 345 if (read_pending_ || !stream_) 346 return; 347 348 read_pending_ = true; 349 stream_->Read( 350 read_socket_data_.data(), read_socket_data_.size(), 351 base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr())); 352} 353 354std::string XmppChannel::GetName() const { 355 return "xmpp"; 356} 357 358bool XmppChannel::IsConnected() const { 359 return state_ == XmppState::kSubscribed; 360} 361 362void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) { 363 // No extra parameters needed for XMPP. 364} 365 366void XmppChannel::Restart() { 367 LOG(INFO) << "Restarting XMPP"; 368 Stop(); 369 Start(delegate_); 370} 371 372void XmppChannel::Start(NotificationDelegate* delegate) { 373 CHECK(state_ == XmppState::kNotStarted); 374 delegate_ = delegate; 375 376 CreateSslSocket(); 377} 378 379void XmppChannel::Stop() { 380 if (IsConnected() && delegate_) 381 delegate_->OnDisconnected(); 382 383 task_ptr_factory_.InvalidateWeakPtrs(); 384 ping_ptr_factory_.InvalidateWeakPtrs(); 385 386 stream_.reset(); 387 state_ = XmppState::kNotStarted; 388} 389 390void XmppChannel::RestartXmppStream() { 391 stream_parser_.Reset(); 392 stream_->CancelPendingOperations(); 393 read_pending_ = false; 394 write_pending_ = false; 395 SendMessage(BuildXmppStartStreamCommand()); 396} 397 398void XmppChannel::SchedulePing(base::TimeDelta interval, 399 base::TimeDelta timeout) { 400 VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout; 401 ping_ptr_factory_.InvalidateWeakPtrs(); 402 task_runner_->PostDelayedTask( 403 FROM_HERE, base::Bind(&XmppChannel::PingServer, 404 ping_ptr_factory_.GetWeakPtr(), timeout), 405 interval); 406} 407 408void XmppChannel::ScheduleRegularPing() { 409 SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds), 410 base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds)); 411} 412 413void XmppChannel::ScheduleFastPing() { 414 SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds), 415 base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds)); 416} 417 418void XmppChannel::PingServer(base::TimeDelta timeout) { 419 VLOG(1) << "Sending XMPP ping"; 420 if (!IsConnected()) { 421 LOG(WARNING) << "XMPP channel is not connected"; 422 Restart(); 423 return; 424 } 425 426 // Send an XMPP Ping request as defined in XEP-0199 extension: 427 // http://xmpp.org/extensions/xep-0199.html 428 iq_stanza_handler_->SendRequestWithCustomTimeout( 429 "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout, 430 base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(), 431 base::Time::Now()), 432 base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(), 433 base::Time::Now())); 434} 435 436void XmppChannel::OnPingResponse(base::Time sent_time, 437 std::unique_ptr<XmlNode> reply) { 438 VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time); 439 // Ping response received from server. Everything seems to be in order. 440 // Reschedule with default intervals. 441 ScheduleRegularPing(); 442} 443 444void XmppChannel::OnPingTimeout(base::Time sent_time) { 445 LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after " 446 << (base::Time::Now() - sent_time); 447 Restart(); 448} 449 450void XmppChannel::OnConnectivityChanged() { 451 if (state_ == XmppState::kNotStarted) 452 return; 453 454 if (state_ == XmppState::kConnecting && 455 backoff_entry_.GetTimeUntilRelease() < 456 base::TimeDelta::FromSeconds( 457 kConnectingTimeoutAfterNetChangeSeconds)) { 458 VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease(); 459 return; 460 } 461 462 ScheduleFastPing(); 463} 464 465} // namespace weave 466