quic_session.cc revision 1e9bf3e0803691d0a228da41fc608347b6db4340
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/quic/quic_session.h"

#include "base/stl_util.h"
#include "net/quic/crypto/proof_verifier.h"
#include "net/quic/quic_connection.h"
#include "net/ssl/ssl_info.h"

using base::StringPiece;
using base::hash_map;
using base::hash_set;
using std::make_pair;
using std::vector;

namespace net {

// Upper bound on the number of entries kept in prematurely_closed_streams_;
// QuicSession::AddPrematurelyClosedStream() drops the first entry once the
// bound is reached.
const size_t kMaxPrematurelyClosedStreamsTracked = 20;
// Upper bound on the number of entries kept in zombie_streams_;
// QuicSession::AddZombieStream() closes the first zombie once the bound is
// reached.
const size_t kMaxZombieStreams = 20;

// Log-line prefix identifying which side of the connection is logging. It
// reads the is_server_ member, so it is only usable inside QuicSession
// member functions.
#define ENDPOINT (is_server_ ? "Server: " : " Client: ")

// We want to make sure we delete any closed streams in a safe manner.
// To avoid deleting a stream in mid-operation, we have a simple shim between
// us and the stream, so we can delete any streams when we return from
// processing.
//
// We could just override the base methods, but this makes it easier to make
// sure we don't miss any.
32class VisitorShim : public QuicConnectionVisitorInterface { 33 public: 34 explicit VisitorShim(QuicSession* session) : session_(session) {} 35 36 virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE { 37 bool accepted = session_->OnStreamFrames(frames); 38 session_->PostProcessAfterData(); 39 return accepted; 40 } 41 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE { 42 session_->OnRstStream(frame); 43 session_->PostProcessAfterData(); 44 } 45 46 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE { 47 session_->OnGoAway(frame); 48 session_->PostProcessAfterData(); 49 } 50 51 virtual bool OnCanWrite() OVERRIDE { 52 bool rc = session_->OnCanWrite(); 53 session_->PostProcessAfterData(); 54 return rc; 55 } 56 57 virtual void OnSuccessfulVersionNegotiation( 58 const QuicVersion& version) OVERRIDE { 59 session_->OnSuccessfulVersionNegotiation(version); 60 } 61 62 virtual void OnConnectionClosed(QuicErrorCode error, 63 bool from_peer) OVERRIDE { 64 session_->OnConnectionClosed(error, from_peer); 65 // The session will go away, so don't bother with cleanup. 66 } 67 68 virtual bool HasPendingHandshake() const OVERRIDE { 69 return session_->HasPendingHandshake(); 70 } 71 72 private: 73 QuicSession* session_; 74}; 75 76QuicSession::QuicSession(QuicConnection* connection, 77 const QuicConfig& config, 78 bool is_server) 79 : connection_(connection), 80 visitor_shim_(new VisitorShim(this)), 81 config_(config), 82 max_open_streams_(config_.max_streams_per_connection()), 83 next_stream_id_(is_server ? 
2 : 3), 84 is_server_(is_server), 85 largest_peer_created_stream_id_(0), 86 error_(QUIC_NO_ERROR), 87 goaway_received_(false), 88 goaway_sent_(false), 89 has_pending_handshake_(false) { 90 91 connection_->set_visitor(visitor_shim_.get()); 92 connection_->SetIdleNetworkTimeout(config_.idle_connection_state_lifetime()); 93 if (connection_->connected()) { 94 connection_->SetOverallConnectionTimeout( 95 config_.max_time_before_crypto_handshake()); 96 } 97 // TODO(satyamshekhar): Set congestion control and ICSL also. 98} 99 100QuicSession::~QuicSession() { 101 STLDeleteElements(&closed_streams_); 102 STLDeleteValues(&stream_map_); 103} 104 105bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) { 106 for (size_t i = 0; i < frames.size(); ++i) { 107 // TODO(rch) deal with the error case of stream id 0 108 if (IsClosedStream(frames[i].stream_id)) { 109 // If we get additional frames for a stream where we didn't process 110 // headers, it's highly likely our compression context will end up 111 // permanently out of sync with the peer's, so we give up and close the 112 // connection. 113 if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) { 114 connection()->SendConnectionClose( 115 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 116 return false; 117 } 118 continue; 119 } 120 121 ReliableQuicStream* stream = GetStream(frames[i].stream_id); 122 if (stream == NULL) return false; 123 if (!stream->WillAcceptStreamFrame(frames[i])) return false; 124 125 // TODO(alyssar) check against existing connection address: if changed, make 126 // sure we update the connection. 
127 } 128 129 for (size_t i = 0; i < frames.size(); ++i) { 130 QuicStreamId stream_id = frames[i].stream_id; 131 ReliableQuicStream* stream = GetStream(stream_id); 132 if (!stream) { 133 continue; 134 } 135 stream->OnStreamFrame(frames[i]); 136 137 // If the stream had been prematurely closed, and the 138 // headers are now decompressed, then we are finally finished 139 // with this stream. 140 if (ContainsKey(zombie_streams_, stream_id) && 141 stream->headers_decompressed()) { 142 CloseZombieStream(stream_id); 143 } 144 } 145 146 while (!decompression_blocked_streams_.empty()) { 147 QuicHeaderId header_id = decompression_blocked_streams_.begin()->first; 148 if (header_id != decompressor_.current_header_id()) { 149 break; 150 } 151 QuicStreamId stream_id = decompression_blocked_streams_.begin()->second; 152 decompression_blocked_streams_.erase(header_id); 153 ReliableQuicStream* stream = GetStream(stream_id); 154 if (!stream) { 155 connection()->SendConnectionClose( 156 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 157 return false; 158 } 159 stream->OnDecompressorAvailable(); 160 } 161 return true; 162} 163 164void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) { 165 ReliableQuicStream* stream = GetStream(frame.stream_id); 166 if (!stream) { 167 return; // Errors are handled by GetStream. 168 } 169 if (ContainsKey(zombie_streams_, stream->id())) { 170 // If this was a zombie stream then we close it out now. 171 CloseZombieStream(stream->id()); 172 // However, since the headers still have not been decompressed, we want to 173 // mark it a prematurely closed so that if we ever receive frames 174 // for this stream we can close the connection. 
175 DCHECK(!stream->headers_decompressed()); 176 AddPrematurelyClosedStream(frame.stream_id); 177 return; 178 } 179 stream->OnStreamReset(frame.error_code); 180} 181 182void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) { 183 DCHECK(frame.last_good_stream_id < next_stream_id_); 184 goaway_received_ = true; 185} 186 187void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) { 188 DCHECK(!connection_->connected()); 189 if (error_ == QUIC_NO_ERROR) { 190 error_ = error; 191 } 192 193 while (stream_map_.size() != 0) { 194 ReliableStreamMap::iterator it = stream_map_.begin(); 195 QuicStreamId id = it->first; 196 it->second->OnConnectionClosed(error, from_peer); 197 // The stream should call CloseStream as part of OnConnectionClosed. 198 if (stream_map_.find(id) != stream_map_.end()) { 199 LOG(DFATAL) << ENDPOINT 200 << "Stream failed to close under OnConnectionClosed"; 201 CloseStream(id); 202 } 203 } 204} 205 206bool QuicSession::OnCanWrite() { 207 // We latch this here rather than doing a traditional loop, because streams 208 // may be modifying the list as we loop. 209 int remaining_writes = write_blocked_streams_.NumBlockedStreams(); 210 211 while (!connection_->HasQueuedData() && 212 remaining_writes > 0) { 213 DCHECK(write_blocked_streams_.HasWriteBlockedStreams()); 214 int index = write_blocked_streams_.GetHighestPriorityWriteBlockedList(); 215 if (index == -1) { 216 LOG(DFATAL) << "WriteBlockedStream is missing"; 217 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false); 218 return true; // We have no write blocked streams. 219 } 220 QuicStreamId stream_id = write_blocked_streams_.PopFront(index); 221 if (stream_id == kCryptoStreamId) { 222 has_pending_handshake_ = false; // We just popped it. 223 } 224 ReliableQuicStream* stream = GetStream(stream_id); 225 if (stream != NULL) { 226 // If the stream can't write all bytes, it'll re-add itself to the blocked 227 // list. 
228 stream->OnCanWrite(); 229 } 230 --remaining_writes; 231 } 232 233 return !write_blocked_streams_.HasWriteBlockedStreams(); 234} 235 236bool QuicSession::HasPendingHandshake() const { 237 return has_pending_handshake_; 238} 239 240QuicConsumedData QuicSession::WritevData(QuicStreamId id, 241 const struct iovec* iov, 242 int iov_count, 243 QuicStreamOffset offset, 244 bool fin) { 245 IOVector data; 246 data.AppendIovec(iov, iov_count); 247 return connection_->SendStreamData(id, data, offset, fin); 248} 249 250void QuicSession::SendRstStream(QuicStreamId id, 251 QuicRstStreamErrorCode error) { 252 connection_->SendRstStream(id, error); 253 CloseStreamInner(id, true); 254} 255 256void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) { 257 goaway_sent_ = true; 258 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason); 259} 260 261void QuicSession::CloseStream(QuicStreamId stream_id) { 262 CloseStreamInner(stream_id, false); 263} 264 265void QuicSession::CloseStreamInner(QuicStreamId stream_id, 266 bool locally_reset) { 267 DLOG(INFO) << ENDPOINT << "Closing stream " << stream_id; 268 269 ReliableStreamMap::iterator it = stream_map_.find(stream_id); 270 if (it == stream_map_.end()) { 271 DLOG(INFO) << ENDPOINT << "Stream is already closed: " << stream_id; 272 return; 273 } 274 ReliableQuicStream* stream = it->second; 275 if (connection_->connected() && !stream->headers_decompressed()) { 276 // If the stream is being closed locally (for example a client cancelling 277 // a request before receiving the response) then we need to make sure that 278 // we keep the stream alive long enough to process any response or 279 // RST_STREAM frames. 280 if (locally_reset && !is_server_) { 281 AddZombieStream(stream_id); 282 return; 283 } 284 285 // This stream has been closed before the headers were decompressed. 286 // This might cause problems with head of line blocking of headers. 
287 // If the peer sent headers which were lost but we now close the stream 288 // we will never be able to decompress headers for other streams. 289 // To deal with this, we keep track of streams which have been closed 290 // prematurely. If we ever receive data frames for this steam, then we 291 // know there actually has been a problem and we close the connection. 292 AddPrematurelyClosedStream(stream->id()); 293 } 294 closed_streams_.push_back(it->second); 295 if (ContainsKey(zombie_streams_, stream->id())) { 296 zombie_streams_.erase(stream->id()); 297 } 298 stream_map_.erase(it); 299 stream->OnClose(); 300} 301 302void QuicSession::AddZombieStream(QuicStreamId stream_id) { 303 if (zombie_streams_.size() == kMaxZombieStreams) { 304 QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first; 305 CloseZombieStream(oldest_zombie_stream_id); 306 // However, since the headers still have not been decompressed, we want to 307 // mark it a prematurely closed so that if we ever receive frames 308 // for this stream we can close the connection. 
309 AddPrematurelyClosedStream(oldest_zombie_stream_id); 310 } 311 zombie_streams_.insert(make_pair(stream_id, true)); 312} 313 314void QuicSession::CloseZombieStream(QuicStreamId stream_id) { 315 DCHECK(ContainsKey(zombie_streams_, stream_id)); 316 zombie_streams_.erase(stream_id); 317 ReliableQuicStream* stream = GetStream(stream_id); 318 if (!stream) { 319 return; 320 } 321 stream_map_.erase(stream_id); 322 stream->OnClose(); 323 closed_streams_.push_back(stream); 324} 325 326void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) { 327 if (prematurely_closed_streams_.size() == 328 kMaxPrematurelyClosedStreamsTracked) { 329 prematurely_closed_streams_.erase(prematurely_closed_streams_.begin()); 330 } 331 prematurely_closed_streams_.insert(make_pair(stream_id, true)); 332} 333 334bool QuicSession::IsEncryptionEstablished() { 335 return GetCryptoStream()->encryption_established(); 336} 337 338bool QuicSession::IsCryptoHandshakeConfirmed() { 339 return GetCryptoStream()->handshake_confirmed(); 340} 341 342void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { 343 switch (event) { 344 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter 345 // to QuicSession since it is the glue. 346 case ENCRYPTION_FIRST_ESTABLISHED: 347 break; 348 349 case ENCRYPTION_REESTABLISHED: 350 // Retransmit originally packets that were sent, since they can't be 351 // decrypted by the peer. 
352 connection_->RetransmitUnackedPackets( 353 QuicConnection::INITIAL_ENCRYPTION_ONLY); 354 break; 355 356 case HANDSHAKE_CONFIRMED: 357 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT 358 << "Handshake confirmed without parameter negotiation."; 359 connection_->SetIdleNetworkTimeout( 360 config_.idle_connection_state_lifetime()); 361 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite()); 362 max_open_streams_ = config_.max_streams_per_connection(); 363 break; 364 365 default: 366 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event; 367 } 368} 369 370void QuicSession::OnCryptoHandshakeMessageSent( 371 const CryptoHandshakeMessage& message) { 372} 373 374void QuicSession::OnCryptoHandshakeMessageReceived( 375 const CryptoHandshakeMessage& message) { 376} 377 378QuicConfig* QuicSession::config() { 379 return &config_; 380} 381 382void QuicSession::ActivateStream(ReliableQuicStream* stream) { 383 DLOG(INFO) << ENDPOINT << "num_streams: " << stream_map_.size() 384 << ". activating " << stream->id(); 385 DCHECK_EQ(stream_map_.count(stream->id()), 0u); 386 stream_map_[stream->id()] = stream; 387} 388 389QuicStreamId QuicSession::GetNextStreamId() { 390 QuicStreamId id = next_stream_id_; 391 next_stream_id_ += 2; 392 return id; 393} 394 395ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { 396 if (stream_id == kCryptoStreamId) { 397 return GetCryptoStream(); 398 } 399 400 ReliableStreamMap::iterator it = stream_map_.find(stream_id); 401 if (it != stream_map_.end()) { 402 return it->second; 403 } 404 405 if (IsClosedStream(stream_id)) { 406 return NULL; 407 } 408 409 if (stream_id % 2 == next_stream_id_ % 2) { 410 // We've received a frame for a locally-created stream that is not 411 // currently active. This is an error. 
412 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM); 413 return NULL; 414 } 415 416 return GetIncomingReliableStream(stream_id); 417} 418 419ReliableQuicStream* QuicSession::GetIncomingReliableStream( 420 QuicStreamId stream_id) { 421 if (IsClosedStream(stream_id)) { 422 return NULL; 423 } 424 425 if (goaway_sent_) { 426 // We've already sent a GoAway 427 SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY); 428 return NULL; 429 } 430 431 implicitly_created_streams_.erase(stream_id); 432 if (stream_id > largest_peer_created_stream_id_) { 433 // TODO(rch) add unit test for this 434 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) { 435 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID); 436 return NULL; 437 } 438 if (largest_peer_created_stream_id_ == 0) { 439 largest_peer_created_stream_id_= 1; 440 } 441 for (QuicStreamId id = largest_peer_created_stream_id_ + 2; 442 id < stream_id; 443 id += 2) { 444 implicitly_created_streams_.insert(id); 445 } 446 largest_peer_created_stream_id_ = stream_id; 447 } 448 ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id); 449 if (stream == NULL) { 450 return NULL; 451 } 452 ActivateStream(stream); 453 return stream; 454} 455 456bool QuicSession::IsClosedStream(QuicStreamId id) { 457 DCHECK_NE(0u, id); 458 if (id == kCryptoStreamId) { 459 return false; 460 } 461 if (ContainsKey(zombie_streams_, id)) { 462 return true; 463 } 464 if (ContainsKey(stream_map_, id)) { 465 // Stream is active 466 return false; 467 } 468 if (id % 2 == next_stream_id_ % 2) { 469 // Locally created streams are strictly in-order. If the id is in the 470 // range of created streams and it's not active, it must have been closed. 471 return id < next_stream_id_; 472 } 473 // For peer created streams, we also need to consider implicitly created 474 // streams. 
475 return id <= largest_peer_created_stream_id_ && 476 implicitly_created_streams_.count(id) == 0; 477} 478 479size_t QuicSession::GetNumOpenStreams() const { 480 return stream_map_.size() + implicitly_created_streams_.size() - 481 zombie_streams_.size(); 482} 483 484void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) { 485 if (id == kCryptoStreamId) { 486 DCHECK(!has_pending_handshake_); 487 has_pending_handshake_ = true; 488 // TODO(jar): Be sure to use the highest priority for the crypto stream, 489 // perhaps by adding a "special" priority for it that is higher than 490 // kHighestPriority. 491 priority = kHighestPriority; 492 } 493 write_blocked_streams_.PushBack(id, priority); 494} 495 496void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id, 497 QuicStreamId stream_id) { 498 decompression_blocked_streams_[header_id] = stream_id; 499} 500 501bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) { 502 NOTIMPLEMENTED(); 503 return false; 504} 505 506void QuicSession::PostProcessAfterData() { 507 STLDeleteElements(&closed_streams_); 508 closed_streams_.clear(); 509} 510 511} // namespace net 512