quic_session.cc revision 5f1c94371a64b3196d4be9466099bb892df9b88e
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/quic_session.h" 6 7#include "base/stl_util.h" 8#include "net/quic/crypto/proof_verifier.h" 9#include "net/quic/quic_connection.h" 10#include "net/quic/quic_flags.h" 11#include "net/quic/quic_flow_controller.h" 12#include "net/quic/quic_headers_stream.h" 13#include "net/ssl/ssl_info.h" 14 15using base::StringPiece; 16using base::hash_map; 17using base::hash_set; 18using std::make_pair; 19using std::vector; 20 21namespace net { 22 23#define ENDPOINT (is_server() ? "Server: " : " Client: ") 24 25// We want to make sure we delete any closed streams in a safe manner. 26// To avoid deleting a stream in mid-operation, we have a simple shim between 27// us and the stream, so we can delete any streams when we return from 28// processing. 29// 30// We could just override the base methods, but this makes it easier to make 31// sure we don't miss any. 32class VisitorShim : public QuicConnectionVisitorInterface { 33 public: 34 explicit VisitorShim(QuicSession* session) : session_(session) {} 35 36 virtual void OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE { 37 session_->OnStreamFrames(frames); 38 session_->PostProcessAfterData(); 39 } 40 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE { 41 session_->OnRstStream(frame); 42 session_->PostProcessAfterData(); 43 } 44 45 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE { 46 session_->OnGoAway(frame); 47 session_->PostProcessAfterData(); 48 } 49 50 virtual void OnWindowUpdateFrames(const vector<QuicWindowUpdateFrame>& frames) 51 OVERRIDE { 52 session_->OnWindowUpdateFrames(frames); 53 session_->PostProcessAfterData(); 54 } 55 56 virtual void OnBlockedFrames(const vector<QuicBlockedFrame>& frames) 57 OVERRIDE { 58 session_->OnBlockedFrames(frames); 59 session_->PostProcessAfterData(); 60 } 61 62 virtual void OnCanWrite() OVERRIDE { 63 session_->OnCanWrite(); 64 session_->PostProcessAfterData(); 65 } 66 67 virtual void OnSuccessfulVersionNegotiation( 68 const QuicVersion& version) OVERRIDE { 69 session_->OnSuccessfulVersionNegotiation(version); 70 } 71 72 virtual void OnConnectionClosed( 73 QuicErrorCode error, bool from_peer) OVERRIDE { 74 session_->OnConnectionClosed(error, from_peer); 75 // The session will go away, so don't bother with cleanup. 76 } 77 78 virtual void OnWriteBlocked() OVERRIDE { 79 session_->OnWriteBlocked(); 80 } 81 82 virtual bool WillingAndAbleToWrite() const OVERRIDE { 83 return session_->WillingAndAbleToWrite(); 84 } 85 86 virtual bool HasPendingHandshake() const OVERRIDE { 87 return session_->HasPendingHandshake(); 88 } 89 90 virtual bool HasOpenDataStreams() const OVERRIDE { 91 return session_->HasOpenDataStreams(); 92 } 93 94 private: 95 QuicSession* session_; 96}; 97 98QuicSession::QuicSession(QuicConnection* connection, const QuicConfig& config) 99 : connection_(connection), 100 visitor_shim_(new VisitorShim(this)), 101 config_(config), 102 max_open_streams_(config_.max_streams_per_connection()), 103 next_stream_id_(is_server() ? 2 : 3), 104 largest_peer_created_stream_id_(0), 105 error_(QUIC_NO_ERROR), 106 goaway_received_(false), 107 goaway_sent_(false), 108 has_pending_handshake_(false) { 109 if (connection_->version() <= QUIC_VERSION_19) { 110 flow_controller_.reset(new QuicFlowController( 111 connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow, 112 config_.GetInitialFlowControlWindowToSend(), 113 config_.GetInitialFlowControlWindowToSend())); 114 } else { 115 flow_controller_.reset(new QuicFlowController( 116 connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow, 117 config_.GetInitialSessionFlowControlWindowToSend(), 118 config_.GetInitialSessionFlowControlWindowToSend())); 119 } 120} 121 122void QuicSession::InitializeSession() { 123 connection_->set_visitor(visitor_shim_.get()); 124 connection_->SetFromConfig(config_); 125 if (connection_->connected()) { 126 connection_->SetOverallConnectionTimeout( 127 config_.max_time_before_crypto_handshake()); 128 } 129 headers_stream_.reset(new QuicHeadersStream(this)); 130 if (!is_server()) { 131 // For version above QUIC v12, the headers stream is stream 3, so the 132 // next available local stream ID should be 5. 133 DCHECK_EQ(kHeadersStreamId, next_stream_id_); 134 next_stream_id_ += 2; 135 } 136} 137 138QuicSession::~QuicSession() { 139 STLDeleteElements(&closed_streams_); 140 STLDeleteValues(&stream_map_); 141 142 DLOG_IF(WARNING, 143 locally_closed_streams_highest_offset_.size() > max_open_streams_) 144 << "Surprisingly high number of locally closed streams still waiting for " 145 "final byte offset: " << locally_closed_streams_highest_offset_.size(); 146} 147 148void QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) { 149 for (size_t i = 0; i < frames.size(); ++i) { 150 // TODO(rch) deal with the error case of stream id 0. 151 const QuicStreamFrame& frame = frames[i]; 152 QuicStreamId stream_id = frame.stream_id; 153 ReliableQuicStream* stream = GetStream(stream_id); 154 if (!stream) { 155 // The stream no longer exists, but we may still be interested in the 156 // final stream byte offset sent by the peer. A frame with a FIN can give 157 // us this offset. 158 if (frame.fin) { 159 QuicStreamOffset final_byte_offset = 160 frame.offset + frame.data.TotalBufferSize(); 161 UpdateFlowControlOnFinalReceivedByteOffset(stream_id, 162 final_byte_offset); 163 } 164 165 continue; 166 } 167 stream->OnStreamFrame(frames[i]); 168 } 169} 170 171void QuicSession::OnStreamHeaders(QuicStreamId stream_id, 172 StringPiece headers_data) { 173 QuicDataStream* stream = GetDataStream(stream_id); 174 if (!stream) { 175 // It's quite possible to receive headers after a stream has been reset. 176 return; 177 } 178 stream->OnStreamHeaders(headers_data); 179} 180 181void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id, 182 QuicPriority priority) { 183 QuicDataStream* stream = GetDataStream(stream_id); 184 if (!stream) { 185 // It's quite possible to receive headers after a stream has been reset. 186 return; 187 } 188 stream->OnStreamHeadersPriority(priority); 189} 190 191void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id, 192 bool fin, 193 size_t frame_len) { 194 QuicDataStream* stream = GetDataStream(stream_id); 195 if (!stream) { 196 // It's quite possible to receive headers after a stream has been reset. 197 return; 198 } 199 stream->OnStreamHeadersComplete(fin, frame_len); 200} 201 202void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) { 203 if (frame.stream_id == kCryptoStreamId) { 204 connection()->SendConnectionCloseWithDetails( 205 QUIC_INVALID_STREAM_ID, 206 "Attempt to reset the crypto stream"); 207 return; 208 } 209 if (frame.stream_id == kHeadersStreamId) { 210 connection()->SendConnectionCloseWithDetails( 211 QUIC_INVALID_STREAM_ID, 212 "Attempt to reset the headers stream"); 213 return; 214 } 215 216 QuicDataStream* stream = GetDataStream(frame.stream_id); 217 if (!stream) { 218 // The RST frame contains the final byte offset for the stream: we can now 219 // update the connection level flow controller if needed. 220 UpdateFlowControlOnFinalReceivedByteOffset(frame.stream_id, 221 frame.byte_offset); 222 return; // Errors are handled by GetStream. 223 } 224 225 stream->OnStreamReset(frame); 226} 227 228void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) { 229 DCHECK(frame.last_good_stream_id < next_stream_id_); 230 goaway_received_ = true; 231} 232 233void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) { 234 DCHECK(!connection_->connected()); 235 if (error_ == QUIC_NO_ERROR) { 236 error_ = error; 237 } 238 239 while (!stream_map_.empty()) { 240 DataStreamMap::iterator it = stream_map_.begin(); 241 QuicStreamId id = it->first; 242 it->second->OnConnectionClosed(error, from_peer); 243 // The stream should call CloseStream as part of OnConnectionClosed. 244 if (stream_map_.find(id) != stream_map_.end()) { 245 LOG(DFATAL) << ENDPOINT 246 << "Stream failed to close under OnConnectionClosed"; 247 CloseStream(id); 248 } 249 } 250} 251 252void QuicSession::OnWindowUpdateFrames( 253 const vector<QuicWindowUpdateFrame>& frames) { 254 bool connection_window_updated = false; 255 for (size_t i = 0; i < frames.size(); ++i) { 256 // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't 257 // assume that it still exists. 258 QuicStreamId stream_id = frames[i].stream_id; 259 if (stream_id == kConnectionLevelId) { 260 // This is a window update that applies to the connection, rather than an 261 // individual stream. 262 DVLOG(1) << ENDPOINT 263 << "Received connection level flow control window update with " 264 "byte offset: " << frames[i].byte_offset; 265 if (FLAGS_enable_quic_connection_flow_control_2 && 266 flow_controller_->UpdateSendWindowOffset(frames[i].byte_offset)) { 267 connection_window_updated = true; 268 } 269 continue; 270 } 271 272 if (connection_->version() <= QUIC_VERSION_20 && 273 (stream_id == kCryptoStreamId || stream_id == kHeadersStreamId)) { 274 DLOG(DFATAL) << "WindowUpdate for stream " << stream_id << " in version " 275 << QuicVersionToString(connection_->version()); 276 return; 277 } 278 279 ReliableQuicStream* stream = GetStream(stream_id); 280 if (stream) { 281 stream->OnWindowUpdateFrame(frames[i]); 282 } 283 } 284 285 // Connection level flow control window has increased, so blocked streams can 286 // write again. 287 if (connection_window_updated) { 288 OnCanWrite(); 289 } 290} 291 292void QuicSession::OnBlockedFrames(const vector<QuicBlockedFrame>& frames) { 293 for (size_t i = 0; i < frames.size(); ++i) { 294 // TODO(rjshade): Compare our flow control receive windows for specified 295 // streams: if we have a large window then maybe something 296 // had gone wrong with the flow control accounting. 297 DVLOG(1) << ENDPOINT << "Received BLOCKED frame with stream id: " 298 << frames[i].stream_id; 299 } 300} 301 302void QuicSession::OnCanWrite() { 303 // We limit the number of writes to the number of pending streams. If more 304 // streams become pending, WillingAndAbleToWrite will be true, which will 305 // cause the connection to request resumption before yielding to other 306 // connections. 307 size_t num_writes = write_blocked_streams_.NumBlockedStreams(); 308 if (flow_controller_->IsBlocked()) { 309 // If we are connection level flow control blocked, then only allow the 310 // crypto and headers streams to try writing as all other streams will be 311 // blocked. 312 num_writes = 0; 313 if (write_blocked_streams_.crypto_stream_blocked()) { 314 num_writes += 1; 315 } 316 if (write_blocked_streams_.headers_stream_blocked()) { 317 num_writes += 1; 318 } 319 } 320 if (num_writes == 0) { 321 return; 322 } 323 324 QuicConnection::ScopedPacketBundler ack_bundler( 325 connection_.get(), QuicConnection::NO_ACK); 326 for (size_t i = 0; i < num_writes; ++i) { 327 if (!(write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() || 328 write_blocked_streams_.HasWriteBlockedDataStreams())) { 329 // Writing one stream removed another!? Something's broken. 330 LOG(DFATAL) << "WriteBlockedStream is missing"; 331 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false); 332 return; 333 } 334 if (!connection_->CanWriteStreamData()) { 335 return; 336 } 337 QuicStreamId stream_id = write_blocked_streams_.PopFront(); 338 if (stream_id == kCryptoStreamId) { 339 has_pending_handshake_ = false; // We just popped it. 340 } 341 ReliableQuicStream* stream = GetStream(stream_id); 342 if (stream != NULL && !stream->flow_controller()->IsBlocked()) { 343 // If the stream can't write all bytes, it'll re-add itself to the blocked 344 // list. 345 stream->OnCanWrite(); 346 } 347 } 348} 349 350bool QuicSession::WillingAndAbleToWrite() const { 351 // If the crypto or headers streams are blocked, we want to schedule a write - 352 // they don't get blocked by connection level flow control. Otherwise only 353 // schedule a write if we are not flow control blocked at the connection 354 // level. 355 return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() || 356 (!flow_controller_->IsBlocked() && 357 write_blocked_streams_.HasWriteBlockedDataStreams()); 358} 359 360bool QuicSession::HasPendingHandshake() const { 361 return has_pending_handshake_; 362} 363 364bool QuicSession::HasOpenDataStreams() const { 365 return GetNumOpenStreams() > 0; 366} 367 368QuicConsumedData QuicSession::WritevData( 369 QuicStreamId id, 370 const IOVector& data, 371 QuicStreamOffset offset, 372 bool fin, 373 FecProtection fec_protection, 374 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 375 return connection_->SendStreamData(id, data, offset, fin, fec_protection, 376 ack_notifier_delegate); 377} 378 379size_t QuicSession::WriteHeaders( 380 QuicStreamId id, 381 const SpdyHeaderBlock& headers, 382 bool fin, 383 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 384 return headers_stream_->WriteHeaders(id, headers, fin, ack_notifier_delegate); 385} 386 387void QuicSession::SendRstStream(QuicStreamId id, 388 QuicRstStreamErrorCode error, 389 QuicStreamOffset bytes_written) { 390 if (connection()->connected()) { 391 // Only send a RST_STREAM frame if still connected. 392 connection_->SendRstStream(id, error, bytes_written); 393 } 394 CloseStreamInner(id, true); 395} 396 397void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) { 398 if (goaway_sent_) { 399 return; 400 } 401 goaway_sent_ = true; 402 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason); 403} 404 405void QuicSession::CloseStream(QuicStreamId stream_id) { 406 CloseStreamInner(stream_id, false); 407} 408 409void QuicSession::CloseStreamInner(QuicStreamId stream_id, 410 bool locally_reset) { 411 DVLOG(1) << ENDPOINT << "Closing stream " << stream_id; 412 413 DataStreamMap::iterator it = stream_map_.find(stream_id); 414 if (it == stream_map_.end()) { 415 DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id; 416 return; 417 } 418 QuicDataStream* stream = it->second; 419 420 // Tell the stream that a RST has been sent. 421 if (locally_reset) { 422 stream->set_rst_sent(true); 423 } 424 425 closed_streams_.push_back(it->second); 426 427 // If we haven't received a FIN or RST for this stream, we need to keep track 428 // of the how many bytes the stream's flow controller believes it has 429 // received, for accurate connection level flow control accounting. 430 if (!stream->HasFinalReceivedByteOffset() && 431 stream->flow_controller()->IsEnabled() && 432 FLAGS_enable_quic_connection_flow_control_2) { 433 locally_closed_streams_highest_offset_[stream_id] = 434 stream->flow_controller()->highest_received_byte_offset(); 435 } 436 437 stream_map_.erase(it); 438 stream->OnClose(); 439} 440 441void QuicSession::UpdateFlowControlOnFinalReceivedByteOffset( 442 QuicStreamId stream_id, QuicStreamOffset final_byte_offset) { 443 if (!FLAGS_enable_quic_connection_flow_control_2) { 444 return; 445 } 446 447 map<QuicStreamId, QuicStreamOffset>::iterator it = 448 locally_closed_streams_highest_offset_.find(stream_id); 449 if (it == locally_closed_streams_highest_offset_.end()) { 450 return; 451 } 452 453 DVLOG(1) << ENDPOINT << "Received final byte offset " << final_byte_offset 454 << " for stream " << stream_id; 455 uint64 offset_diff = final_byte_offset - it->second; 456 if (flow_controller_->UpdateHighestReceivedOffset( 457 flow_controller_->highest_received_byte_offset() + offset_diff)) { 458 // If the final offset violates flow control, close the connection now. 459 if (flow_controller_->FlowControlViolation()) { 460 connection_->SendConnectionClose( 461 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); 462 return; 463 } 464 } 465 466 flow_controller_->AddBytesConsumed(offset_diff); 467 locally_closed_streams_highest_offset_.erase(it); 468} 469 470bool QuicSession::IsEncryptionEstablished() { 471 return GetCryptoStream()->encryption_established(); 472} 473 474bool QuicSession::IsCryptoHandshakeConfirmed() { 475 return GetCryptoStream()->handshake_confirmed(); 476} 477 478void QuicSession::OnConfigNegotiated() { 479 connection_->SetFromConfig(config_); 480 QuicVersion version = connection()->version(); 481 if (version <= QUIC_VERSION_16) { 482 return; 483 } 484 485 if (version <= QUIC_VERSION_19) { 486 // QUIC_VERSION_17,18,19 don't support independent stream/session flow 487 // control windows. 488 if (config_.HasReceivedInitialFlowControlWindowBytes()) { 489 // Streams which were created before the SHLO was received (0-RTT 490 // requests) are now informed of the peer's initial flow control window. 491 uint32 new_window = config_.ReceivedInitialFlowControlWindowBytes(); 492 OnNewStreamFlowControlWindow(new_window); 493 OnNewSessionFlowControlWindow(new_window); 494 } 495 496 return; 497 } 498 499 // QUIC_VERSION_20 and higher can have independent stream and session flow 500 // control windows. 501 if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) { 502 // Streams which were created before the SHLO was received (0-RTT 503 // requests) are now informed of the peer's initial flow control window. 504 OnNewStreamFlowControlWindow( 505 config_.ReceivedInitialStreamFlowControlWindowBytes()); 506 } 507 if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) { 508 OnNewSessionFlowControlWindow( 509 config_.ReceivedInitialSessionFlowControlWindowBytes()); 510 } 511} 512 513void QuicSession::OnNewStreamFlowControlWindow(uint32 new_window) { 514 if (new_window < kDefaultFlowControlSendWindow) { 515 LOG(ERROR) 516 << "Peer sent us an invalid stream flow control send window: " 517 << new_window << ", below default: " << kDefaultFlowControlSendWindow; 518 if (connection_->connected()) { 519 connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW); 520 } 521 return; 522 } 523 524 // Inform all existing streams about the new window. 525 if (connection_->version() > QUIC_VERSION_20) { 526 GetCryptoStream()->flow_controller()->UpdateSendWindowOffset(new_window); 527 headers_stream_->flow_controller()->UpdateSendWindowOffset(new_window); 528 } 529 for (DataStreamMap::iterator it = stream_map_.begin(); 530 it != stream_map_.end(); ++it) { 531 it->second->flow_controller()->UpdateSendWindowOffset(new_window); 532 } 533} 534 535void QuicSession::OnNewSessionFlowControlWindow(uint32 new_window) { 536 if (new_window < kDefaultFlowControlSendWindow) { 537 LOG(ERROR) 538 << "Peer sent us an invalid session flow control send window: " 539 << new_window << ", below default: " << kDefaultFlowControlSendWindow; 540 if (connection_->connected()) { 541 connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW); 542 } 543 return; 544 } 545 546 flow_controller_->UpdateSendWindowOffset(new_window); 547} 548 549void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { 550 switch (event) { 551 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter 552 // to QuicSession since it is the glue. 553 case ENCRYPTION_FIRST_ESTABLISHED: 554 break; 555 556 case ENCRYPTION_REESTABLISHED: 557 // Retransmit originally packets that were sent, since they can't be 558 // decrypted by the peer. 559 connection_->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY); 560 break; 561 562 case HANDSHAKE_CONFIRMED: 563 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT 564 << "Handshake confirmed without parameter negotiation."; 565 // Discard originally encrypted packets, since they can't be decrypted by 566 // the peer. 567 connection_->NeuterUnencryptedPackets(); 568 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite()); 569 max_open_streams_ = config_.max_streams_per_connection(); 570 break; 571 572 default: 573 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event; 574 } 575} 576 577void QuicSession::OnCryptoHandshakeMessageSent( 578 const CryptoHandshakeMessage& message) { 579} 580 581void QuicSession::OnCryptoHandshakeMessageReceived( 582 const CryptoHandshakeMessage& message) { 583} 584 585QuicConfig* QuicSession::config() { 586 return &config_; 587} 588 589void QuicSession::ActivateStream(QuicDataStream* stream) { 590 DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size() 591 << ". activating " << stream->id(); 592 DCHECK_EQ(stream_map_.count(stream->id()), 0u); 593 stream_map_[stream->id()] = stream; 594} 595 596QuicStreamId QuicSession::GetNextStreamId() { 597 QuicStreamId id = next_stream_id_; 598 next_stream_id_ += 2; 599 return id; 600} 601 602ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { 603 if (stream_id == kCryptoStreamId) { 604 return GetCryptoStream(); 605 } 606 if (stream_id == kHeadersStreamId) { 607 return headers_stream_.get(); 608 } 609 return GetDataStream(stream_id); 610} 611 612QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) { 613 if (stream_id == kCryptoStreamId) { 614 DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id"; 615 return NULL; 616 } 617 if (stream_id == kHeadersStreamId) { 618 DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id"; 619 return NULL; 620 } 621 622 DataStreamMap::iterator it = stream_map_.find(stream_id); 623 if (it != stream_map_.end()) { 624 return it->second; 625 } 626 627 if (IsClosedStream(stream_id)) { 628 return NULL; 629 } 630 631 if (stream_id % 2 == next_stream_id_ % 2) { 632 // We've received a frame for a locally-created stream that is not 633 // currently active. This is an error. 634 if (connection()->connected()) { 635 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM); 636 } 637 return NULL; 638 } 639 640 return GetIncomingDataStream(stream_id); 641} 642 643QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) { 644 if (IsClosedStream(stream_id)) { 645 return NULL; 646 } 647 648 implicitly_created_streams_.erase(stream_id); 649 if (stream_id > largest_peer_created_stream_id_) { 650 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) { 651 // We may already have sent a connection close due to multiple reset 652 // streams in the same packet. 653 if (connection()->connected()) { 654 LOG(ERROR) << "Trying to get stream: " << stream_id 655 << ", largest peer created stream: " 656 << largest_peer_created_stream_id_ 657 << ", max delta: " << kMaxStreamIdDelta; 658 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID); 659 } 660 return NULL; 661 } 662 if (largest_peer_created_stream_id_ == 0) { 663 if (is_server()) { 664 largest_peer_created_stream_id_= 3; 665 } else { 666 largest_peer_created_stream_id_= 1; 667 } 668 } 669 for (QuicStreamId id = largest_peer_created_stream_id_ + 2; 670 id < stream_id; 671 id += 2) { 672 implicitly_created_streams_.insert(id); 673 } 674 largest_peer_created_stream_id_ = stream_id; 675 } 676 QuicDataStream* stream = CreateIncomingDataStream(stream_id); 677 if (stream == NULL) { 678 return NULL; 679 } 680 ActivateStream(stream); 681 return stream; 682} 683 684bool QuicSession::IsClosedStream(QuicStreamId id) { 685 DCHECK_NE(0u, id); 686 if (id == kCryptoStreamId) { 687 return false; 688 } 689 if (id == kHeadersStreamId) { 690 return false; 691 } 692 if (ContainsKey(stream_map_, id)) { 693 // Stream is active 694 return false; 695 } 696 if (id % 2 == next_stream_id_ % 2) { 697 // Locally created streams are strictly in-order. If the id is in the 698 // range of created streams and it's not active, it must have been closed. 699 return id < next_stream_id_; 700 } 701 // For peer created streams, we also need to consider implicitly created 702 // streams. 703 return id <= largest_peer_created_stream_id_ && 704 implicitly_created_streams_.count(id) == 0; 705} 706 707size_t QuicSession::GetNumOpenStreams() const { 708 return stream_map_.size() + implicitly_created_streams_.size(); 709} 710 711void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) { 712#ifndef NDEBUG 713 ReliableQuicStream* stream = GetStream(id); 714 if (stream != NULL) { 715 LOG_IF(DFATAL, priority != stream->EffectivePriority()) 716 << ENDPOINT << "Stream " << id 717 << "Priorities do not match. Got: " << priority 718 << " Expected: " << stream->EffectivePriority(); 719 } else { 720 LOG(DFATAL) << "Marking unknown stream " << id << " blocked."; 721 } 722#endif 723 724 if (id == kCryptoStreamId) { 725 DCHECK(!has_pending_handshake_); 726 has_pending_handshake_ = true; 727 // TODO(jar): Be sure to use the highest priority for the crypto stream, 728 // perhaps by adding a "special" priority for it that is higher than 729 // kHighestPriority. 730 priority = kHighestPriority; 731 } 732 write_blocked_streams_.PushBack(id, priority); 733} 734 735bool QuicSession::HasDataToWrite() const { 736 return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() || 737 write_blocked_streams_.HasWriteBlockedDataStreams() || 738 connection_->HasQueuedData(); 739} 740 741bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) const { 742 NOTIMPLEMENTED(); 743 return false; 744} 745 746void QuicSession::PostProcessAfterData() { 747 STLDeleteElements(&closed_streams_); 748 closed_streams_.clear(); 749} 750 751void QuicSession::OnSuccessfulVersionNegotiation(const QuicVersion& version) { 752 if (version < QUIC_VERSION_19) { 753 flow_controller_->Disable(); 754 } 755 756 // Disable stream level flow control based on negotiated version. Streams may 757 // have been created with a different version. 758 if (version <= QUIC_VERSION_20) { 759 GetCryptoStream()->flow_controller()->Disable(); 760 headers_stream_->flow_controller()->Disable(); 761 } 762 for (DataStreamMap::iterator it = stream_map_.begin(); 763 it != stream_map_.end(); ++it) { 764 if (version <= QUIC_VERSION_16) { 765 it->second->flow_controller()->Disable(); 766 } 767 } 768} 769 770} // namespace net 771