// quic_session.cc revision 5d1f7b1de12d16ceb2c938c56701a3e8bfa558f7
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/quic_session.h" 6 7#include "base/stl_util.h" 8#include "net/quic/crypto/proof_verifier.h" 9#include "net/quic/quic_connection.h" 10#include "net/quic/quic_headers_stream.h" 11#include "net/ssl/ssl_info.h" 12 13using base::StringPiece; 14using base::hash_map; 15using base::hash_set; 16using std::make_pair; 17using std::vector; 18 19namespace net { 20 21const size_t kMaxPrematurelyClosedStreamsTracked = 20; 22const size_t kMaxZombieStreams = 20; 23 24#define ENDPOINT (is_server() ? "Server: " : " Client: ") 25 26// We want to make sure we delete any closed streams in a safe manner. 27// To avoid deleting a stream in mid-operation, we have a simple shim between 28// us and the stream, so we can delete any streams when we return from 29// processing. 30// 31// We could just override the base methods, but this makes it easier to make 32// sure we don't miss any. 
33class VisitorShim : public QuicConnectionVisitorInterface { 34 public: 35 explicit VisitorShim(QuicSession* session) : session_(session) {} 36 37 virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE { 38 bool accepted = session_->OnStreamFrames(frames); 39 session_->PostProcessAfterData(); 40 return accepted; 41 } 42 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE { 43 session_->OnRstStream(frame); 44 session_->PostProcessAfterData(); 45 } 46 47 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE { 48 session_->OnGoAway(frame); 49 session_->PostProcessAfterData(); 50 } 51 52 virtual bool OnCanWrite() OVERRIDE { 53 bool rc = session_->OnCanWrite(); 54 session_->PostProcessAfterData(); 55 return rc; 56 } 57 58 virtual void OnSuccessfulVersionNegotiation( 59 const QuicVersion& version) OVERRIDE { 60 session_->OnSuccessfulVersionNegotiation(version); 61 } 62 63 virtual void OnConnectionClosed( 64 QuicErrorCode error, bool from_peer) OVERRIDE { 65 session_->OnConnectionClosed(error, from_peer); 66 // The session will go away, so don't bother with cleanup. 67 } 68 69 virtual void OnWriteBlocked() OVERRIDE { 70 session_->OnWriteBlocked(); 71 } 72 73 virtual bool HasPendingHandshake() const OVERRIDE { 74 return session_->HasPendingHandshake(); 75 } 76 77 private: 78 QuicSession* session_; 79}; 80 81QuicSession::QuicSession(QuicConnection* connection, 82 const QuicConfig& config) 83 : connection_(connection), 84 visitor_shim_(new VisitorShim(this)), 85 config_(config), 86 max_open_streams_(config_.max_streams_per_connection()), 87 next_stream_id_(is_server() ? 
2 : 3), 88 largest_peer_created_stream_id_(0), 89 error_(QUIC_NO_ERROR), 90 goaway_received_(false), 91 goaway_sent_(false), 92 has_pending_handshake_(false) { 93 94 connection_->set_visitor(visitor_shim_.get()); 95 connection_->SetFromConfig(config_); 96 if (connection_->connected()) { 97 connection_->SetOverallConnectionTimeout( 98 config_.max_time_before_crypto_handshake()); 99 } 100 if (connection_->version() > QUIC_VERSION_12) { 101 headers_stream_.reset(new QuicHeadersStream(this)); 102 if (!is_server()) { 103 // For version above QUIC v12, the headers stream is stream 3, so the 104 // next available local stream ID should be 5. 105 DCHECK_EQ(kHeadersStreamId, next_stream_id_); 106 next_stream_id_ += 2; 107 } 108 } 109} 110 111QuicSession::~QuicSession() { 112 STLDeleteElements(&closed_streams_); 113 STLDeleteValues(&stream_map_); 114} 115 116bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) { 117 for (size_t i = 0; i < frames.size(); ++i) { 118 // TODO(rch) deal with the error case of stream id 0 119 if (IsClosedStream(frames[i].stream_id)) { 120 // If we get additional frames for a stream where we didn't process 121 // headers, it's highly likely our compression context will end up 122 // permanently out of sync with the peer's, so we give up and close the 123 // connection. 124 if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) { 125 connection()->SendConnectionClose( 126 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 127 return false; 128 } 129 continue; 130 } 131 132 ReliableQuicStream* stream = GetStream(frames[i].stream_id); 133 if (stream == NULL) return false; 134 if (!stream->WillAcceptStreamFrame(frames[i])) return false; 135 136 // TODO(alyssar) check against existing connection address: if changed, make 137 // sure we update the connection. 
138 } 139 140 for (size_t i = 0; i < frames.size(); ++i) { 141 QuicStreamId stream_id = frames[i].stream_id; 142 ReliableQuicStream* stream = GetStream(stream_id); 143 if (!stream) { 144 continue; 145 } 146 stream->OnStreamFrame(frames[i]); 147 148 // If the stream is a data stream had been prematurely closed, and the 149 // headers are now decompressed, then we are finally finished 150 // with this stream. 151 if (ContainsKey(zombie_streams_, stream_id) && 152 static_cast<QuicDataStream*>(stream)->headers_decompressed()) { 153 CloseZombieStream(stream_id); 154 } 155 } 156 157 while (!decompression_blocked_streams_.empty()) { 158 QuicHeaderId header_id = decompression_blocked_streams_.begin()->first; 159 if (header_id != decompressor_.current_header_id()) { 160 break; 161 } 162 QuicStreamId stream_id = decompression_blocked_streams_.begin()->second; 163 decompression_blocked_streams_.erase(header_id); 164 QuicDataStream* stream = GetDataStream(stream_id); 165 if (!stream) { 166 connection()->SendConnectionClose( 167 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 168 return false; 169 } 170 stream->OnDecompressorAvailable(); 171 } 172 return true; 173} 174 175void QuicSession::OnStreamHeaders(QuicStreamId stream_id, 176 StringPiece headers_data) { 177 QuicDataStream* stream = GetDataStream(stream_id); 178 if (!stream) { 179 // It's quite possible to receive headers after a stream has been reset. 180 return; 181 } 182 stream->OnStreamHeaders(headers_data); 183} 184 185void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id, 186 QuicPriority priority) { 187 QuicDataStream* stream = GetDataStream(stream_id); 188 if (!stream) { 189 // It's quite possible to receive headers after a stream has been reset. 
190 return; 191 } 192 stream->OnStreamHeadersPriority(priority); 193} 194 195void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id, 196 bool fin, 197 size_t frame_len) { 198 QuicDataStream* stream = GetDataStream(stream_id); 199 if (!stream) { 200 // It's quite possible to receive headers after a stream has been reset. 201 return; 202 } 203 stream->OnStreamHeadersComplete(fin, frame_len); 204} 205 206void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) { 207 if (frame.stream_id == kCryptoStreamId) { 208 connection()->SendConnectionCloseWithDetails( 209 QUIC_INVALID_STREAM_ID, 210 "Attempt to reset the crypto stream"); 211 return; 212 } 213 if (frame.stream_id == kHeadersStreamId && 214 connection()->version() > QUIC_VERSION_12) { 215 connection()->SendConnectionCloseWithDetails( 216 QUIC_INVALID_STREAM_ID, 217 "Attempt to reset the headers stream"); 218 return; 219 } 220 QuicDataStream* stream = GetDataStream(frame.stream_id); 221 if (!stream) { 222 return; // Errors are handled by GetStream. 223 } 224 if (ContainsKey(zombie_streams_, stream->id())) { 225 // If this was a zombie stream then we close it out now. 226 CloseZombieStream(stream->id()); 227 // However, since the headers still have not been decompressed, we want to 228 // mark it a prematurely closed so that if we ever receive frames 229 // for this stream we can close the connection. 
230 DCHECK(!stream->headers_decompressed()); 231 AddPrematurelyClosedStream(frame.stream_id); 232 return; 233 } 234 if (connection()->version() <= QUIC_VERSION_12) { 235 if (stream->stream_bytes_read() > 0 && !stream->headers_decompressed()) { 236 connection()->SendConnectionClose( 237 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 238 } 239 } 240 stream->OnStreamReset(frame); 241} 242 243void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) { 244 DCHECK(frame.last_good_stream_id < next_stream_id_); 245 goaway_received_ = true; 246} 247 248void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) { 249 DCHECK(!connection_->connected()); 250 if (error_ == QUIC_NO_ERROR) { 251 error_ = error; 252 } 253 254 while (!stream_map_.empty()) { 255 DataStreamMap::iterator it = stream_map_.begin(); 256 QuicStreamId id = it->first; 257 it->second->OnConnectionClosed(error, from_peer); 258 // The stream should call CloseStream as part of OnConnectionClosed. 259 if (stream_map_.find(id) != stream_map_.end()) { 260 LOG(DFATAL) << ENDPOINT 261 << "Stream failed to close under OnConnectionClosed"; 262 CloseStream(id); 263 } 264 } 265} 266 267bool QuicSession::OnCanWrite() { 268 // We latch this here rather than doing a traditional loop, because streams 269 // may be modifying the list as we loop. 270 int remaining_writes = write_blocked_streams_.NumBlockedStreams(); 271 272 while (remaining_writes > 0 && connection_->CanWriteStreamData()) { 273 DCHECK(write_blocked_streams_.HasWriteBlockedStreams()); 274 if (!write_blocked_streams_.HasWriteBlockedStreams()) { 275 LOG(DFATAL) << "WriteBlockedStream is missing"; 276 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false); 277 return true; // We have no write blocked streams. 278 } 279 QuicStreamId stream_id = write_blocked_streams_.PopFront(); 280 if (stream_id == kCryptoStreamId) { 281 has_pending_handshake_ = false; // We just popped it. 
282 } 283 ReliableQuicStream* stream = GetStream(stream_id); 284 if (stream != NULL) { 285 // If the stream can't write all bytes, it'll re-add itself to the blocked 286 // list. 287 stream->OnCanWrite(); 288 } 289 --remaining_writes; 290 } 291 292 return !write_blocked_streams_.HasWriteBlockedStreams(); 293} 294 295bool QuicSession::HasPendingHandshake() const { 296 return has_pending_handshake_; 297} 298 299QuicConsumedData QuicSession::WritevData( 300 QuicStreamId id, 301 const struct iovec* iov, 302 int iov_count, 303 QuicStreamOffset offset, 304 bool fin, 305 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 306 IOVector data; 307 data.AppendIovec(iov, iov_count); 308 return connection_->SendStreamData(id, data, offset, fin, 309 ack_notifier_delegate); 310} 311 312size_t QuicSession::WriteHeaders(QuicStreamId id, 313 const SpdyHeaderBlock& headers, 314 bool fin) { 315 DCHECK_LT(QUIC_VERSION_12, connection()->version()); 316 if (connection()->version() <= QUIC_VERSION_12) { 317 return 0; 318 } 319 return headers_stream_->WriteHeaders(id, headers, fin); 320} 321 322void QuicSession::SendRstStream(QuicStreamId id, 323 QuicRstStreamErrorCode error, 324 QuicStreamOffset bytes_written) { 325 connection_->SendRstStream(id, error, bytes_written); 326 CloseStreamInner(id, true); 327} 328 329void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) { 330 goaway_sent_ = true; 331 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason); 332} 333 334void QuicSession::CloseStream(QuicStreamId stream_id) { 335 CloseStreamInner(stream_id, false); 336} 337 338void QuicSession::CloseStreamInner(QuicStreamId stream_id, 339 bool locally_reset) { 340 DVLOG(1) << ENDPOINT << "Closing stream " << stream_id; 341 342 DataStreamMap::iterator it = stream_map_.find(stream_id); 343 if (it == stream_map_.end()) { 344 DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id; 345 return; 346 } 347 QuicDataStream* stream = 
it->second; 348 349 // Tell the stream that a RST has been sent. 350 if (locally_reset) { 351 stream->set_rst_sent(true); 352 } 353 354 if (connection_->version() <= QUIC_VERSION_12 && 355 connection_->connected() && !stream->headers_decompressed()) { 356 // If the stream is being closed locally (for example a client cancelling 357 // a request before receiving the response) then we need to make sure that 358 // we keep the stream alive long enough to process any response or 359 // RST_STREAM frames. 360 if (locally_reset && !is_server()) { 361 AddZombieStream(stream_id); 362 return; 363 } 364 365 // This stream has been closed before the headers were decompressed. 366 // This might cause problems with head of line blocking of headers. 367 // If the peer sent headers which were lost but we now close the stream 368 // we will never be able to decompress headers for other streams. 369 // To deal with this, we keep track of streams which have been closed 370 // prematurely. If we ever receive data frames for this steam, then we 371 // know there actually has been a problem and we close the connection. 372 AddPrematurelyClosedStream(stream->id()); 373 } 374 closed_streams_.push_back(it->second); 375 if (ContainsKey(zombie_streams_, stream->id())) { 376 zombie_streams_.erase(stream->id()); 377 } 378 stream_map_.erase(it); 379 stream->OnClose(); 380} 381 382void QuicSession::AddZombieStream(QuicStreamId stream_id) { 383 if (zombie_streams_.size() == kMaxZombieStreams) { 384 QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first; 385 CloseZombieStream(oldest_zombie_stream_id); 386 // However, since the headers still have not been decompressed, we want to 387 // mark it a prematurely closed so that if we ever receive frames 388 // for this stream we can close the connection. 
389 AddPrematurelyClosedStream(oldest_zombie_stream_id); 390 } 391 zombie_streams_.insert(make_pair(stream_id, true)); 392} 393 394void QuicSession::CloseZombieStream(QuicStreamId stream_id) { 395 DCHECK(ContainsKey(zombie_streams_, stream_id)); 396 zombie_streams_.erase(stream_id); 397 QuicDataStream* stream = GetDataStream(stream_id); 398 if (!stream) { 399 return; 400 } 401 stream_map_.erase(stream_id); 402 stream->OnClose(); 403 closed_streams_.push_back(stream); 404} 405 406void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) { 407 if (connection()->version() > QUIC_VERSION_12) { 408 return; 409 } 410 if (prematurely_closed_streams_.size() == 411 kMaxPrematurelyClosedStreamsTracked) { 412 prematurely_closed_streams_.erase(prematurely_closed_streams_.begin()); 413 } 414 prematurely_closed_streams_.insert(make_pair(stream_id, true)); 415} 416 417bool QuicSession::IsEncryptionEstablished() { 418 return GetCryptoStream()->encryption_established(); 419} 420 421bool QuicSession::IsCryptoHandshakeConfirmed() { 422 return GetCryptoStream()->handshake_confirmed(); 423} 424 425void QuicSession::OnConfigNegotiated() { 426 connection_->SetFromConfig(config_); 427} 428 429void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { 430 switch (event) { 431 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter 432 // to QuicSession since it is the glue. 433 case ENCRYPTION_FIRST_ESTABLISHED: 434 break; 435 436 case ENCRYPTION_REESTABLISHED: 437 // Retransmit originally packets that were sent, since they can't be 438 // decrypted by the peer. 
439 connection_->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY); 440 break; 441 442 case HANDSHAKE_CONFIRMED: 443 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT 444 << "Handshake confirmed without parameter negotiation."; 445 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite()); 446 max_open_streams_ = config_.max_streams_per_connection(); 447 break; 448 449 default: 450 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event; 451 } 452} 453 454void QuicSession::OnCryptoHandshakeMessageSent( 455 const CryptoHandshakeMessage& message) { 456} 457 458void QuicSession::OnCryptoHandshakeMessageReceived( 459 const CryptoHandshakeMessage& message) { 460} 461 462QuicConfig* QuicSession::config() { 463 return &config_; 464} 465 466void QuicSession::ActivateStream(QuicDataStream* stream) { 467 DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size() 468 << ". activating " << stream->id(); 469 DCHECK_EQ(stream_map_.count(stream->id()), 0u); 470 stream_map_[stream->id()] = stream; 471} 472 473QuicStreamId QuicSession::GetNextStreamId() { 474 QuicStreamId id = next_stream_id_; 475 next_stream_id_ += 2; 476 return id; 477} 478 479ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { 480 if (stream_id == kCryptoStreamId) { 481 return GetCryptoStream(); 482 } 483 if (stream_id == kHeadersStreamId && 484 connection_->version() > QUIC_VERSION_12) { 485 return headers_stream_.get(); 486 } 487 return GetDataStream(stream_id); 488} 489 490QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) { 491 if (stream_id == kCryptoStreamId) { 492 DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id"; 493 return NULL; 494 } 495 if (stream_id == kHeadersStreamId && 496 connection_->version() > QUIC_VERSION_12) { 497 DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id"; 498 return NULL; 499 } 500 501 DataStreamMap::iterator it = stream_map_.find(stream_id); 502 if (it != 
stream_map_.end()) { 503 return it->second; 504 } 505 506 if (IsClosedStream(stream_id)) { 507 return NULL; 508 } 509 510 if (stream_id % 2 == next_stream_id_ % 2) { 511 // We've received a frame for a locally-created stream that is not 512 // currently active. This is an error. 513 if (connection()->connected()) { 514 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM); 515 } 516 return NULL; 517 } 518 519 return GetIncomingDataStream(stream_id); 520} 521 522QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) { 523 if (IsClosedStream(stream_id)) { 524 return NULL; 525 } 526 527 if (goaway_sent_) { 528 // We've already sent a GoAway 529 SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0); 530 return NULL; 531 } 532 533 implicitly_created_streams_.erase(stream_id); 534 if (stream_id > largest_peer_created_stream_id_) { 535 // TODO(rch) add unit test for this 536 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) { 537 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID); 538 return NULL; 539 } 540 if (largest_peer_created_stream_id_ == 0) { 541 if (is_server() && connection()->version() > QUIC_VERSION_12) { 542 largest_peer_created_stream_id_= 3; 543 } else { 544 largest_peer_created_stream_id_= 1; 545 } 546 } 547 for (QuicStreamId id = largest_peer_created_stream_id_ + 2; 548 id < stream_id; 549 id += 2) { 550 implicitly_created_streams_.insert(id); 551 } 552 largest_peer_created_stream_id_ = stream_id; 553 } 554 QuicDataStream* stream = CreateIncomingDataStream(stream_id); 555 if (stream == NULL) { 556 return NULL; 557 } 558 ActivateStream(stream); 559 return stream; 560} 561 562bool QuicSession::IsClosedStream(QuicStreamId id) { 563 DCHECK_NE(0u, id); 564 if (id == kCryptoStreamId) { 565 return false; 566 } 567 if (connection()->version() > QUIC_VERSION_12) { 568 if (id == kHeadersStreamId) { 569 return false; 570 } 571 } 572 if (ContainsKey(zombie_streams_, id)) { 573 return true; 574 } 575 if 
(ContainsKey(stream_map_, id)) { 576 // Stream is active 577 return false; 578 } 579 if (id % 2 == next_stream_id_ % 2) { 580 // Locally created streams are strictly in-order. If the id is in the 581 // range of created streams and it's not active, it must have been closed. 582 return id < next_stream_id_; 583 } 584 // For peer created streams, we also need to consider implicitly created 585 // streams. 586 return id <= largest_peer_created_stream_id_ && 587 implicitly_created_streams_.count(id) == 0; 588} 589 590size_t QuicSession::GetNumOpenStreams() const { 591 return stream_map_.size() + implicitly_created_streams_.size() - 592 zombie_streams_.size(); 593} 594 595void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) { 596#ifndef NDEBUG 597 ReliableQuicStream* stream = GetStream(id); 598 if (stream != NULL) { 599 LOG_IF(DFATAL, priority != stream->EffectivePriority()) 600 << "Priorities do not match. Got: " << priority 601 << " Expected: " << stream->EffectivePriority(); 602 } else { 603 LOG(DFATAL) << "Marking unknown stream " << id << " blocked."; 604 } 605#endif 606 607 if (id == kCryptoStreamId) { 608 DCHECK(!has_pending_handshake_); 609 has_pending_handshake_ = true; 610 // TODO(jar): Be sure to use the highest priority for the crypto stream, 611 // perhaps by adding a "special" priority for it that is higher than 612 // kHighestPriority. 
613 priority = kHighestPriority; 614 } 615 write_blocked_streams_.PushBack(id, priority, connection()->version()); 616} 617 618bool QuicSession::HasDataToWrite() const { 619 return write_blocked_streams_.HasWriteBlockedStreams() || 620 connection_->HasQueuedData(); 621} 622 623void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id, 624 QuicStreamId stream_id) { 625 DCHECK_GE(QUIC_VERSION_12, connection()->version()); 626 decompression_blocked_streams_[header_id] = stream_id; 627} 628 629bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) { 630 NOTIMPLEMENTED(); 631 return false; 632} 633 634void QuicSession::PostProcessAfterData() { 635 STLDeleteElements(&closed_streams_); 636 closed_streams_.clear(); 637} 638 639} // namespace net 640