quic_session.cc revision d0247b1b59f9c528cb6df88b4f2b9afaf80d181e
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/quic_session.h" 6 7#include "base/stl_util.h" 8#include "net/quic/crypto/proof_verifier.h" 9#include "net/quic/quic_connection.h" 10#include "net/ssl/ssl_info.h" 11 12using base::StringPiece; 13using base::hash_map; 14using base::hash_set; 15using std::make_pair; 16using std::vector; 17 18namespace net { 19 20const size_t kMaxPrematurelyClosedStreamsTracked = 20; 21const size_t kMaxZombieStreams = 20; 22 23#define ENDPOINT (is_server_ ? "Server: " : " Client: ") 24 25// We want to make sure we delete any closed streams in a safe manner. 26// To avoid deleting a stream in mid-operation, we have a simple shim between 27// us and the stream, so we can delete any streams when we return from 28// processing. 29// 30// We could just override the base methods, but this makes it easier to make 31// sure we don't miss any. 32class VisitorShim : public QuicConnectionVisitorInterface { 33 public: 34 explicit VisitorShim(QuicSession* session) : session_(session) {} 35 36 virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE { 37 bool accepted = session_->OnStreamFrames(frames); 38 session_->PostProcessAfterData(); 39 return accepted; 40 } 41 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE { 42 session_->OnRstStream(frame); 43 session_->PostProcessAfterData(); 44 } 45 46 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE { 47 session_->OnGoAway(frame); 48 session_->PostProcessAfterData(); 49 } 50 51 virtual bool OnCanWrite() OVERRIDE { 52 bool rc = session_->OnCanWrite(); 53 session_->PostProcessAfterData(); 54 return rc; 55 } 56 57 virtual void OnSuccessfulVersionNegotiation( 58 const QuicVersion& version) OVERRIDE { 59 session_->OnSuccessfulVersionNegotiation(version); 60 } 61 62 virtual void ConnectionClose(QuicErrorCode error, bool from_peer) OVERRIDE { 63 session_->ConnectionClose(error, from_peer); 64 // The session will go away, so don't bother with cleanup. 65 } 66 67 virtual bool HasPendingHandshake() const OVERRIDE { 68 return session_->HasPendingHandshake(); 69 } 70 71 private: 72 QuicSession* session_; 73}; 74 75QuicSession::QuicSession(QuicConnection* connection, 76 const QuicConfig& config, 77 bool is_server) 78 : connection_(connection), 79 visitor_shim_(new VisitorShim(this)), 80 config_(config), 81 max_open_streams_(config_.max_streams_per_connection()), 82 next_stream_id_(is_server ? 2 : 3), 83 is_server_(is_server), 84 largest_peer_created_stream_id_(0), 85 error_(QUIC_NO_ERROR), 86 goaway_received_(false), 87 goaway_sent_(false), 88 has_pending_handshake_(false) { 89 90 connection_->set_visitor(visitor_shim_.get()); 91 connection_->SetIdleNetworkTimeout(config_.idle_connection_state_lifetime()); 92 if (connection_->connected()) { 93 connection_->SetOverallConnectionTimeout( 94 config_.max_time_before_crypto_handshake()); 95 } 96 // TODO(satyamshekhar): Set congestion control and ICSL also. 97} 98 99QuicSession::~QuicSession() { 100 STLDeleteElements(&closed_streams_); 101 STLDeleteValues(&stream_map_); 102} 103 104bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) { 105 for (size_t i = 0; i < frames.size(); ++i) { 106 // TODO(rch) deal with the error case of stream id 0 107 if (IsClosedStream(frames[i].stream_id)) { 108 // If we get additional frames for a stream where we didn't process 109 // headers, it's highly likely our compression context will end up 110 // permanently out of sync with the peer's, so we give up and close the 111 // connection. 112 if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) { 113 connection()->SendConnectionClose( 114 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 115 return false; 116 } 117 continue; 118 } 119 120 ReliableQuicStream* stream = GetStream(frames[i].stream_id); 121 if (stream == NULL) return false; 122 if (!stream->WillAcceptStreamFrame(frames[i])) return false; 123 124 // TODO(alyssar) check against existing connection address: if changed, make 125 // sure we update the connection. 126 } 127 128 for (size_t i = 0; i < frames.size(); ++i) { 129 QuicStreamId stream_id = frames[i].stream_id; 130 ReliableQuicStream* stream = GetStream(stream_id); 131 if (!stream) { 132 continue; 133 } 134 stream->OnStreamFrame(frames[i]); 135 136 // If the stream had been prematurely closed, and the 137 // headers are now decompressed, then we are finally finished 138 // with this stream. 139 if (ContainsKey(zombie_streams_, stream_id) && 140 stream->headers_decompressed()) { 141 CloseZombieStream(stream_id); 142 } 143 } 144 145 while (!decompression_blocked_streams_.empty()) { 146 QuicHeaderId header_id = decompression_blocked_streams_.begin()->first; 147 if (header_id != decompressor_.current_header_id()) { 148 break; 149 } 150 QuicStreamId stream_id = decompression_blocked_streams_.begin()->second; 151 decompression_blocked_streams_.erase(header_id); 152 ReliableQuicStream* stream = GetStream(stream_id); 153 if (!stream) { 154 connection()->SendConnectionClose( 155 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 156 return false; 157 } 158 stream->OnDecompressorAvailable(); 159 } 160 return true; 161} 162 163void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) { 164 ReliableQuicStream* stream = GetStream(frame.stream_id); 165 if (!stream) { 166 return; // Errors are handled by GetStream. 167 } 168 if (ContainsKey(zombie_streams_, stream->id())) { 169 // If this was a zombie stream then we close it out now. 170 CloseZombieStream(stream->id()); 171 // However, since the headers still have not been decompressed, we want to 172 // mark it a prematurely closed so that if we ever receive frames 173 // for this stream we can close the connection. 174 DCHECK(!stream->headers_decompressed()); 175 AddPrematurelyClosedStream(frame.stream_id); 176 return; 177 } 178 stream->OnStreamReset(frame.error_code); 179} 180 181void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) { 182 DCHECK(frame.last_good_stream_id < next_stream_id_); 183 goaway_received_ = true; 184} 185 186void QuicSession::ConnectionClose(QuicErrorCode error, bool from_peer) { 187 DCHECK(!connection_->connected()); 188 if (error_ == QUIC_NO_ERROR) { 189 error_ = error; 190 } 191 192 while (stream_map_.size() != 0) { 193 ReliableStreamMap::iterator it = stream_map_.begin(); 194 QuicStreamId id = it->first; 195 it->second->ConnectionClose(error, from_peer); 196 // The stream should call CloseStream as part of ConnectionClose. 197 if (stream_map_.find(id) != stream_map_.end()) { 198 LOG(DFATAL) << ENDPOINT << "Stream failed to close under ConnectionClose"; 199 CloseStream(id); 200 } 201 } 202} 203 204bool QuicSession::OnCanWrite() { 205 // We latch this here rather than doing a traditional loop, because streams 206 // may be modifying the list as we loop. 207 int remaining_writes = write_blocked_streams_.NumBlockedStreams(); 208 209 while (!connection_->HasQueuedData() && 210 remaining_writes > 0) { 211 DCHECK(write_blocked_streams_.HasWriteBlockedStreams()); 212 int index = write_blocked_streams_.GetHighestPriorityWriteBlockedList(); 213 if (index == -1) { 214 LOG(DFATAL) << "WriteBlockedStream is missing"; 215 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false); 216 return true; // We have no write blocked streams. 217 } 218 QuicStreamId stream_id = write_blocked_streams_.PopFront(index); 219 if (stream_id == kCryptoStreamId) { 220 has_pending_handshake_ = false; // We just popped it. 221 } 222 ReliableQuicStream* stream = GetStream(stream_id); 223 if (stream != NULL) { 224 // If the stream can't write all bytes, it'll re-add itself to the blocked 225 // list. 226 stream->OnCanWrite(); 227 } 228 --remaining_writes; 229 } 230 231 return !write_blocked_streams_.HasWriteBlockedStreams(); 232} 233 234bool QuicSession::HasPendingHandshake() const { 235 return has_pending_handshake_; 236} 237 238QuicConsumedData QuicSession::WritevData(QuicStreamId id, 239 const struct iovec* iov, 240 int iov_count, 241 QuicStreamOffset offset, 242 bool fin) { 243 return connection_->SendvStreamData(id, iov, iov_count, offset, fin); 244} 245 246void QuicSession::SendRstStream(QuicStreamId id, 247 QuicRstStreamErrorCode error) { 248 connection_->SendRstStream(id, error); 249 CloseStreamInner(id, true); 250} 251 252void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) { 253 goaway_sent_ = true; 254 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason); 255} 256 257void QuicSession::CloseStream(QuicStreamId stream_id) { 258 CloseStreamInner(stream_id, false); 259} 260 261void QuicSession::CloseStreamInner(QuicStreamId stream_id, 262 bool locally_reset) { 263 DLOG(INFO) << ENDPOINT << "Closing stream " << stream_id; 264 265 ReliableStreamMap::iterator it = stream_map_.find(stream_id); 266 if (it == stream_map_.end()) { 267 DLOG(INFO) << ENDPOINT << "Stream is already closed: " << stream_id; 268 return; 269 } 270 ReliableQuicStream* stream = it->second; 271 if (connection_->connected() && !stream->headers_decompressed()) { 272 // If the stream is being closed locally (for example a client cancelling 273 // a request before receiving the response) then we need to make sure that 274 // we keep the stream alive long enough to process any response or 275 // RST_STREAM frames. 276 if (locally_reset && !is_server_) { 277 AddZombieStream(stream_id); 278 return; 279 } 280 281 // This stream has been closed before the headers were decompressed. 282 // This might cause problems with head of line blocking of headers. 283 // If the peer sent headers which were lost but we now close the stream 284 // we will never be able to decompress headers for other streams. 285 // To deal with this, we keep track of streams which have been closed 286 // prematurely. If we ever receive data frames for this steam, then we 287 // know there actually has been a problem and we close the connection. 288 AddPrematurelyClosedStream(stream->id()); 289 } 290 closed_streams_.push_back(it->second); 291 if (ContainsKey(zombie_streams_, stream->id())) { 292 zombie_streams_.erase(stream->id()); 293 } 294 stream_map_.erase(it); 295 stream->OnClose(); 296} 297 298void QuicSession::AddZombieStream(QuicStreamId stream_id) { 299 if (zombie_streams_.size() == kMaxZombieStreams) { 300 QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first; 301 CloseZombieStream(oldest_zombie_stream_id); 302 // However, since the headers still have not been decompressed, we want to 303 // mark it a prematurely closed so that if we ever receive frames 304 // for this stream we can close the connection. 305 AddPrematurelyClosedStream(oldest_zombie_stream_id); 306 } 307 zombie_streams_.insert(make_pair(stream_id, true)); 308} 309 310void QuicSession::CloseZombieStream(QuicStreamId stream_id) { 311 DCHECK(ContainsKey(zombie_streams_, stream_id)); 312 zombie_streams_.erase(stream_id); 313 ReliableQuicStream* stream = GetStream(stream_id); 314 if (!stream) { 315 return; 316 } 317 stream_map_.erase(stream_id); 318 stream->OnClose(); 319 closed_streams_.push_back(stream); 320} 321 322void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) { 323 if (prematurely_closed_streams_.size() == 324 kMaxPrematurelyClosedStreamsTracked) { 325 prematurely_closed_streams_.erase(prematurely_closed_streams_.begin()); 326 } 327 prematurely_closed_streams_.insert(make_pair(stream_id, true)); 328} 329 330bool QuicSession::IsEncryptionEstablished() { 331 return GetCryptoStream()->encryption_established(); 332} 333 334bool QuicSession::IsCryptoHandshakeConfirmed() { 335 return GetCryptoStream()->handshake_confirmed(); 336} 337 338void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { 339 switch (event) { 340 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter 341 // to QuicSession since it is the glue. 342 case ENCRYPTION_FIRST_ESTABLISHED: 343 break; 344 345 case ENCRYPTION_REESTABLISHED: 346 // Retransmit originally packets that were sent, since they can't be 347 // decrypted by the peer. 348 connection_->RetransmitUnackedPackets( 349 QuicConnection::INITIAL_ENCRYPTION_ONLY); 350 break; 351 352 case HANDSHAKE_CONFIRMED: 353 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT 354 << "Handshake confirmed without parameter negotiation."; 355 connection_->SetIdleNetworkTimeout( 356 config_.idle_connection_state_lifetime()); 357 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite()); 358 max_open_streams_ = config_.max_streams_per_connection(); 359 break; 360 361 default: 362 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event; 363 } 364} 365 366void QuicSession::OnCryptoHandshakeMessageSent( 367 const CryptoHandshakeMessage& message) { 368} 369 370void QuicSession::OnCryptoHandshakeMessageReceived( 371 const CryptoHandshakeMessage& message) { 372} 373 374QuicConfig* QuicSession::config() { 375 return &config_; 376} 377 378void QuicSession::ActivateStream(ReliableQuicStream* stream) { 379 DLOG(INFO) << ENDPOINT << "num_streams: " << stream_map_.size() 380 << ". activating " << stream->id(); 381 DCHECK_EQ(stream_map_.count(stream->id()), 0u); 382 stream_map_[stream->id()] = stream; 383} 384 385QuicStreamId QuicSession::GetNextStreamId() { 386 QuicStreamId id = next_stream_id_; 387 next_stream_id_ += 2; 388 return id; 389} 390 391ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { 392 if (stream_id == kCryptoStreamId) { 393 return GetCryptoStream(); 394 } 395 396 ReliableStreamMap::iterator it = stream_map_.find(stream_id); 397 if (it != stream_map_.end()) { 398 return it->second; 399 } 400 401 if (IsClosedStream(stream_id)) { 402 return NULL; 403 } 404 405 if (stream_id % 2 == next_stream_id_ % 2) { 406 // We've received a frame for a locally-created stream that is not 407 // currently active. This is an error. 408 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM); 409 return NULL; 410 } 411 412 return GetIncomingReliableStream(stream_id); 413} 414 415ReliableQuicStream* QuicSession::GetIncomingReliableStream( 416 QuicStreamId stream_id) { 417 if (IsClosedStream(stream_id)) { 418 return NULL; 419 } 420 421 if (goaway_sent_) { 422 // We've already sent a GoAway 423 SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY); 424 return NULL; 425 } 426 427 implicitly_created_streams_.erase(stream_id); 428 if (stream_id > largest_peer_created_stream_id_) { 429 // TODO(rch) add unit test for this 430 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) { 431 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID); 432 return NULL; 433 } 434 if (largest_peer_created_stream_id_ == 0) { 435 largest_peer_created_stream_id_= 1; 436 } 437 for (QuicStreamId id = largest_peer_created_stream_id_ + 2; 438 id < stream_id; 439 id += 2) { 440 implicitly_created_streams_.insert(id); 441 } 442 largest_peer_created_stream_id_ = stream_id; 443 } 444 ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id); 445 if (stream == NULL) { 446 return NULL; 447 } 448 ActivateStream(stream); 449 return stream; 450} 451 452bool QuicSession::IsClosedStream(QuicStreamId id) { 453 DCHECK_NE(0u, id); 454 if (id == kCryptoStreamId) { 455 return false; 456 } 457 if (ContainsKey(zombie_streams_, id)) { 458 return true; 459 } 460 if (ContainsKey(stream_map_, id)) { 461 // Stream is active 462 return false; 463 } 464 if (id % 2 == next_stream_id_ % 2) { 465 // Locally created streams are strictly in-order. If the id is in the 466 // range of created streams and it's not active, it must have been closed. 467 return id < next_stream_id_; 468 } 469 // For peer created streams, we also need to consider implicitly created 470 // streams. 471 return id <= largest_peer_created_stream_id_ && 472 implicitly_created_streams_.count(id) == 0; 473} 474 475size_t QuicSession::GetNumOpenStreams() const { 476 return stream_map_.size() + implicitly_created_streams_.size() - 477 zombie_streams_.size(); 478} 479 480void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) { 481 if (id == kCryptoStreamId) { 482 DCHECK(!has_pending_handshake_); 483 has_pending_handshake_ = true; 484 // TODO(jar): Be sure to use the highest priority for the crypto stream, 485 // perhaps by adding a "special" priority for it that is higher than 486 // kHighestPriority. 487 priority = kHighestPriority; 488 } 489 write_blocked_streams_.PushBack(id, priority); 490} 491 492void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id, 493 QuicStreamId stream_id) { 494 decompression_blocked_streams_[header_id] = stream_id; 495} 496 497bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) { 498 NOTIMPLEMENTED(); 499 return false; 500} 501 502void QuicSession::PostProcessAfterData() { 503 STLDeleteElements(&closed_streams_); 504 closed_streams_.clear(); 505} 506 507} // namespace net 508