quic_session.cc revision 1e9bf3e0803691d0a228da41fc608347b6db4340
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/quic_session.h"
6
7#include "base/stl_util.h"
8#include "net/quic/crypto/proof_verifier.h"
9#include "net/quic/quic_connection.h"
10#include "net/ssl/ssl_info.h"
11
12using base::StringPiece;
13using base::hash_map;
14using base::hash_set;
15using std::make_pair;
16using std::vector;
17
18namespace net {
19
// Upper bound on the number of entries kept in prematurely_closed_streams_.
// Once the bound is reached, AddPrematurelyClosedStream() erases the entry at
// begin() before inserting a new one.
const size_t kMaxPrematurelyClosedStreamsTracked = 20;
// Upper bound on the number of entries kept in zombie_streams_.  Once the
// bound is reached, AddZombieStream() closes the zombie at begin() before
// adding a new one.
const size_t kMaxZombieStreams = 20;

// Prefix for log lines identifying which endpoint emitted them.  The macro
// expands to an expression that reads is_server_, so it can only be used where
// that member is in scope.
#define ENDPOINT (is_server_ ? "Server: " : " Client: ")
24
25// We want to make sure we delete any closed streams in a safe manner.
26// To avoid deleting a stream in mid-operation, we have a simple shim between
27// us and the stream, so we can delete any streams when we return from
28// processing.
29//
30// We could just override the base methods, but this makes it easier to make
31// sure we don't miss any.
32class VisitorShim : public QuicConnectionVisitorInterface {
33 public:
34  explicit VisitorShim(QuicSession* session) : session_(session) {}
35
36  virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
37    bool accepted = session_->OnStreamFrames(frames);
38    session_->PostProcessAfterData();
39    return accepted;
40  }
41  virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
42    session_->OnRstStream(frame);
43    session_->PostProcessAfterData();
44  }
45
46  virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
47    session_->OnGoAway(frame);
48    session_->PostProcessAfterData();
49  }
50
51  virtual bool OnCanWrite() OVERRIDE {
52    bool rc = session_->OnCanWrite();
53    session_->PostProcessAfterData();
54    return rc;
55  }
56
57  virtual void OnSuccessfulVersionNegotiation(
58      const QuicVersion& version) OVERRIDE {
59    session_->OnSuccessfulVersionNegotiation(version);
60  }
61
62  virtual void OnConnectionClosed(QuicErrorCode error,
63                                  bool from_peer) OVERRIDE {
64    session_->OnConnectionClosed(error, from_peer);
65    // The session will go away, so don't bother with cleanup.
66  }
67
68  virtual bool HasPendingHandshake() const OVERRIDE {
69    return session_->HasPendingHandshake();
70  }
71
72 private:
73  QuicSession* session_;
74};
75
76QuicSession::QuicSession(QuicConnection* connection,
77                         const QuicConfig& config,
78                         bool is_server)
79    : connection_(connection),
80      visitor_shim_(new VisitorShim(this)),
81      config_(config),
82      max_open_streams_(config_.max_streams_per_connection()),
83      next_stream_id_(is_server ? 2 : 3),
84      is_server_(is_server),
85      largest_peer_created_stream_id_(0),
86      error_(QUIC_NO_ERROR),
87      goaway_received_(false),
88      goaway_sent_(false),
89      has_pending_handshake_(false) {
90
91  connection_->set_visitor(visitor_shim_.get());
92  connection_->SetIdleNetworkTimeout(config_.idle_connection_state_lifetime());
93  if (connection_->connected()) {
94    connection_->SetOverallConnectionTimeout(
95        config_.max_time_before_crypto_handshake());
96  }
97  // TODO(satyamshekhar): Set congestion control and ICSL also.
98}
99
// Destroys the session and every stream object it still holds: first the
// streams parked in closed_streams_ awaiting deletion, then the streams still
// registered in stream_map_.
QuicSession::~QuicSession() {
  STLDeleteElements(&closed_streams_);
  STLDeleteValues(&stream_map_);
}
104
105bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
106  for (size_t i = 0; i < frames.size(); ++i) {
107    // TODO(rch) deal with the error case of stream id 0
108    if (IsClosedStream(frames[i].stream_id)) {
109      // If we get additional frames for a stream where we didn't process
110      // headers, it's highly likely our compression context will end up
111      // permanently out of sync with the peer's, so we give up and close the
112      // connection.
113      if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) {
114        connection()->SendConnectionClose(
115            QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
116        return false;
117      }
118      continue;
119    }
120
121    ReliableQuicStream* stream = GetStream(frames[i].stream_id);
122    if (stream == NULL) return false;
123    if (!stream->WillAcceptStreamFrame(frames[i])) return false;
124
125    // TODO(alyssar) check against existing connection address: if changed, make
126    // sure we update the connection.
127  }
128
129  for (size_t i = 0; i < frames.size(); ++i) {
130    QuicStreamId stream_id = frames[i].stream_id;
131    ReliableQuicStream* stream = GetStream(stream_id);
132    if (!stream) {
133      continue;
134    }
135    stream->OnStreamFrame(frames[i]);
136
137    // If the stream had been prematurely closed, and the
138    // headers are now decompressed, then we are finally finished
139    // with this stream.
140    if (ContainsKey(zombie_streams_, stream_id) &&
141        stream->headers_decompressed()) {
142      CloseZombieStream(stream_id);
143    }
144  }
145
146  while (!decompression_blocked_streams_.empty()) {
147    QuicHeaderId header_id = decompression_blocked_streams_.begin()->first;
148    if (header_id != decompressor_.current_header_id()) {
149      break;
150    }
151    QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
152    decompression_blocked_streams_.erase(header_id);
153    ReliableQuicStream* stream = GetStream(stream_id);
154    if (!stream) {
155      connection()->SendConnectionClose(
156          QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
157      return false;
158    }
159    stream->OnDecompressorAvailable();
160  }
161  return true;
162}
163
164void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
165  ReliableQuicStream* stream = GetStream(frame.stream_id);
166  if (!stream) {
167    return;  // Errors are handled by GetStream.
168  }
169  if (ContainsKey(zombie_streams_, stream->id())) {
170    // If this was a zombie stream then we close it out now.
171    CloseZombieStream(stream->id());
172    // However, since the headers still have not been decompressed, we want to
173    // mark it a prematurely closed so that if we ever receive frames
174    // for this stream we can close the connection.
175    DCHECK(!stream->headers_decompressed());
176    AddPrematurelyClosedStream(frame.stream_id);
177    return;
178  }
179  stream->OnStreamReset(frame.error_code);
180}
181
// Handles a GOAWAY frame from the peer by recording that one was received.
// Debug builds additionally check that the frame's last_good_stream_id is
// below the next stream id this endpoint would allocate.
void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
  DCHECK(frame.last_good_stream_id < next_stream_id_);
  goaway_received_ = true;
}
186
187void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
188  DCHECK(!connection_->connected());
189  if (error_ == QUIC_NO_ERROR) {
190    error_ = error;
191  }
192
193  while (stream_map_.size() != 0) {
194    ReliableStreamMap::iterator it = stream_map_.begin();
195    QuicStreamId id = it->first;
196    it->second->OnConnectionClosed(error, from_peer);
197    // The stream should call CloseStream as part of OnConnectionClosed.
198    if (stream_map_.find(id) != stream_map_.end()) {
199      LOG(DFATAL) << ENDPOINT
200                  << "Stream failed to close under OnConnectionClosed";
201      CloseStream(id);
202    }
203  }
204}
205
206bool QuicSession::OnCanWrite() {
207  // We latch this here rather than doing a traditional loop, because streams
208  // may be modifying the list as we loop.
209  int remaining_writes = write_blocked_streams_.NumBlockedStreams();
210
211  while (!connection_->HasQueuedData() &&
212         remaining_writes > 0) {
213    DCHECK(write_blocked_streams_.HasWriteBlockedStreams());
214    int index = write_blocked_streams_.GetHighestPriorityWriteBlockedList();
215    if (index == -1) {
216      LOG(DFATAL) << "WriteBlockedStream is missing";
217      connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
218      return true;  // We have no write blocked streams.
219    }
220    QuicStreamId stream_id = write_blocked_streams_.PopFront(index);
221    if (stream_id == kCryptoStreamId) {
222      has_pending_handshake_ = false;  // We just popped it.
223    }
224    ReliableQuicStream* stream = GetStream(stream_id);
225    if (stream != NULL) {
226      // If the stream can't write all bytes, it'll re-add itself to the blocked
227      // list.
228      stream->OnCanWrite();
229    }
230    --remaining_writes;
231  }
232
233  return !write_blocked_streams_.HasWriteBlockedStreams();
234}
235
236bool QuicSession::HasPendingHandshake() const {
237  return has_pending_handshake_;
238}
239
240QuicConsumedData QuicSession::WritevData(QuicStreamId id,
241                                         const struct iovec* iov,
242                                         int iov_count,
243                                         QuicStreamOffset offset,
244                                         bool fin) {
245  IOVector data;
246  data.AppendIovec(iov, iov_count);
247  return connection_->SendStreamData(id, data, offset, fin);
248}
249
250void QuicSession::SendRstStream(QuicStreamId id,
251                                QuicRstStreamErrorCode error) {
252  connection_->SendRstStream(id, error);
253  CloseStreamInner(id, true);
254}
255
256void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
257  goaway_sent_ = true;
258  connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
259}
260
261void QuicSession::CloseStream(QuicStreamId stream_id) {
262  CloseStreamInner(stream_id, false);
263}
264
265void QuicSession::CloseStreamInner(QuicStreamId stream_id,
266                                   bool locally_reset) {
267  DLOG(INFO) << ENDPOINT << "Closing stream " << stream_id;
268
269  ReliableStreamMap::iterator it = stream_map_.find(stream_id);
270  if (it == stream_map_.end()) {
271    DLOG(INFO) << ENDPOINT << "Stream is already closed: " << stream_id;
272    return;
273  }
274  ReliableQuicStream* stream = it->second;
275  if (connection_->connected() && !stream->headers_decompressed()) {
276    // If the stream is being closed locally (for example a client cancelling
277    // a request before receiving the response) then we need to make sure that
278    // we keep the stream alive long enough to process any response or
279    // RST_STREAM frames.
280    if (locally_reset && !is_server_) {
281      AddZombieStream(stream_id);
282      return;
283    }
284
285    // This stream has been closed before the headers were decompressed.
286    // This might cause problems with head of line blocking of headers.
287    // If the peer sent headers which were lost but we now close the stream
288    // we will never be able to decompress headers for other streams.
289    // To deal with this, we keep track of streams which have been closed
290    // prematurely.  If we ever receive data frames for this steam, then we
291    // know there actually has been a problem and we close the connection.
292    AddPrematurelyClosedStream(stream->id());
293  }
294  closed_streams_.push_back(it->second);
295  if (ContainsKey(zombie_streams_, stream->id())) {
296    zombie_streams_.erase(stream->id());
297  }
298  stream_map_.erase(it);
299  stream->OnClose();
300}
301
302void QuicSession::AddZombieStream(QuicStreamId stream_id) {
303  if (zombie_streams_.size() == kMaxZombieStreams) {
304    QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first;
305    CloseZombieStream(oldest_zombie_stream_id);
306    // However, since the headers still have not been decompressed, we want to
307    // mark it a prematurely closed so that if we ever receive frames
308    // for this stream we can close the connection.
309    AddPrematurelyClosedStream(oldest_zombie_stream_id);
310  }
311  zombie_streams_.insert(make_pair(stream_id, true));
312}
313
314void QuicSession::CloseZombieStream(QuicStreamId stream_id) {
315  DCHECK(ContainsKey(zombie_streams_, stream_id));
316  zombie_streams_.erase(stream_id);
317  ReliableQuicStream* stream = GetStream(stream_id);
318  if (!stream) {
319    return;
320  }
321  stream_map_.erase(stream_id);
322  stream->OnClose();
323  closed_streams_.push_back(stream);
324}
325
326void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) {
327  if (prematurely_closed_streams_.size() ==
328      kMaxPrematurelyClosedStreamsTracked) {
329    prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
330  }
331  prematurely_closed_streams_.insert(make_pair(stream_id, true));
332}
333
334bool QuicSession::IsEncryptionEstablished() {
335  return GetCryptoStream()->encryption_established();
336}
337
338bool QuicSession::IsCryptoHandshakeConfirmed() {
339  return GetCryptoStream()->handshake_confirmed();
340}
341
342void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
343  switch (event) {
344    // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
345    // to QuicSession since it is the glue.
346    case ENCRYPTION_FIRST_ESTABLISHED:
347      break;
348
349    case ENCRYPTION_REESTABLISHED:
350      // Retransmit originally packets that were sent, since they can't be
351      // decrypted by the peer.
352      connection_->RetransmitUnackedPackets(
353          QuicConnection::INITIAL_ENCRYPTION_ONLY);
354      break;
355
356    case HANDSHAKE_CONFIRMED:
357      LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
358          << "Handshake confirmed without parameter negotiation.";
359      connection_->SetIdleNetworkTimeout(
360          config_.idle_connection_state_lifetime());
361      connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
362      max_open_streams_ = config_.max_streams_per_connection();
363      break;
364
365    default:
366      LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
367  }
368}
369
// Hook invoked with a crypto handshake message that was sent.  This
// implementation intentionally does nothing with |message|.
void QuicSession::OnCryptoHandshakeMessageSent(
    const CryptoHandshakeMessage& message) {
}
373
// Hook invoked with a crypto handshake message that was received.  This
// implementation intentionally does nothing with |message|.
void QuicSession::OnCryptoHandshakeMessageReceived(
    const CryptoHandshakeMessage& message) {
}
377
378QuicConfig* QuicSession::config() {
379  return &config_;
380}
381
382void QuicSession::ActivateStream(ReliableQuicStream* stream) {
383  DLOG(INFO) << ENDPOINT << "num_streams: " << stream_map_.size()
384             << ". activating " << stream->id();
385  DCHECK_EQ(stream_map_.count(stream->id()), 0u);
386  stream_map_[stream->id()] = stream;
387}
388
389QuicStreamId QuicSession::GetNextStreamId() {
390  QuicStreamId id = next_stream_id_;
391  next_stream_id_ += 2;
392  return id;
393}
394
395ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
396  if (stream_id == kCryptoStreamId) {
397    return GetCryptoStream();
398  }
399
400  ReliableStreamMap::iterator it = stream_map_.find(stream_id);
401  if (it != stream_map_.end()) {
402    return it->second;
403  }
404
405  if (IsClosedStream(stream_id)) {
406    return NULL;
407  }
408
409  if (stream_id % 2 == next_stream_id_ % 2) {
410    // We've received a frame for a locally-created stream that is not
411    // currently active.  This is an error.
412    connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
413    return NULL;
414  }
415
416  return GetIncomingReliableStream(stream_id);
417}
418
419ReliableQuicStream* QuicSession::GetIncomingReliableStream(
420    QuicStreamId stream_id) {
421  if (IsClosedStream(stream_id)) {
422    return NULL;
423  }
424
425  if (goaway_sent_) {
426    // We've already sent a GoAway
427    SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY);
428    return NULL;
429  }
430
431  implicitly_created_streams_.erase(stream_id);
432  if (stream_id > largest_peer_created_stream_id_) {
433    // TODO(rch) add unit test for this
434    if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
435      connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
436      return NULL;
437    }
438    if (largest_peer_created_stream_id_ == 0) {
439      largest_peer_created_stream_id_= 1;
440    }
441    for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
442         id < stream_id;
443         id += 2) {
444      implicitly_created_streams_.insert(id);
445    }
446    largest_peer_created_stream_id_ = stream_id;
447  }
448  ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id);
449  if (stream == NULL) {
450    return NULL;
451  }
452  ActivateStream(stream);
453  return stream;
454}
455
456bool QuicSession::IsClosedStream(QuicStreamId id) {
457  DCHECK_NE(0u, id);
458  if (id == kCryptoStreamId) {
459    return false;
460  }
461  if (ContainsKey(zombie_streams_, id)) {
462    return true;
463  }
464  if (ContainsKey(stream_map_, id)) {
465    // Stream is active
466    return false;
467  }
468  if (id % 2 == next_stream_id_ % 2) {
469    // Locally created streams are strictly in-order.  If the id is in the
470    // range of created streams and it's not active, it must have been closed.
471    return id < next_stream_id_;
472  }
473  // For peer created streams, we also need to consider implicitly created
474  // streams.
475  return id <= largest_peer_created_stream_id_ &&
476      implicitly_created_streams_.count(id) == 0;
477}
478
479size_t QuicSession::GetNumOpenStreams() const {
480  return stream_map_.size() + implicitly_created_streams_.size() -
481      zombie_streams_.size();
482}
483
484void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
485  if (id == kCryptoStreamId) {
486    DCHECK(!has_pending_handshake_);
487    has_pending_handshake_ = true;
488    // TODO(jar): Be sure to use the highest priority for the crypto stream,
489    // perhaps by adding a "special" priority for it that is higher than
490    // kHighestPriority.
491    priority = kHighestPriority;
492  }
493  write_blocked_streams_.PushBack(id, priority);
494}
495
// Records that stream |stream_id| is blocked waiting for the decompressor to
// reach |header_id|.  Any stream previously recorded under the same
// |header_id| is overwritten.  OnStreamFrames() consumes these entries once
// the decompressor's current header id matches.
void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
                                           QuicStreamId stream_id) {
  decompression_blocked_streams_[header_id] = stream_id;
}
500
501bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) {
502  NOTIMPLEMENTED();
503  return false;
504}
505
506void QuicSession::PostProcessAfterData() {
507  STLDeleteElements(&closed_streams_);
508  closed_streams_.clear();
509}
510
511}  // namespace net
512