quic_client_session.cc revision 1e9bf3e0803691d0a228da41fc608347b6db4340
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/quic_client_session.h" 6 7#include "base/callback_helpers.h" 8#include "base/message_loop/message_loop.h" 9#include "base/metrics/histogram.h" 10#include "base/metrics/sparse_histogram.h" 11#include "base/stl_util.h" 12#include "base/strings/string_number_conversions.h" 13#include "base/values.h" 14#include "net/base/io_buffer.h" 15#include "net/base/net_errors.h" 16#include "net/quic/quic_connection_helper.h" 17#include "net/quic/quic_crypto_client_stream_factory.h" 18#include "net/quic/quic_default_packet_writer.h" 19#include "net/quic/quic_stream_factory.h" 20#include "net/ssl/ssl_info.h" 21#include "net/udp/datagram_client_socket.h" 22 23namespace net { 24 25namespace { 26 27// Note: these values must be kept in sync with the corresponding values in: 28// tools/metrics/histograms/histograms.xml 29enum HandshakeState { 30 STATE_STARTED = 0, 31 STATE_ENCRYPTION_ESTABLISHED = 1, 32 STATE_HANDSHAKE_CONFIRMED = 2, 33 STATE_FAILED = 3, 34 NUM_HANDSHAKE_STATES = 4 35}; 36 37void RecordHandshakeState(HandshakeState state) { 38 UMA_HISTOGRAM_ENUMERATION("Net.QuicHandshakeState", state, 39 NUM_HANDSHAKE_STATES); 40} 41 42} // namespace 43 44QuicClientSession::StreamRequest::StreamRequest() : stream_(NULL) {} 45 46QuicClientSession::StreamRequest::~StreamRequest() { 47 CancelRequest(); 48} 49 50int QuicClientSession::StreamRequest::StartRequest( 51 const base::WeakPtr<QuicClientSession>& session, 52 QuicReliableClientStream** stream, 53 const CompletionCallback& callback) { 54 session_ = session; 55 stream_ = stream; 56 int rv = session_->TryCreateStream(this, stream_); 57 if (rv == ERR_IO_PENDING) { 58 callback_ = callback; 59 } 60 61 return rv; 62} 63 64void QuicClientSession::StreamRequest::CancelRequest() { 65 if (session_) 66 session_->CancelRequest(this); 67 session_.reset(); 68 callback_.Reset(); 69} 70 71void QuicClientSession::StreamRequest::OnRequestCompleteSuccess( 72 QuicReliableClientStream* stream) { 73 session_.reset(); 74 *stream_ = stream; 75 ResetAndReturn(&callback_).Run(OK); 76} 77 78void QuicClientSession::StreamRequest::OnRequestCompleteFailure(int rv) { 79 session_.reset(); 80 ResetAndReturn(&callback_).Run(rv); 81} 82 83QuicClientSession::QuicClientSession( 84 QuicConnection* connection, 85 scoped_ptr<DatagramClientSocket> socket, 86 scoped_ptr<QuicDefaultPacketWriter> writer, 87 QuicStreamFactory* stream_factory, 88 QuicCryptoClientStreamFactory* crypto_client_stream_factory, 89 const string& server_hostname, 90 const QuicConfig& config, 91 QuicCryptoClientConfig* crypto_config, 92 NetLog* net_log) 93 : QuicSession(connection, config, false), 94 require_confirmation_(false), 95 stream_factory_(stream_factory), 96 socket_(socket.Pass()), 97 writer_(writer.Pass()), 98 read_buffer_(new IOBufferWithSize(kMaxPacketSize)), 99 read_pending_(false), 100 num_total_streams_(0), 101 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_QUIC_SESSION)), 102 logger_(net_log_), 103 num_packets_read_(0), 104 weak_factory_(this) { 105 crypto_stream_.reset( 106 crypto_client_stream_factory ? 107 crypto_client_stream_factory->CreateQuicCryptoClientStream( 108 server_hostname, this, crypto_config) : 109 new QuicCryptoClientStream(server_hostname, this, crypto_config)); 110 111 connection->set_debug_visitor(&logger_); 112 // TODO(rch): pass in full host port proxy pair 113 net_log_.BeginEvent( 114 NetLog::TYPE_QUIC_SESSION, 115 NetLog::StringCallback("host", &server_hostname)); 116} 117 118QuicClientSession::~QuicClientSession() { 119 // The session must be closed before it is destroyed. 120 DCHECK(streams()->empty()); 121 CloseAllStreams(ERR_UNEXPECTED); 122 DCHECK(observers_.empty()); 123 CloseAllObservers(ERR_UNEXPECTED); 124 125 connection()->set_debug_visitor(NULL); 126 net_log_.EndEvent(NetLog::TYPE_QUIC_SESSION); 127 128 while (!stream_requests_.empty()) { 129 StreamRequest* request = stream_requests_.front(); 130 stream_requests_.pop_front(); 131 request->OnRequestCompleteFailure(ERR_ABORTED); 132 } 133 134 if (IsEncryptionEstablished()) 135 RecordHandshakeState(STATE_ENCRYPTION_ESTABLISHED); 136 if (IsCryptoHandshakeConfirmed()) 137 RecordHandshakeState(STATE_HANDSHAKE_CONFIRMED); 138 else 139 RecordHandshakeState(STATE_FAILED); 140 141 UMA_HISTOGRAM_COUNTS("Net.QuicNumSentClientHellos", 142 crypto_stream_->num_sent_client_hellos()); 143 if (IsCryptoHandshakeConfirmed()) { 144 UMA_HISTOGRAM_COUNTS("Net.QuicNumSentClientHellosCryptoHandshakeConfirmed", 145 crypto_stream_->num_sent_client_hellos()); 146 } 147 148 UMA_HISTOGRAM_COUNTS("Net.QuicSession.NumTotalStreams", num_total_streams_); 149} 150 151bool QuicClientSession::OnStreamFrames( 152 const std::vector<QuicStreamFrame>& frames) { 153 // Record total number of stream frames. 154 UMA_HISTOGRAM_COUNTS("Net.QuicNumStreamFramesInPacket", frames.size()); 155 156 // Record number of frames per stream in packet. 157 typedef std::map<QuicStreamId, size_t> FrameCounter; 158 FrameCounter frames_per_stream; 159 for (size_t i = 0; i < frames.size(); ++i) { 160 frames_per_stream[frames[i].stream_id]++; 161 } 162 for (FrameCounter::const_iterator it = frames_per_stream.begin(); 163 it != frames_per_stream.end(); ++it) { 164 UMA_HISTOGRAM_COUNTS("Net.QuicNumStreamFramesPerStreamInPacket", 165 it->second); 166 } 167 168 return QuicSession::OnStreamFrames(frames); 169} 170 171void QuicClientSession::AddObserver(Observer* observer) { 172 DCHECK(!ContainsKey(observers_, observer)); 173 observers_.insert(observer); 174} 175 176void QuicClientSession::RemoveObserver(Observer* observer) { 177 DCHECK(ContainsKey(observers_, observer)); 178 observers_.erase(observer); 179} 180 181int QuicClientSession::TryCreateStream(StreamRequest* request, 182 QuicReliableClientStream** stream) { 183 if (!crypto_stream_->encryption_established()) { 184 DLOG(DFATAL) << "Encryption not established."; 185 return ERR_CONNECTION_CLOSED; 186 } 187 188 if (goaway_received()) { 189 DLOG(INFO) << "Going away."; 190 return ERR_CONNECTION_CLOSED; 191 } 192 193 if (!connection()->connected()) { 194 DLOG(INFO) << "Already closed."; 195 return ERR_CONNECTION_CLOSED; 196 } 197 198 if (GetNumOpenStreams() < get_max_open_streams()) { 199 *stream = CreateOutgoingReliableStreamImpl(); 200 return OK; 201 } 202 203 stream_requests_.push_back(request); 204 return ERR_IO_PENDING; 205} 206 207void QuicClientSession::CancelRequest(StreamRequest* request) { 208 // Remove |request| from the queue while preserving the order of the 209 // other elements. 210 StreamRequestQueue::iterator it = 211 std::find(stream_requests_.begin(), stream_requests_.end(), request); 212 if (it != stream_requests_.end()) { 213 it = stream_requests_.erase(it); 214 } 215} 216 217QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() { 218 if (!crypto_stream_->encryption_established()) { 219 DLOG(INFO) << "Encryption not active so no outgoing stream created."; 220 return NULL; 221 } 222 if (GetNumOpenStreams() >= get_max_open_streams()) { 223 DLOG(INFO) << "Failed to create a new outgoing stream. " 224 << "Already " << GetNumOpenStreams() << " open."; 225 return NULL; 226 } 227 if (goaway_received()) { 228 DLOG(INFO) << "Failed to create a new outgoing stream. " 229 << "Already received goaway."; 230 return NULL; 231 } 232 233 return CreateOutgoingReliableStreamImpl(); 234} 235 236QuicReliableClientStream* 237QuicClientSession::CreateOutgoingReliableStreamImpl() { 238 DCHECK(connection()->connected()); 239 QuicReliableClientStream* stream = 240 new QuicReliableClientStream(GetNextStreamId(), this, net_log_); 241 ActivateStream(stream); 242 ++num_total_streams_; 243 UMA_HISTOGRAM_COUNTS("Net.QuicSession.NumOpenStreams", GetNumOpenStreams()); 244 return stream; 245} 246 247QuicCryptoClientStream* QuicClientSession::GetCryptoStream() { 248 return crypto_stream_.get(); 249}; 250 251bool QuicClientSession::GetSSLInfo(SSLInfo* ssl_info) { 252 DCHECK(crypto_stream_.get()); 253 return crypto_stream_->GetSSLInfo(ssl_info); 254} 255 256int QuicClientSession::CryptoConnect(bool require_confirmation, 257 const CompletionCallback& callback) { 258 require_confirmation_ = require_confirmation; 259 RecordHandshakeState(STATE_STARTED); 260 if (!crypto_stream_->CryptoConnect()) { 261 // TODO(wtc): change crypto_stream_.CryptoConnect() to return a 262 // QuicErrorCode and map it to a net error code. 263 return ERR_CONNECTION_FAILED; 264 } 265 266 bool can_notify = require_confirmation_ ? 267 IsCryptoHandshakeConfirmed() : IsEncryptionEstablished(); 268 if (can_notify) { 269 return OK; 270 } 271 272 callback_ = callback; 273 return ERR_IO_PENDING; 274} 275 276int QuicClientSession::GetNumSentClientHellos() const { 277 return crypto_stream_->num_sent_client_hellos(); 278} 279 280ReliableQuicStream* QuicClientSession::CreateIncomingReliableStream( 281 QuicStreamId id) { 282 DLOG(ERROR) << "Server push not supported"; 283 return NULL; 284} 285 286void QuicClientSession::CloseStream(QuicStreamId stream_id) { 287 QuicSession::CloseStream(stream_id); 288 OnClosedStream(); 289} 290 291void QuicClientSession::SendRstStream(QuicStreamId id, 292 QuicRstStreamErrorCode error) { 293 QuicSession::SendRstStream(id, error); 294 OnClosedStream(); 295} 296 297void QuicClientSession::OnClosedStream() { 298 if (GetNumOpenStreams() < get_max_open_streams() && 299 !stream_requests_.empty() && 300 crypto_stream_->encryption_established() && 301 !goaway_received() && 302 connection()->connected()) { 303 StreamRequest* request = stream_requests_.front(); 304 stream_requests_.pop_front(); 305 request->OnRequestCompleteSuccess(CreateOutgoingReliableStreamImpl()); 306 } 307 308 if (GetNumOpenStreams() == 0) { 309 stream_factory_->OnIdleSession(this); 310 } 311} 312 313void QuicClientSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { 314 if (!callback_.is_null() && 315 (!require_confirmation_ || event == HANDSHAKE_CONFIRMED)) { 316 // TODO(rtenneti): Currently for all CryptoHandshakeEvent events, callback_ 317 // could be called because there are no error events in CryptoHandshakeEvent 318 // enum. If error events are added to CryptoHandshakeEvent, then the 319 // following code needs to changed. 320 base::ResetAndReturn(&callback_).Run(OK); 321 } 322 if (event == HANDSHAKE_CONFIRMED) { 323 ObserverSet::iterator it = observers_.begin(); 324 while (it != observers_.end()) { 325 Observer* observer = *it; 326 ++it; 327 observer->OnCryptoHandshakeConfirmed(); 328 } 329 } 330 QuicSession::OnCryptoHandshakeEvent(event); 331} 332 333void QuicClientSession::OnCryptoHandshakeMessageSent( 334 const CryptoHandshakeMessage& message) { 335 logger_.OnCryptoHandshakeMessageSent(message); 336} 337 338void QuicClientSession::OnCryptoHandshakeMessageReceived( 339 const CryptoHandshakeMessage& message) { 340 logger_.OnCryptoHandshakeMessageReceived(message); 341} 342 343void QuicClientSession::OnConnectionClosed(QuicErrorCode error, 344 bool from_peer) { 345 DCHECK(!connection()->connected()); 346 logger_.OnConnectionClosed(error, from_peer); 347 if (from_peer) { 348 UMA_HISTOGRAM_SPARSE_SLOWLY( 349 "Net.QuicSession.ConnectionCloseErrorCodeServer", error); 350 } else { 351 UMA_HISTOGRAM_SPARSE_SLOWLY( 352 "Net.QuicSession.ConnectionCloseErrorCodeClient", error); 353 } 354 355 if (error == QUIC_CONNECTION_TIMED_OUT) { 356 UMA_HISTOGRAM_COUNTS( 357 "Net.QuicSession.ConnectionClose.NumOpenStreams.TimedOut", 358 GetNumOpenStreams()); 359 if (!IsCryptoHandshakeConfirmed()) { 360 // If there have been any streams created, they were 0-RTT speculative 361 // requests that have not be serviced. 362 UMA_HISTOGRAM_COUNTS( 363 "Net.QuicSession.ConnectionClose.NumTotalStreams.HandshakeTimedOut", 364 num_total_streams_); 365 } 366 } 367 368 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.QuicVersion", 369 connection()->version()); 370 NotifyFactoryOfSessionGoingAway(); 371 if (!callback_.is_null()) { 372 base::ResetAndReturn(&callback_).Run(ERR_QUIC_PROTOCOL_ERROR); 373 } 374 socket_->Close(); 375 QuicSession::OnConnectionClosed(error, from_peer); 376 NotifyFactoryOfSessionClosedLater(); 377} 378 379void QuicClientSession::OnSuccessfulVersionNegotiation( 380 const QuicVersion& version) { 381 logger_.OnSuccessfulVersionNegotiation(version); 382 QuicSession::OnSuccessfulVersionNegotiation(version); 383} 384 385void QuicClientSession::StartReading() { 386 if (read_pending_) { 387 return; 388 } 389 read_pending_ = true; 390 int rv = socket_->Read(read_buffer_.get(), 391 read_buffer_->size(), 392 base::Bind(&QuicClientSession::OnReadComplete, 393 weak_factory_.GetWeakPtr())); 394 if (rv == ERR_IO_PENDING) { 395 num_packets_read_ = 0; 396 return; 397 } 398 399 if (++num_packets_read_ > 32) { 400 num_packets_read_ = 0; 401 // Data was read, process it. 402 // Schedule the work through the message loop to avoid recursive 403 // callbacks. 404 base::MessageLoop::current()->PostTask( 405 FROM_HERE, 406 base::Bind(&QuicClientSession::OnReadComplete, 407 weak_factory_.GetWeakPtr(), rv)); 408 } else { 409 OnReadComplete(rv); 410 } 411} 412 413void QuicClientSession::CloseSessionOnError(int error) { 414 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.CloseSessionOnError", -error); 415 CloseSessionOnErrorInner(error, QUIC_INTERNAL_ERROR); 416 NotifyFactoryOfSessionClosed(); 417} 418 419void QuicClientSession::CloseSessionOnErrorInner(int net_error, 420 QuicErrorCode quic_error) { 421 if (!callback_.is_null()) { 422 base::ResetAndReturn(&callback_).Run(net_error); 423 } 424 CloseAllStreams(net_error); 425 CloseAllObservers(net_error); 426 net_log_.AddEvent( 427 NetLog::TYPE_QUIC_SESSION_CLOSE_ON_ERROR, 428 NetLog::IntegerCallback("net_error", net_error)); 429 430 connection()->CloseConnection(quic_error, false); 431 DCHECK(!connection()->connected()); 432} 433 434void QuicClientSession::CloseAllStreams(int net_error) { 435 while (!streams()->empty()) { 436 ReliableQuicStream* stream = streams()->begin()->second; 437 QuicStreamId id = stream->id(); 438 static_cast<QuicReliableClientStream*>(stream)->OnError(net_error); 439 CloseStream(id); 440 } 441} 442 443void QuicClientSession::CloseAllObservers(int net_error) { 444 while (!observers_.empty()) { 445 Observer* observer = *observers_.begin(); 446 observers_.erase(observer); 447 observer->OnSessionClosed(net_error); 448 } 449} 450 451base::Value* QuicClientSession::GetInfoAsValue(const HostPortPair& pair) const { 452 base::DictionaryValue* dict = new base::DictionaryValue(); 453 dict->SetString("host_port_pair", pair.ToString()); 454 dict->SetString("version", QuicVersionToString(connection()->version())); 455 dict->SetInteger("open_streams", GetNumOpenStreams()); 456 dict->SetInteger("total_streams", num_total_streams_); 457 dict->SetString("peer_address", peer_address().ToString()); 458 dict->SetString("guid", base::Uint64ToString(guid())); 459 dict->SetBoolean("connected", connection()->connected()); 460 return dict; 461} 462 463base::WeakPtr<QuicClientSession> QuicClientSession::GetWeakPtr() { 464 return weak_factory_.GetWeakPtr(); 465} 466 467void QuicClientSession::OnReadComplete(int result) { 468 read_pending_ = false; 469 if (result == 0) 470 result = ERR_CONNECTION_CLOSED; 471 472 if (result < 0) { 473 DLOG(INFO) << "Closing session on read error: " << result; 474 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.ReadError", -result); 475 NotifyFactoryOfSessionGoingAway(); 476 CloseSessionOnErrorInner(result, QUIC_PACKET_READ_ERROR); 477 NotifyFactoryOfSessionClosedLater(); 478 return; 479 } 480 481 scoped_refptr<IOBufferWithSize> buffer(read_buffer_); 482 read_buffer_ = new IOBufferWithSize(kMaxPacketSize); 483 QuicEncryptedPacket packet(buffer->data(), result); 484 IPEndPoint local_address; 485 IPEndPoint peer_address; 486 socket_->GetLocalAddress(&local_address); 487 socket_->GetPeerAddress(&peer_address); 488 // ProcessUdpPacket might result in |this| being deleted, so we 489 // use a weak pointer to be safe. 490 connection()->ProcessUdpPacket(local_address, peer_address, packet); 491 if (!connection()->connected()) { 492 stream_factory_->OnSessionClosed(this); 493 return; 494 } 495 StartReading(); 496} 497 498void QuicClientSession::NotifyFactoryOfSessionGoingAway() { 499 if (stream_factory_) 500 stream_factory_->OnSessionGoingAway(this); 501} 502 503void QuicClientSession::NotifyFactoryOfSessionClosedLater() { 504 DCHECK_EQ(0u, GetNumOpenStreams()); 505 DCHECK(!connection()->connected()); 506 base::MessageLoop::current()->PostTask( 507 FROM_HERE, 508 base::Bind(&QuicClientSession::NotifyFactoryOfSessionClosed, 509 weak_factory_.GetWeakPtr())); 510} 511 512void QuicClientSession::NotifyFactoryOfSessionClosed() { 513 DCHECK_EQ(0u, GetNumOpenStreams()); 514 // Will delete |this|. 515 if (stream_factory_) 516 stream_factory_->OnSessionClosed(this); 517} 518 519} // namespace net 520