quic_session.cc revision 5d1f7b1de12d16ceb2c938c56701a3e8bfa558f7
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/quic_session.h"
6
7#include "base/stl_util.h"
8#include "net/quic/crypto/proof_verifier.h"
9#include "net/quic/quic_connection.h"
10#include "net/quic/quic_headers_stream.h"
11#include "net/ssl/ssl_info.h"
12
13using base::StringPiece;
14using base::hash_map;
15using base::hash_set;
16using std::make_pair;
17using std::vector;
18
19namespace net {
20
// Size at which prematurely_closed_streams_ is considered full: once it holds
// this many entries, AddPrematurelyClosedStream() erases the entry at the
// front before inserting a new one.
const size_t kMaxPrematurelyClosedStreamsTracked = 20;
// Size at which zombie_streams_ is considered full: once it holds this many
// entries, AddZombieStream() closes the zombie at the front before inserting
// a new one.
const size_t kMaxZombieStreams = 20;

// Log-message prefix naming the endpoint role.  Expands to a call to
// is_server(), so it is only usable where that name resolves.
#define ENDPOINT (is_server() ? "Server: " : " Client: ")
25
26// We want to make sure we delete any closed streams in a safe manner.
27// To avoid deleting a stream in mid-operation, we have a simple shim between
28// us and the stream, so we can delete any streams when we return from
29// processing.
30//
31// We could just override the base methods, but this makes it easier to make
32// sure we don't miss any.
33class VisitorShim : public QuicConnectionVisitorInterface {
34 public:
35  explicit VisitorShim(QuicSession* session) : session_(session) {}
36
37  virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
38    bool accepted = session_->OnStreamFrames(frames);
39    session_->PostProcessAfterData();
40    return accepted;
41  }
42  virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
43    session_->OnRstStream(frame);
44    session_->PostProcessAfterData();
45  }
46
47  virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
48    session_->OnGoAway(frame);
49    session_->PostProcessAfterData();
50  }
51
52  virtual bool OnCanWrite() OVERRIDE {
53    bool rc = session_->OnCanWrite();
54    session_->PostProcessAfterData();
55    return rc;
56  }
57
58  virtual void OnSuccessfulVersionNegotiation(
59      const QuicVersion& version) OVERRIDE {
60    session_->OnSuccessfulVersionNegotiation(version);
61  }
62
63  virtual void OnConnectionClosed(
64      QuicErrorCode error, bool from_peer) OVERRIDE {
65    session_->OnConnectionClosed(error, from_peer);
66    // The session will go away, so don't bother with cleanup.
67  }
68
69  virtual void OnWriteBlocked() OVERRIDE {
70    session_->OnWriteBlocked();
71  }
72
73  virtual bool HasPendingHandshake() const OVERRIDE {
74    return session_->HasPendingHandshake();
75  }
76
77 private:
78  QuicSession* session_;
79};
80
81QuicSession::QuicSession(QuicConnection* connection,
82                         const QuicConfig& config)
83    : connection_(connection),
84      visitor_shim_(new VisitorShim(this)),
85      config_(config),
86      max_open_streams_(config_.max_streams_per_connection()),
87      next_stream_id_(is_server() ? 2 : 3),
88      largest_peer_created_stream_id_(0),
89      error_(QUIC_NO_ERROR),
90      goaway_received_(false),
91      goaway_sent_(false),
92      has_pending_handshake_(false) {
93
94  connection_->set_visitor(visitor_shim_.get());
95  connection_->SetFromConfig(config_);
96  if (connection_->connected()) {
97    connection_->SetOverallConnectionTimeout(
98        config_.max_time_before_crypto_handshake());
99  }
100  if (connection_->version() > QUIC_VERSION_12) {
101    headers_stream_.reset(new QuicHeadersStream(this));
102    if (!is_server()) {
103      // For version above QUIC v12, the headers stream is stream 3, so the
104      // next available local stream ID should be 5.
105      DCHECK_EQ(kHeadersStreamId, next_stream_id_);
106      next_stream_id_ += 2;
107    }
108  }
109}
110
// Deletes every stream the session still holds: first the ones queued in
// closed_streams_, then the ones remaining in stream_map_.
QuicSession::~QuicSession() {
  STLDeleteElements(&closed_streams_);
  STLDeleteValues(&stream_map_);
}
115
116bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
117  for (size_t i = 0; i < frames.size(); ++i) {
118    // TODO(rch) deal with the error case of stream id 0
119    if (IsClosedStream(frames[i].stream_id)) {
120      // If we get additional frames for a stream where we didn't process
121      // headers, it's highly likely our compression context will end up
122      // permanently out of sync with the peer's, so we give up and close the
123      // connection.
124      if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) {
125        connection()->SendConnectionClose(
126            QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
127        return false;
128      }
129      continue;
130    }
131
132    ReliableQuicStream* stream = GetStream(frames[i].stream_id);
133    if (stream == NULL) return false;
134    if (!stream->WillAcceptStreamFrame(frames[i])) return false;
135
136    // TODO(alyssar) check against existing connection address: if changed, make
137    // sure we update the connection.
138  }
139
140  for (size_t i = 0; i < frames.size(); ++i) {
141    QuicStreamId stream_id = frames[i].stream_id;
142    ReliableQuicStream* stream = GetStream(stream_id);
143    if (!stream) {
144      continue;
145    }
146    stream->OnStreamFrame(frames[i]);
147
148    // If the stream is a data stream had been prematurely closed, and the
149    // headers are now decompressed, then we are finally finished
150    // with this stream.
151    if (ContainsKey(zombie_streams_, stream_id) &&
152        static_cast<QuicDataStream*>(stream)->headers_decompressed()) {
153      CloseZombieStream(stream_id);
154    }
155  }
156
157  while (!decompression_blocked_streams_.empty()) {
158    QuicHeaderId header_id = decompression_blocked_streams_.begin()->first;
159    if (header_id != decompressor_.current_header_id()) {
160      break;
161    }
162    QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
163    decompression_blocked_streams_.erase(header_id);
164    QuicDataStream* stream = GetDataStream(stream_id);
165    if (!stream) {
166      connection()->SendConnectionClose(
167          QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
168      return false;
169    }
170    stream->OnDecompressorAvailable();
171  }
172  return true;
173}
174
175void QuicSession::OnStreamHeaders(QuicStreamId stream_id,
176                                  StringPiece headers_data) {
177  QuicDataStream* stream = GetDataStream(stream_id);
178  if (!stream) {
179    // It's quite possible to receive headers after a stream has been reset.
180    return;
181  }
182  stream->OnStreamHeaders(headers_data);
183}
184
185void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id,
186                                          QuicPriority priority) {
187  QuicDataStream* stream = GetDataStream(stream_id);
188  if (!stream) {
189    // It's quite possible to receive headers after a stream has been reset.
190    return;
191  }
192  stream->OnStreamHeadersPriority(priority);
193}
194
195void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id,
196                                          bool fin,
197                                          size_t frame_len) {
198  QuicDataStream* stream = GetDataStream(stream_id);
199  if (!stream) {
200    // It's quite possible to receive headers after a stream has been reset.
201    return;
202  }
203  stream->OnStreamHeadersComplete(fin, frame_len);
204}
205
206void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
207  if (frame.stream_id == kCryptoStreamId) {
208    connection()->SendConnectionCloseWithDetails(
209        QUIC_INVALID_STREAM_ID,
210        "Attempt to reset the crypto stream");
211    return;
212  }
213  if (frame.stream_id == kHeadersStreamId &&
214      connection()->version() > QUIC_VERSION_12) {
215    connection()->SendConnectionCloseWithDetails(
216        QUIC_INVALID_STREAM_ID,
217        "Attempt to reset the headers stream");
218    return;
219  }
220  QuicDataStream* stream = GetDataStream(frame.stream_id);
221  if (!stream) {
222    return;  // Errors are handled by GetStream.
223  }
224  if (ContainsKey(zombie_streams_, stream->id())) {
225    // If this was a zombie stream then we close it out now.
226    CloseZombieStream(stream->id());
227    // However, since the headers still have not been decompressed, we want to
228    // mark it a prematurely closed so that if we ever receive frames
229    // for this stream we can close the connection.
230    DCHECK(!stream->headers_decompressed());
231    AddPrematurelyClosedStream(frame.stream_id);
232    return;
233  }
234  if (connection()->version() <= QUIC_VERSION_12) {
235    if (stream->stream_bytes_read() > 0 && !stream->headers_decompressed()) {
236      connection()->SendConnectionClose(
237          QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
238    }
239  }
240  stream->OnStreamReset(frame);
241}
242
// Records that a GOAWAY frame was received from the peer.  DCHECKs that the
// frame's last_good_stream_id is below next_stream_id_; the frame is not
// otherwise used here.
void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
  DCHECK(frame.last_good_stream_id < next_stream_id_);
  goaway_received_ = true;
}
247
248void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
249  DCHECK(!connection_->connected());
250  if (error_ == QUIC_NO_ERROR) {
251    error_ = error;
252  }
253
254  while (!stream_map_.empty()) {
255    DataStreamMap::iterator it = stream_map_.begin();
256    QuicStreamId id = it->first;
257    it->second->OnConnectionClosed(error, from_peer);
258    // The stream should call CloseStream as part of OnConnectionClosed.
259    if (stream_map_.find(id) != stream_map_.end()) {
260      LOG(DFATAL) << ENDPOINT
261                  << "Stream failed to close under OnConnectionClosed";
262      CloseStream(id);
263    }
264  }
265}
266
267bool QuicSession::OnCanWrite() {
268  // We latch this here rather than doing a traditional loop, because streams
269  // may be modifying the list as we loop.
270  int remaining_writes = write_blocked_streams_.NumBlockedStreams();
271
272  while (remaining_writes > 0 && connection_->CanWriteStreamData()) {
273    DCHECK(write_blocked_streams_.HasWriteBlockedStreams());
274    if (!write_blocked_streams_.HasWriteBlockedStreams()) {
275      LOG(DFATAL) << "WriteBlockedStream is missing";
276      connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
277      return true;  // We have no write blocked streams.
278    }
279    QuicStreamId stream_id = write_blocked_streams_.PopFront();
280    if (stream_id == kCryptoStreamId) {
281      has_pending_handshake_ = false;  // We just popped it.
282    }
283    ReliableQuicStream* stream = GetStream(stream_id);
284    if (stream != NULL) {
285      // If the stream can't write all bytes, it'll re-add itself to the blocked
286      // list.
287      stream->OnCanWrite();
288    }
289    --remaining_writes;
290  }
291
292  return !write_blocked_streams_.HasWriteBlockedStreams();
293}
294
// Returns has_pending_handshake_, which MarkWriteBlocked() sets when the
// crypto stream is marked write blocked and OnCanWrite() clears when the
// crypto stream is popped from the blocked list.
bool QuicSession::HasPendingHandshake() const {
  return has_pending_handshake_;
}
298
299QuicConsumedData QuicSession::WritevData(
300    QuicStreamId id,
301    const struct iovec* iov,
302    int iov_count,
303    QuicStreamOffset offset,
304    bool fin,
305    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
306  IOVector data;
307  data.AppendIovec(iov, iov_count);
308  return connection_->SendStreamData(id, data, offset, fin,
309                                     ack_notifier_delegate);
310}
311
312size_t QuicSession::WriteHeaders(QuicStreamId id,
313                               const SpdyHeaderBlock& headers,
314                               bool fin) {
315  DCHECK_LT(QUIC_VERSION_12, connection()->version());
316  if (connection()->version() <= QUIC_VERSION_12) {
317    return 0;
318  }
319  return headers_stream_->WriteHeaders(id, headers, fin);
320}
321
322void QuicSession::SendRstStream(QuicStreamId id,
323                                QuicRstStreamErrorCode error,
324                                QuicStreamOffset bytes_written) {
325  connection_->SendRstStream(id, error, bytes_written);
326  CloseStreamInner(id, true);
327}
328
// Marks the session as having sent a GOAWAY and asks the connection to send
// one carrying |error_code|, largest_peer_created_stream_id_ and |reason|.
// Once goaway_sent_ is set, GetIncomingDataStream() resets any new incoming
// stream instead of creating it.
void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
  goaway_sent_ = true;
  connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
}
333
334void QuicSession::CloseStream(QuicStreamId stream_id) {
335  CloseStreamInner(stream_id, false);
336}
337
338void QuicSession::CloseStreamInner(QuicStreamId stream_id,
339                                   bool locally_reset) {
340  DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
341
342  DataStreamMap::iterator it = stream_map_.find(stream_id);
343  if (it == stream_map_.end()) {
344    DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
345    return;
346  }
347  QuicDataStream* stream = it->second;
348
349  // Tell the stream that a RST has been sent.
350  if (locally_reset) {
351    stream->set_rst_sent(true);
352  }
353
354  if (connection_->version() <= QUIC_VERSION_12 &&
355      connection_->connected() && !stream->headers_decompressed()) {
356    // If the stream is being closed locally (for example a client cancelling
357    // a request before receiving the response) then we need to make sure that
358    // we keep the stream alive long enough to process any response or
359    // RST_STREAM frames.
360    if (locally_reset && !is_server()) {
361      AddZombieStream(stream_id);
362      return;
363    }
364
365    // This stream has been closed before the headers were decompressed.
366    // This might cause problems with head of line blocking of headers.
367    // If the peer sent headers which were lost but we now close the stream
368    // we will never be able to decompress headers for other streams.
369    // To deal with this, we keep track of streams which have been closed
370    // prematurely.  If we ever receive data frames for this steam, then we
371    // know there actually has been a problem and we close the connection.
372    AddPrematurelyClosedStream(stream->id());
373  }
374  closed_streams_.push_back(it->second);
375  if (ContainsKey(zombie_streams_, stream->id())) {
376    zombie_streams_.erase(stream->id());
377  }
378  stream_map_.erase(it);
379  stream->OnClose();
380}
381
382void QuicSession::AddZombieStream(QuicStreamId stream_id) {
383  if (zombie_streams_.size() == kMaxZombieStreams) {
384    QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first;
385    CloseZombieStream(oldest_zombie_stream_id);
386    // However, since the headers still have not been decompressed, we want to
387    // mark it a prematurely closed so that if we ever receive frames
388    // for this stream we can close the connection.
389    AddPrematurelyClosedStream(oldest_zombie_stream_id);
390  }
391  zombie_streams_.insert(make_pair(stream_id, true));
392}
393
394void QuicSession::CloseZombieStream(QuicStreamId stream_id) {
395  DCHECK(ContainsKey(zombie_streams_, stream_id));
396  zombie_streams_.erase(stream_id);
397  QuicDataStream* stream = GetDataStream(stream_id);
398  if (!stream) {
399    return;
400  }
401  stream_map_.erase(stream_id);
402  stream->OnClose();
403  closed_streams_.push_back(stream);
404}
405
406void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) {
407  if (connection()->version() > QUIC_VERSION_12) {
408    return;
409  }
410  if (prematurely_closed_streams_.size() ==
411      kMaxPrematurelyClosedStreamsTracked) {
412    prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
413  }
414  prematurely_closed_streams_.insert(make_pair(stream_id, true));
415}
416
// Returns the crypto stream's encryption_established() value.
bool QuicSession::IsEncryptionEstablished() {
  return GetCryptoStream()->encryption_established();
}
420
// Returns the crypto stream's handshake_confirmed() value.
bool QuicSession::IsCryptoHandshakeConfirmed() {
  return GetCryptoStream()->handshake_confirmed();
}
424
// Re-applies config_ to the connection; the constructor already applied it
// once with the initial values.
void QuicSession::OnConfigNegotiated() {
  connection_->SetFromConfig(config_);
}
428
429void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
430  switch (event) {
431    // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
432    // to QuicSession since it is the glue.
433    case ENCRYPTION_FIRST_ESTABLISHED:
434      break;
435
436    case ENCRYPTION_REESTABLISHED:
437      // Retransmit originally packets that were sent, since they can't be
438      // decrypted by the peer.
439      connection_->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY);
440      break;
441
442    case HANDSHAKE_CONFIRMED:
443      LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
444          << "Handshake confirmed without parameter negotiation.";
445      connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
446      max_open_streams_ = config_.max_streams_per_connection();
447      break;
448
449    default:
450      LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
451  }
452}
453
// Intentionally empty: this implementation ignores |message|.
void QuicSession::OnCryptoHandshakeMessageSent(
    const CryptoHandshakeMessage& message) {
}
457
// Intentionally empty: this implementation ignores |message|.
void QuicSession::OnCryptoHandshakeMessageReceived(
    const CryptoHandshakeMessage& message) {
}
461
// Returns a pointer to the session's own config_ member; the pointer refers
// to storage inside this object.
QuicConfig* QuicSession::config() {
  return &config_;
}
465
466void QuicSession::ActivateStream(QuicDataStream* stream) {
467  DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
468             << ". activating " << stream->id();
469  DCHECK_EQ(stream_map_.count(stream->id()), 0u);
470  stream_map_[stream->id()] = stream;
471}
472
473QuicStreamId QuicSession::GetNextStreamId() {
474  QuicStreamId id = next_stream_id_;
475  next_stream_id_ += 2;
476  return id;
477}
478
479ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
480  if (stream_id == kCryptoStreamId) {
481    return GetCryptoStream();
482  }
483  if (stream_id == kHeadersStreamId &&
484      connection_->version() > QUIC_VERSION_12) {
485    return headers_stream_.get();
486  }
487  return GetDataStream(stream_id);
488}
489
490QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) {
491  if (stream_id == kCryptoStreamId) {
492    DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id";
493    return NULL;
494  }
495  if (stream_id == kHeadersStreamId &&
496      connection_->version() > QUIC_VERSION_12) {
497    DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id";
498    return NULL;
499  }
500
501  DataStreamMap::iterator it = stream_map_.find(stream_id);
502  if (it != stream_map_.end()) {
503    return it->second;
504  }
505
506  if (IsClosedStream(stream_id)) {
507    return NULL;
508  }
509
510  if (stream_id % 2 == next_stream_id_ % 2) {
511    // We've received a frame for a locally-created stream that is not
512    // currently active.  This is an error.
513    if (connection()->connected()) {
514      connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
515    }
516    return NULL;
517  }
518
519  return GetIncomingDataStream(stream_id);
520}
521
522QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) {
523  if (IsClosedStream(stream_id)) {
524    return NULL;
525  }
526
527  if (goaway_sent_) {
528    // We've already sent a GoAway
529    SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0);
530    return NULL;
531  }
532
533  implicitly_created_streams_.erase(stream_id);
534  if (stream_id > largest_peer_created_stream_id_) {
535    // TODO(rch) add unit test for this
536    if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
537      connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
538      return NULL;
539    }
540    if (largest_peer_created_stream_id_ == 0) {
541      if (is_server() && connection()->version() > QUIC_VERSION_12) {
542        largest_peer_created_stream_id_= 3;
543      } else {
544        largest_peer_created_stream_id_= 1;
545      }
546    }
547    for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
548         id < stream_id;
549         id += 2) {
550      implicitly_created_streams_.insert(id);
551    }
552    largest_peer_created_stream_id_ = stream_id;
553  }
554  QuicDataStream* stream = CreateIncomingDataStream(stream_id);
555  if (stream == NULL) {
556    return NULL;
557  }
558  ActivateStream(stream);
559  return stream;
560}
561
562bool QuicSession::IsClosedStream(QuicStreamId id) {
563  DCHECK_NE(0u, id);
564  if (id == kCryptoStreamId) {
565    return false;
566  }
567  if (connection()->version() > QUIC_VERSION_12) {
568    if (id == kHeadersStreamId) {
569      return false;
570    }
571  }
572  if (ContainsKey(zombie_streams_, id)) {
573    return true;
574  }
575  if (ContainsKey(stream_map_, id)) {
576    // Stream is active
577    return false;
578  }
579  if (id % 2 == next_stream_id_ % 2) {
580    // Locally created streams are strictly in-order.  If the id is in the
581    // range of created streams and it's not active, it must have been closed.
582    return id < next_stream_id_;
583  }
584  // For peer created streams, we also need to consider implicitly created
585  // streams.
586  return id <= largest_peer_created_stream_id_ &&
587      implicitly_created_streams_.count(id) == 0;
588}
589
590size_t QuicSession::GetNumOpenStreams() const {
591  return stream_map_.size() + implicitly_created_streams_.size() -
592      zombie_streams_.size();
593}
594
595void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
596#ifndef NDEBUG
597  ReliableQuicStream* stream = GetStream(id);
598  if (stream != NULL) {
599    LOG_IF(DFATAL, priority != stream->EffectivePriority())
600        << "Priorities do not match.  Got: " << priority
601        << " Expected: " << stream->EffectivePriority();
602  } else {
603    LOG(DFATAL) << "Marking unknown stream " << id << " blocked.";
604  }
605#endif
606
607  if (id == kCryptoStreamId) {
608    DCHECK(!has_pending_handshake_);
609    has_pending_handshake_ = true;
610    // TODO(jar): Be sure to use the highest priority for the crypto stream,
611    // perhaps by adding a "special" priority for it that is higher than
612    // kHighestPriority.
613    priority = kHighestPriority;
614  }
615  write_blocked_streams_.PushBack(id, priority, connection()->version());
616}
617
618bool QuicSession::HasDataToWrite() const {
619  return write_blocked_streams_.HasWriteBlockedStreams() ||
620      connection_->HasQueuedData();
621}
622
// Records that |stream_id| is waiting for the decompressor to reach
// |header_id|.  OnStreamFrames() later wakes the stream once that header id
// is the decompressor's current one.  DCHECKs that the connection version is
// QUIC v12 or lower.  An existing entry for |header_id| is overwritten.
void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
                                           QuicStreamId stream_id) {
  DCHECK_GE(QUIC_VERSION_12, connection()->version());
  decompression_blocked_streams_[header_id] = stream_id;
}
628
// Not implemented in this class: reports NOTIMPLEMENTED() and returns false
// without writing to |ssl_info|.
bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) {
  NOTIMPLEMENTED();
  return false;
}
633
634void QuicSession::PostProcessAfterData() {
635  STLDeleteElements(&closed_streams_);
636  closed_streams_.clear();
637}
638
639}  // namespace net
640