1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/quic_session.h"
6
7#include "base/stl_util.h"
8#include "net/quic/crypto/proof_verifier.h"
9#include "net/quic/quic_connection.h"
10#include "net/quic/quic_flags.h"
11#include "net/quic/quic_flow_controller.h"
12#include "net/quic/quic_headers_stream.h"
13#include "net/ssl/ssl_info.h"
14
15using base::StringPiece;
16using base::hash_map;
17using base::hash_set;
18using std::make_pair;
19using std::vector;
20
21namespace net {
22
// Prefix for log messages identifying which end of the connection this
// session is. Expands to a call to is_server(), so it is only usable where
// that name resolves (e.g. inside QuicSession member functions). Note that
// the client literal carries a leading space while the server one does not.
#define ENDPOINT (is_server() ? "Server: " : " Client: ")
24
25// We want to make sure we delete any closed streams in a safe manner.
26// To avoid deleting a stream in mid-operation, we have a simple shim between
27// us and the stream, so we can delete any streams when we return from
28// processing.
29//
30// We could just override the base methods, but this makes it easier to make
31// sure we don't miss any.
32class VisitorShim : public QuicConnectionVisitorInterface {
33 public:
34  explicit VisitorShim(QuicSession* session) : session_(session) {}
35
36  virtual void OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
37    session_->OnStreamFrames(frames);
38    session_->PostProcessAfterData();
39  }
40  virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
41    session_->OnRstStream(frame);
42    session_->PostProcessAfterData();
43  }
44
45  virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
46    session_->OnGoAway(frame);
47    session_->PostProcessAfterData();
48  }
49
50  virtual void OnWindowUpdateFrames(const vector<QuicWindowUpdateFrame>& frames)
51      OVERRIDE {
52    session_->OnWindowUpdateFrames(frames);
53    session_->PostProcessAfterData();
54  }
55
56  virtual void OnBlockedFrames(const vector<QuicBlockedFrame>& frames)
57      OVERRIDE {
58    session_->OnBlockedFrames(frames);
59    session_->PostProcessAfterData();
60  }
61
62  virtual void OnCanWrite() OVERRIDE {
63    session_->OnCanWrite();
64    session_->PostProcessAfterData();
65  }
66
67  virtual void OnCongestionWindowChange(QuicTime now) OVERRIDE {
68    session_->OnCongestionWindowChange(now);
69  }
70
71  virtual void OnSuccessfulVersionNegotiation(
72      const QuicVersion& version) OVERRIDE {
73    session_->OnSuccessfulVersionNegotiation(version);
74  }
75
76  virtual void OnConnectionClosed(
77      QuicErrorCode error, bool from_peer) OVERRIDE {
78    session_->OnConnectionClosed(error, from_peer);
79    // The session will go away, so don't bother with cleanup.
80  }
81
82  virtual void OnWriteBlocked() OVERRIDE {
83    session_->OnWriteBlocked();
84  }
85
86  virtual bool WillingAndAbleToWrite() const OVERRIDE {
87    return session_->WillingAndAbleToWrite();
88  }
89
90  virtual bool HasPendingHandshake() const OVERRIDE {
91    return session_->HasPendingHandshake();
92  }
93
94  virtual bool HasOpenDataStreams() const OVERRIDE {
95    return session_->HasOpenDataStreams();
96  }
97
98 private:
99  QuicSession* session_;
100};
101
102QuicSession::QuicSession(QuicConnection* connection, const QuicConfig& config)
103    : connection_(connection),
104      visitor_shim_(new VisitorShim(this)),
105      config_(config),
106      max_open_streams_(config_.max_streams_per_connection()),
107      next_stream_id_(is_server() ? 2 : 5),
108      largest_peer_created_stream_id_(0),
109      error_(QUIC_NO_ERROR),
110      goaway_received_(false),
111      goaway_sent_(false),
112      has_pending_handshake_(false) {
113  if (connection_->version() <= QUIC_VERSION_19) {
114    flow_controller_.reset(new QuicFlowController(
115        connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow,
116        config_.GetInitialFlowControlWindowToSend(),
117        config_.GetInitialFlowControlWindowToSend()));
118  } else {
119    flow_controller_.reset(new QuicFlowController(
120        connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow,
121        config_.GetInitialSessionFlowControlWindowToSend(),
122        config_.GetInitialSessionFlowControlWindowToSend()));
123  }
124}
125
126void QuicSession::InitializeSession() {
127  connection_->set_visitor(visitor_shim_.get());
128  connection_->SetFromConfig(config_);
129  if (connection_->connected()) {
130    connection_->SetOverallConnectionTimeout(
131        config_.max_time_before_crypto_handshake());
132  }
133  headers_stream_.reset(new QuicHeadersStream(this));
134}
135
136QuicSession::~QuicSession() {
137  STLDeleteElements(&closed_streams_);
138  STLDeleteValues(&stream_map_);
139
140  DLOG_IF(WARNING,
141          locally_closed_streams_highest_offset_.size() > max_open_streams_)
142      << "Surprisingly high number of locally closed streams still waiting for "
143         "final byte offset: " << locally_closed_streams_highest_offset_.size();
144}
145
146void QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
147  for (size_t i = 0; i < frames.size(); ++i) {
148    // TODO(rch) deal with the error case of stream id 0.
149    const QuicStreamFrame& frame = frames[i];
150    QuicStreamId stream_id = frame.stream_id;
151    ReliableQuicStream* stream = GetStream(stream_id);
152    if (!stream) {
153      // The stream no longer exists, but we may still be interested in the
154      // final stream byte offset sent by the peer. A frame with a FIN can give
155      // us this offset.
156      if (frame.fin) {
157        QuicStreamOffset final_byte_offset =
158            frame.offset + frame.data.TotalBufferSize();
159        UpdateFlowControlOnFinalReceivedByteOffset(stream_id,
160                                                   final_byte_offset);
161      }
162
163      continue;
164    }
165    stream->OnStreamFrame(frames[i]);
166  }
167}
168
169void QuicSession::OnStreamHeaders(QuicStreamId stream_id,
170                                  StringPiece headers_data) {
171  QuicDataStream* stream = GetDataStream(stream_id);
172  if (!stream) {
173    // It's quite possible to receive headers after a stream has been reset.
174    return;
175  }
176  stream->OnStreamHeaders(headers_data);
177}
178
179void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id,
180                                          QuicPriority priority) {
181  QuicDataStream* stream = GetDataStream(stream_id);
182  if (!stream) {
183    // It's quite possible to receive headers after a stream has been reset.
184    return;
185  }
186  stream->OnStreamHeadersPriority(priority);
187}
188
189void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id,
190                                          bool fin,
191                                          size_t frame_len) {
192  QuicDataStream* stream = GetDataStream(stream_id);
193  if (!stream) {
194    // It's quite possible to receive headers after a stream has been reset.
195    return;
196  }
197  stream->OnStreamHeadersComplete(fin, frame_len);
198}
199
200void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
201  if (frame.stream_id == kCryptoStreamId) {
202    connection()->SendConnectionCloseWithDetails(
203        QUIC_INVALID_STREAM_ID,
204        "Attempt to reset the crypto stream");
205    return;
206  }
207  if (frame.stream_id == kHeadersStreamId) {
208    connection()->SendConnectionCloseWithDetails(
209        QUIC_INVALID_STREAM_ID,
210        "Attempt to reset the headers stream");
211    return;
212  }
213
214  QuicDataStream* stream = GetDataStream(frame.stream_id);
215  if (!stream) {
216    // The RST frame contains the final byte offset for the stream: we can now
217    // update the connection level flow controller if needed.
218    UpdateFlowControlOnFinalReceivedByteOffset(frame.stream_id,
219                                               frame.byte_offset);
220    return;  // Errors are handled by GetStream.
221  }
222
223  stream->OnStreamReset(frame);
224}
225
// Records that the peer has sent a GOAWAY frame. The only use made of |frame|
// is a debug check that its last good stream id is below the next stream id
// this session would hand out.
void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
  DCHECK(frame.last_good_stream_id < next_stream_id_);
  goaway_received_ = true;
}
230
231void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
232  DCHECK(!connection_->connected());
233  if (error_ == QUIC_NO_ERROR) {
234    error_ = error;
235  }
236
237  while (!stream_map_.empty()) {
238    DataStreamMap::iterator it = stream_map_.begin();
239    QuicStreamId id = it->first;
240    it->second->OnConnectionClosed(error, from_peer);
241    // The stream should call CloseStream as part of OnConnectionClosed.
242    if (stream_map_.find(id) != stream_map_.end()) {
243      LOG(DFATAL) << ENDPOINT
244                  << "Stream failed to close under OnConnectionClosed";
245      CloseStream(id);
246    }
247  }
248}
249
250void QuicSession::OnWindowUpdateFrames(
251    const vector<QuicWindowUpdateFrame>& frames) {
252  bool connection_window_updated = false;
253  for (size_t i = 0; i < frames.size(); ++i) {
254    // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
255    // assume that it still exists.
256    QuicStreamId stream_id = frames[i].stream_id;
257    if (stream_id == kConnectionLevelId) {
258      // This is a window update that applies to the connection, rather than an
259      // individual stream.
260      DVLOG(1) << ENDPOINT
261               << "Received connection level flow control window update with "
262                  "byte offset: " << frames[i].byte_offset;
263      if (flow_controller_->UpdateSendWindowOffset(frames[i].byte_offset)) {
264        connection_window_updated = true;
265      }
266      continue;
267    }
268
269    if (connection_->version() < QUIC_VERSION_21 &&
270        (stream_id == kCryptoStreamId || stream_id == kHeadersStreamId)) {
271      DLOG(DFATAL) << "WindowUpdate for stream " << stream_id << " in version "
272                   << QuicVersionToString(connection_->version());
273      return;
274    }
275
276    ReliableQuicStream* stream = GetStream(stream_id);
277    if (stream) {
278      stream->OnWindowUpdateFrame(frames[i]);
279    }
280  }
281
282  // Connection level flow control window has increased, so blocked streams can
283  // write again.
284  if (connection_window_updated) {
285    OnCanWrite();
286  }
287}
288
289void QuicSession::OnBlockedFrames(const vector<QuicBlockedFrame>& frames) {
290  for (size_t i = 0; i < frames.size(); ++i) {
291    // TODO(rjshade): Compare our flow control receive windows for specified
292    //                streams: if we have a large window then maybe something
293    //                had gone wrong with the flow control accounting.
294    DVLOG(1) << ENDPOINT << "Received BLOCKED frame with stream id: "
295             << frames[i].stream_id;
296  }
297}
298
299void QuicSession::OnCanWrite() {
300  // We limit the number of writes to the number of pending streams. If more
301  // streams become pending, WillingAndAbleToWrite will be true, which will
302  // cause the connection to request resumption before yielding to other
303  // connections.
304  size_t num_writes = write_blocked_streams_.NumBlockedStreams();
305  if (flow_controller_->IsBlocked()) {
306    // If we are connection level flow control blocked, then only allow the
307    // crypto and headers streams to try writing as all other streams will be
308    // blocked.
309    num_writes = 0;
310    if (write_blocked_streams_.crypto_stream_blocked()) {
311      num_writes += 1;
312    }
313    if (write_blocked_streams_.headers_stream_blocked()) {
314      num_writes += 1;
315    }
316  }
317  if (num_writes == 0) {
318    return;
319  }
320
321  QuicConnection::ScopedPacketBundler ack_bundler(
322      connection_.get(), QuicConnection::NO_ACK);
323  for (size_t i = 0; i < num_writes; ++i) {
324    if (!(write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() ||
325          write_blocked_streams_.HasWriteBlockedDataStreams())) {
326      // Writing one stream removed another!? Something's broken.
327      LOG(DFATAL) << "WriteBlockedStream is missing";
328      connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
329      return;
330    }
331    if (!connection_->CanWriteStreamData()) {
332      return;
333    }
334    QuicStreamId stream_id = write_blocked_streams_.PopFront();
335    if (stream_id == kCryptoStreamId) {
336      has_pending_handshake_ = false;  // We just popped it.
337    }
338    ReliableQuicStream* stream = GetStream(stream_id);
339    if (stream != NULL && !stream->flow_controller()->IsBlocked()) {
340      // If the stream can't write all bytes, it'll re-add itself to the blocked
341      // list.
342      stream->OnCanWrite();
343    }
344  }
345}
346
347bool QuicSession::WillingAndAbleToWrite() const {
348  // If the crypto or headers streams are blocked, we want to schedule a write -
349  // they don't get blocked by connection level flow control. Otherwise only
350  // schedule a write if we are not flow control blocked at the connection
351  // level.
352  return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() ||
353         (!flow_controller_->IsBlocked() &&
354          write_blocked_streams_.HasWriteBlockedDataStreams());
355}
356
// Returns the |has_pending_handshake_| flag, which is set when the crypto
// stream is marked write blocked and cleared when OnCanWrite() pops it.
bool QuicSession::HasPendingHandshake() const {
  return has_pending_handshake_;
}
360
361bool QuicSession::HasOpenDataStreams() const {
362  return GetNumOpenStreams() > 0;
363}
364
// Sends stream data on behalf of stream |id| by forwarding every argument
// unchanged to QuicConnection::SendStreamData and returning its result.
QuicConsumedData QuicSession::WritevData(
    QuicStreamId id,
    const IOVector& data,
    QuicStreamOffset offset,
    bool fin,
    FecProtection fec_protection,
    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
  return connection_->SendStreamData(id, data, offset, fin, fec_protection,
                                     ack_notifier_delegate);
}
375
// Writes |headers| for stream |id| via the headers stream, forwarding every
// argument unchanged and returning whatever the headers stream returns.
size_t QuicSession::WriteHeaders(
    QuicStreamId id,
    const SpdyHeaderBlock& headers,
    bool fin,
    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
  return headers_stream_->WriteHeaders(id, headers, fin, ack_notifier_delegate);
}
383
384void QuicSession::SendRstStream(QuicStreamId id,
385                                QuicRstStreamErrorCode error,
386                                QuicStreamOffset bytes_written) {
387  if (connection()->connected()) {
388    // Only send a RST_STREAM frame if still connected.
389    connection_->SendRstStream(id, error, bytes_written);
390  }
391  CloseStreamInner(id, true);
392}
393
394void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
395  if (goaway_sent_) {
396    return;
397  }
398  goaway_sent_ = true;
399  connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
400}
401
// Closes stream |stream_id| without marking it as locally reset; see
// CloseStreamInner for the actual work.
void QuicSession::CloseStream(QuicStreamId stream_id) {
  CloseStreamInner(stream_id, false);
}
405
406void QuicSession::CloseStreamInner(QuicStreamId stream_id,
407                                   bool locally_reset) {
408  DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
409
410  DataStreamMap::iterator it = stream_map_.find(stream_id);
411  if (it == stream_map_.end()) {
412    DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
413    return;
414  }
415  QuicDataStream* stream = it->second;
416
417  // Tell the stream that a RST has been sent.
418  if (locally_reset) {
419    stream->set_rst_sent(true);
420  }
421
422  closed_streams_.push_back(it->second);
423
424  // If we haven't received a FIN or RST for this stream, we need to keep track
425  // of the how many bytes the stream's flow controller believes it has
426  // received, for accurate connection level flow control accounting.
427  if (!stream->HasFinalReceivedByteOffset() &&
428      stream->flow_controller()->IsEnabled()) {
429    locally_closed_streams_highest_offset_[stream_id] =
430        stream->flow_controller()->highest_received_byte_offset();
431  }
432
433  stream_map_.erase(it);
434  stream->OnClose();
435}
436
437void QuicSession::UpdateFlowControlOnFinalReceivedByteOffset(
438    QuicStreamId stream_id, QuicStreamOffset final_byte_offset) {
439  map<QuicStreamId, QuicStreamOffset>::iterator it =
440      locally_closed_streams_highest_offset_.find(stream_id);
441  if (it == locally_closed_streams_highest_offset_.end()) {
442    return;
443  }
444
445  DVLOG(1) << ENDPOINT << "Received final byte offset " << final_byte_offset
446           << " for stream " << stream_id;
447  uint64 offset_diff = final_byte_offset - it->second;
448  if (flow_controller_->UpdateHighestReceivedOffset(
449      flow_controller_->highest_received_byte_offset() + offset_diff)) {
450    // If the final offset violates flow control, close the connection now.
451    if (flow_controller_->FlowControlViolation()) {
452      connection_->SendConnectionClose(
453          QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA);
454      return;
455    }
456  }
457
458  flow_controller_->AddBytesConsumed(offset_diff);
459  locally_closed_streams_highest_offset_.erase(it);
460}
461
// Returns the crypto stream's encryption_established() state.
bool QuicSession::IsEncryptionEstablished() {
  return GetCryptoStream()->encryption_established();
}
465
// Returns the crypto stream's handshake_confirmed() state.
bool QuicSession::IsCryptoHandshakeConfirmed() {
  return GetCryptoStream()->handshake_confirmed();
}
469
470void QuicSession::OnConfigNegotiated() {
471  connection_->SetFromConfig(config_);
472  QuicVersion version = connection()->version();
473
474  // A server should accept a small number of additional streams beyond the
475  // limit sent to the client. This helps avoid early connection termination
476  // when FIN/RSTs for old streams are lost or arrive out of order.
477  if (FLAGS_quic_allow_more_open_streams) {
478    set_max_open_streams((is_server() ? kMaxStreamsMultiplier : 1.0) *
479                         config_.max_streams_per_connection());
480  }
481
482  if (version <= QUIC_VERSION_16) {
483    return;
484  }
485
486  if (version <= QUIC_VERSION_19) {
487    // QUIC_VERSION_17,18,19 don't support independent stream/session flow
488    // control windows.
489    if (config_.HasReceivedInitialFlowControlWindowBytes()) {
490      // Streams which were created before the SHLO was received (0-RTT
491      // requests) are now informed of the peer's initial flow control window.
492      uint32 new_window = config_.ReceivedInitialFlowControlWindowBytes();
493      OnNewStreamFlowControlWindow(new_window);
494      OnNewSessionFlowControlWindow(new_window);
495    }
496
497    return;
498  }
499
500  // QUIC_VERSION_21 and higher can have independent stream and session flow
501  // control windows.
502  if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
503    // Streams which were created before the SHLO was received (0-RTT
504    // requests) are now informed of the peer's initial flow control window.
505    OnNewStreamFlowControlWindow(
506        config_.ReceivedInitialStreamFlowControlWindowBytes());
507  }
508  if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
509    OnNewSessionFlowControlWindow(
510        config_.ReceivedInitialSessionFlowControlWindowBytes());
511  }
512}
513
514void QuicSession::OnNewStreamFlowControlWindow(uint32 new_window) {
515  if (new_window < kDefaultFlowControlSendWindow) {
516    LOG(ERROR)
517        << "Peer sent us an invalid stream flow control send window: "
518        << new_window << ", below default: " << kDefaultFlowControlSendWindow;
519    if (connection_->connected()) {
520      connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW);
521    }
522    return;
523  }
524
525  // Inform all existing streams about the new window.
526  if (connection_->version() >= QUIC_VERSION_21) {
527    GetCryptoStream()->UpdateSendWindowOffset(new_window);
528    headers_stream_->UpdateSendWindowOffset(new_window);
529  }
530  for (DataStreamMap::iterator it = stream_map_.begin();
531       it != stream_map_.end(); ++it) {
532    it->second->UpdateSendWindowOffset(new_window);
533  }
534}
535
536void QuicSession::OnNewSessionFlowControlWindow(uint32 new_window) {
537  if (new_window < kDefaultFlowControlSendWindow) {
538    LOG(ERROR)
539        << "Peer sent us an invalid session flow control send window: "
540        << new_window << ", below default: " << kDefaultFlowControlSendWindow;
541    if (connection_->connected()) {
542      connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW);
543    }
544    return;
545  }
546
547  flow_controller_->UpdateSendWindowOffset(new_window);
548}
549
550void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
551  switch (event) {
552    // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
553    // to QuicSession since it is the glue.
554    case ENCRYPTION_FIRST_ESTABLISHED:
555      break;
556
557    case ENCRYPTION_REESTABLISHED:
558      // Retransmit originally packets that were sent, since they can't be
559      // decrypted by the peer.
560      connection_->RetransmitUnackedPackets(ALL_INITIAL_RETRANSMISSION);
561      break;
562
563    case HANDSHAKE_CONFIRMED:
564      LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
565          << "Handshake confirmed without parameter negotiation.";
566      // Discard originally encrypted packets, since they can't be decrypted by
567      // the peer.
568      connection_->NeuterUnencryptedPackets();
569      connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
570      if (!FLAGS_quic_allow_more_open_streams) {
571        max_open_streams_ = config_.max_streams_per_connection();
572      }
573      break;
574
575    default:
576      LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
577  }
578}
579
// Notification that a crypto handshake message was sent. This implementation
// ignores |message| and does nothing.
// NOTE(review): presumably a hook for subclasses to override; confirm against
// the header.
void QuicSession::OnCryptoHandshakeMessageSent(
    const CryptoHandshakeMessage& message) {
}
583
// Notification that a crypto handshake message was received. This
// implementation ignores |message| and does nothing.
// NOTE(review): presumably a hook for subclasses to override; confirm against
// the header.
void QuicSession::OnCryptoHandshakeMessageReceived(
    const CryptoHandshakeMessage& message) {
}
587
// Returns a pointer to the session's own config object; the session keeps
// ownership.
QuicConfig* QuicSession::config() {
  return &config_;
}
591
592void QuicSession::ActivateStream(QuicDataStream* stream) {
593  DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
594           << ". activating " << stream->id();
595  DCHECK_EQ(stream_map_.count(stream->id()), 0u);
596  stream_map_[stream->id()] = stream;
597}
598
599QuicStreamId QuicSession::GetNextStreamId() {
600  QuicStreamId id = next_stream_id_;
601  next_stream_id_ += 2;
602  return id;
603}
604
605ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
606  if (stream_id == kCryptoStreamId) {
607    return GetCryptoStream();
608  }
609  if (stream_id == kHeadersStreamId) {
610    return headers_stream_.get();
611  }
612  return GetDataStream(stream_id);
613}
614
615QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) {
616  if (stream_id == kCryptoStreamId) {
617    DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id";
618    return NULL;
619  }
620  if (stream_id == kHeadersStreamId) {
621    DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id";
622    return NULL;
623  }
624
625  DataStreamMap::iterator it = stream_map_.find(stream_id);
626  if (it != stream_map_.end()) {
627    return it->second;
628  }
629
630  if (IsClosedStream(stream_id)) {
631    return NULL;
632  }
633
634  if (stream_id % 2 == next_stream_id_ % 2) {
635    // We've received a frame for a locally-created stream that is not
636    // currently active.  This is an error.
637    if (connection()->connected()) {
638      connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
639    }
640    return NULL;
641  }
642
643  return GetIncomingDataStream(stream_id);
644}
645
646QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) {
647  if (IsClosedStream(stream_id)) {
648    return NULL;
649  }
650
651  implicitly_created_streams_.erase(stream_id);
652  if (stream_id > largest_peer_created_stream_id_) {
653    if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
654      // We may already have sent a connection close due to multiple reset
655      // streams in the same packet.
656      if (connection()->connected()) {
657        LOG(ERROR) << "Trying to get stream: " << stream_id
658                   << ", largest peer created stream: "
659                   << largest_peer_created_stream_id_
660                   << ", max delta: " << kMaxStreamIdDelta;
661        connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
662      }
663      return NULL;
664    }
665    if (largest_peer_created_stream_id_ == 0) {
666      if (is_server()) {
667        largest_peer_created_stream_id_= 3;
668      } else {
669        largest_peer_created_stream_id_= 1;
670      }
671    }
672    for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
673         id < stream_id;
674         id += 2) {
675      implicitly_created_streams_.insert(id);
676    }
677    largest_peer_created_stream_id_ = stream_id;
678  }
679  QuicDataStream* stream = CreateIncomingDataStream(stream_id);
680  if (stream == NULL) {
681    return NULL;
682  }
683  ActivateStream(stream);
684  return stream;
685}
686
// Replaces the open stream limit with |max_open_streams|, logging the new
// value at verbose level.
void QuicSession::set_max_open_streams(size_t max_open_streams) {
  DVLOG(1) << "Setting max_open_streams_ to " << max_open_streams;
  max_open_streams_ = max_open_streams;
}
691
692bool QuicSession::IsClosedStream(QuicStreamId id) {
693  DCHECK_NE(0u, id);
694  if (id == kCryptoStreamId) {
695    return false;
696  }
697  if (id == kHeadersStreamId) {
698    return false;
699  }
700  if (ContainsKey(stream_map_, id)) {
701    // Stream is active
702    return false;
703  }
704  if (id % 2 == next_stream_id_ % 2) {
705    // Locally created streams are strictly in-order.  If the id is in the
706    // range of created streams and it's not active, it must have been closed.
707    return id < next_stream_id_;
708  }
709  // For peer created streams, we also need to consider implicitly created
710  // streams.
711  return id <= largest_peer_created_stream_id_ &&
712      implicitly_created_streams_.count(id) == 0;
713}
714
// Counts open streams as the active streams in |stream_map_| plus the
// streams that have only been implicitly created so far.
size_t QuicSession::GetNumOpenStreams() const {
  return stream_map_.size() + implicitly_created_streams_.size();
}
718
719void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
720#ifndef NDEBUG
721  ReliableQuicStream* stream = GetStream(id);
722  if (stream != NULL) {
723    LOG_IF(DFATAL, priority != stream->EffectivePriority())
724        << ENDPOINT << "Stream " << id
725        << "Priorities do not match.  Got: " << priority
726        << " Expected: " << stream->EffectivePriority();
727  } else {
728    LOG(DFATAL) << "Marking unknown stream " << id << " blocked.";
729  }
730#endif
731
732  if (id == kCryptoStreamId) {
733    DCHECK(!has_pending_handshake_);
734    has_pending_handshake_ = true;
735    // TODO(jar): Be sure to use the highest priority for the crypto stream,
736    // perhaps by adding a "special" priority for it that is higher than
737    // kHighestPriority.
738    priority = kHighestPriority;
739  }
740  write_blocked_streams_.PushBack(id, priority);
741}
742
743bool QuicSession::HasDataToWrite() const {
744  return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() ||
745         write_blocked_streams_.HasWriteBlockedDataStreams() ||
746         connection_->HasQueuedData();
747}
748
// Not implemented in this class: reports NOTIMPLEMENTED(), leaves |ssl_info|
// untouched and returns false.
bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) const {
  NOTIMPLEMENTED();
  return false;
}
753
754void QuicSession::PostProcessAfterData() {
755  STLDeleteElements(&closed_streams_);
756  closed_streams_.clear();
757
758  if (FLAGS_close_quic_connection_unfinished_streams_2 &&
759      connection()->connected() &&
760      locally_closed_streams_highest_offset_.size() > max_open_streams_) {
761    // A buggy client may fail to send FIN/RSTs. Don't tolerate this.
762    connection_->SendConnectionClose(QUIC_TOO_MANY_UNFINISHED_STREAMS);
763  }
764}
765
766void QuicSession::OnSuccessfulVersionNegotiation(const QuicVersion& version) {
767  if (version < QUIC_VERSION_19) {
768    flow_controller_->Disable();
769  }
770
771  // Disable stream level flow control based on negotiated version. Streams may
772  // have been created with a different version.
773  if (version < QUIC_VERSION_21) {
774    GetCryptoStream()->flow_controller()->Disable();
775    headers_stream_->flow_controller()->Disable();
776  }
777  for (DataStreamMap::iterator it = stream_map_.begin();
778       it != stream_map_.end(); ++it) {
779    if (version <= QUIC_VERSION_16) {
780      it->second->flow_controller()->Disable();
781    }
782  }
783}
784
785}  // namespace net
786