reliable_quic_stream.cc revision 0f1bc08d4cfcc34181b0b5cbf065c40f687bf740
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/quic/reliable_quic_stream.h"

#include "net/quic/quic_session.h"
#include "net/quic/quic_spdy_decompressor.h"
#include "net/spdy/write_blocked_list.h"

using base::StringPiece;
using std::min;
using std::string;

namespace net {

namespace {

// This is somewhat arbitrary.  It's possible, but unlikely, we will either fail
// to set a priority client-side, or cancel a stream before stripping the
// priority from the wire server-side.  In either case, start out with a
// priority in the middle.
QuicPriority kDefaultPriority = 3;

// Appends bytes from data into partial_data_buffer.  Once partial_data_buffer
// reaches 4 bytes, copies the data into 'result' and clears
// partial_data_buffer.
// Returns the number of bytes consumed.
uint32 StripUint32(const char* data, uint32 data_len,
                   string* partial_data_buffer,
                   uint32* result) {
  DCHECK_GT(4u, partial_data_buffer->length());
  size_t missing_size = 4 - partial_data_buffer->length();
  if (data_len < missing_size) {
    StringPiece(data, data_len).AppendToString(partial_data_buffer);
    return data_len;
  }
  StringPiece(data, missing_size).AppendToString(partial_data_buffer);
  DCHECK_EQ(4u, partial_data_buffer->length());
  memcpy(result, partial_data_buffer->data(), 4);
  partial_data_buffer->clear();
  return missing_size;
}

}  // namespace

ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
                                       QuicSession* session)
    : sequencer_(this),
      id_(id),
      session_(session),
      visitor_(NULL),
      stream_bytes_read_(0),
      stream_bytes_written_(0),
      headers_decompressed_(false),
      priority_(kDefaultPriority),
      headers_id_(0),
      decompression_failed_(false),
      stream_error_(QUIC_STREAM_NO_ERROR),
      connection_error_(QUIC_NO_ERROR),
      read_side_closed_(false),
      write_side_closed_(false),
      priority_parsed_(false),
      fin_buffered_(false),
      fin_sent_(false),
      is_server_(session_->is_server()) {
}

ReliableQuicStream::~ReliableQuicStream() {
}

bool ReliableQuicStream::WillAcceptStreamFrame(
    const QuicStreamFrame& frame) const {
  if (read_side_closed_) {
    return true;
  }
  if (frame.stream_id != id_) {
    LOG(ERROR) << "Received a frame for stream " << frame.stream_id
               << " on stream " << id_;
    return false;
  }
  return sequencer_.WillAcceptStreamFrame(frame);
}

bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
  DCHECK_EQ(frame.stream_id, id_);
  if (read_side_closed_) {
    DLOG(INFO) << ENDPOINT << "Ignoring frame " << frame.stream_id;
    // We don't want to be reading: blackhole the data.
    return true;
  }
  // Note: This count includes duplicate data received.
  stream_bytes_read_ += frame.data.length();

  bool accepted = sequencer_.OnStreamFrame(frame);

  return accepted;
}

void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  TerminateFromPeer(false);  // Full close.
}

void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
                                            bool from_peer) {
  if (read_side_closed_ && write_side_closed_) {
    return;
  }
  if (error != QUIC_NO_ERROR) {
    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
    connection_error_ = error;
  }

  if (from_peer) {
    TerminateFromPeer(false);
  } else {
    CloseWriteSide();
    CloseReadSide();
  }
}

void ReliableQuicStream::TerminateFromPeer(bool half_close) {
  if (!half_close) {
    CloseWriteSide();
  }
  CloseReadSide();
}

void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  if (error != QUIC_STREAM_NO_ERROR) {
    // Sending a RstStream results in calling CloseStream.
    session()->SendRstStream(id(), error);
  } else {
    session_->CloseStream(id());
  }
}

void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
  session()->connection()->SendConnectionClose(error);
}

void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
                                                    const string& details) {
  session()->connection()->SendConnectionCloseWithDetails(error, details);
}

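// Reads into the caller-supplied iovec array.  Any buffered decompressed
// header bytes are returned first; body data is read from the sequencer only
// once the headers have been fully decompressed and drained.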
size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.Readv(iov, iov_len);
  }
  size_t bytes_consumed = 0;
  size_t iov_index = 0;
  while (iov_index < iov_len &&
         decompressed_headers_.length() > bytes_consumed) {
    size_t bytes_to_read = min(iov[iov_index].iov_len,
                               decompressed_headers_.length() - bytes_consumed);
    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
    memcpy(iov_ptr,
           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
    bytes_consumed += bytes_to_read;
    ++iov_index;
  }
  decompressed_headers_.erase(0, bytes_consumed);
  return bytes_consumed;
}

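// Like Readv, but exposes readable data without consuming it.  While
// decompressed header bytes are buffered they are surfaced as a single region;
// otherwise the sequencer's regions are returned.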
int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.GetReadableRegions(iov, iov_len);
  }
  if (iov_len == 0) {
    return 0;
  }
  iov[0].iov_base = static_cast<void*>(
      const_cast<char*>(decompressed_headers_.data()));
  iov[0].iov_len = decompressed_headers_.length();
  return 1;
}

bool ReliableQuicStream::IsHalfClosed() const {
  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return false;
  }
  return sequencer_.IsHalfClosed();
}

bool ReliableQuicStream::HasBytesToRead() const {
  return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
}

const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
  return session_->peer_address();
}

QuicSpdyCompressor* ReliableQuicStream::compressor() {
  return session_->compressor();
}

bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
  return session_->GetSSLInfo(ssl_info);
}

QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
  DCHECK(data.size() > 0 || fin);
  return WriteOrBuffer(data, fin);
}

void ReliableQuicStream::set_priority(QuicPriority priority) {
  DCHECK_EQ(0u, stream_bytes_written_);
  priority_ = priority;
}

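// Writes as much of data as the session will currently accept and queues the
// remainder (and any unconsumed fin) locally.  Because the leftover bytes are
// buffered, the caller always sees the full data.size() reported as consumed.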
QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
  DCHECK(!fin_buffered_);

  QuicConsumedData consumed_data(0, false);
  fin_buffered_ = fin;

  if (queued_data_.empty()) {
    consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
    DCHECK_LE(consumed_data.bytes_consumed, data.length());
  }

  // If there's unconsumed data or an unconsumed fin, queue it.
  if (consumed_data.bytes_consumed < data.length() ||
      (fin && !consumed_data.fin_consumed)) {
    queued_data_.push_back(
        string(data.data() + consumed_data.bytes_consumed,
               data.length() - consumed_data.bytes_consumed));
  }

  return QuicConsumedData(data.size(), true);
}

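// Called when the connection can accept more stream data.  Drains the queue of
// buffered writes, attaching the buffered fin to the final chunk, and stops as
// soon as a write is only partially consumed.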
void ReliableQuicStream::OnCanWrite() {
  bool fin = false;
  while (!queued_data_.empty()) {
    const string& data = queued_data_.front();
    if (queued_data_.size() == 1 && fin_buffered_) {
      fin = true;
    }
    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
    if (consumed_data.bytes_consumed == data.size() &&
        fin == consumed_data.fin_consumed) {
      queued_data_.pop_front();
    } else {
      queued_data_.front().erase(0, consumed_data.bytes_consumed);
      break;
    }
  }
}

QuicConsumedData ReliableQuicStream::WriteDataInternal(
    StringPiece data, bool fin) {
  struct iovec iov = {const_cast<char*>(data.data()),
                      static_cast<size_t>(data.size())};
  return WritevDataInternal(&iov, 1, fin);
}

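// Hands the iovec array to the session for framing and transmission.  If the
// session consumes all of the bytes and the fin, the write side is closed; if
// any bytes (or the fin) are left over, the stream is marked write blocked so
// that OnCanWrite will be invoked later.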
QuicConsumedData ReliableQuicStream::WritevDataInternal(const struct iovec* iov,
                                                        int iov_count,
                                                        bool fin) {
  if (write_side_closed_) {
    DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
    return QuicConsumedData(0, false);
  }

  size_t write_length = 0u;
  for (int i = 0; i < iov_count; ++i) {
    write_length += iov[i].iov_len;
  }
  QuicConsumedData consumed_data =
      session()->WritevData(id(), iov, iov_count, stream_bytes_written_, fin);
  stream_bytes_written_ += consumed_data.bytes_consumed;
  if (consumed_data.bytes_consumed == write_length) {
    if (fin && consumed_data.fin_consumed) {
      fin_sent_ = true;
      CloseWriteSide();
    } else if (fin && !consumed_data.fin_consumed) {
      session_->MarkWriteBlocked(id(), EffectivePriority());
    }
  } else {
    session_->MarkWriteBlocked(id(), EffectivePriority());
  }
  return consumed_data;
}

QuicPriority ReliableQuicStream::EffectivePriority() const {
  return priority();
}

void ReliableQuicStream::CloseReadSide() {
  if (read_side_closed_) {
    return;
  }
  DLOG(INFO) << ENDPOINT << "Done reading from stream " << id();

  read_side_closed_ = true;
  if (write_side_closed_) {
    DLOG(INFO) << ENDPOINT << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

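// Handles contiguous stream data delivered by the sequencer.  The crypto
// stream is passed straight to ProcessData; for other streams the priority and
// header id are stripped first and the SPDY header block is run through the
// session's shared decompressor (which may head-of-line block behind earlier
// streams) before any body data reaches ProcessData.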
uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
  DCHECK_NE(0u, data_len);
  if (id() == kCryptoStreamId) {
    // The crypto stream does not use compression.
    return ProcessData(data, data_len);
  }

  uint32 total_bytes_consumed = 0;
  if (headers_id_ == 0u) {
    total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
    data += total_bytes_consumed;
    data_len -= total_bytes_consumed;
    if (data_len == 0 || !session_->connection()->connected()) {
      return total_bytes_consumed;
    }
  }
  DCHECK_NE(0u, headers_id_);

  // Once the headers are finished, we simply pass the data through.
  if (headers_decompressed_) {
    // Some buffered header data remains.
    if (!decompressed_headers_.empty()) {
      ProcessHeaderData();
    }
    if (decompressed_headers_.empty()) {
      DVLOG(1) << "Delegating processing to ProcessData";
      total_bytes_consumed += ProcessData(data, data_len);
    }
    return total_bytes_consumed;
  }

  QuicHeaderId current_header_id =
      session_->decompressor()->current_header_id();
  // Ensure that this header id looks sane.
  if (headers_id_ < current_header_id ||
      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    DVLOG(1) << ENDPOINT
             << "Invalid headers for stream: " << id()
             << " header_id: " << headers_id_
             << " current_header_id: " << current_header_id;
    session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    return total_bytes_consumed;
  }

  // If we are head-of-line blocked on decompression, then back up.
  if (current_header_id != headers_id_) {
    session_->MarkDecompressionBlocked(headers_id_, id());
    DVLOG(1) << ENDPOINT
             << "Unable to decompress header data for stream: " << id()
             << " header_id: " << headers_id_;
    return total_bytes_consumed;
  }

  // Decompressed data will be delivered to decompressed_headers_.
  size_t bytes_consumed = session_->decompressor()->DecompressData(
      StringPiece(data, data_len), this);
  DCHECK_NE(0u, bytes_consumed);
  if (bytes_consumed > data_len) {
    DCHECK(false) << "DecompressData returned illegal value";
    OnDecompressionError();
    return total_bytes_consumed;
  }
  total_bytes_consumed += bytes_consumed;
  data += bytes_consumed;
  data_len -= bytes_consumed;

  if (decompression_failed_) {
    // The session will have been closed in OnDecompressionError.
    return total_bytes_consumed;
  }

  // Headers are complete if the decompressor has moved on to the
  // next stream.
  headers_decompressed_ =
      session_->decompressor()->current_header_id() != headers_id_;
  if (!headers_decompressed_) {
    DCHECK_EQ(0u, data_len);
  }

  ProcessHeaderData();

  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return total_bytes_consumed;
  }

  // We have processed all of the decompressed data but we might
  // have some more raw data to process.
  if (data_len > 0) {
    total_bytes_consumed += ProcessData(data, data_len);
  }

  // The sequencer will push any additional buffered frames if this data
  // has been completely consumed.
  return total_bytes_consumed;
}

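// Feeds the buffered decompressed header bytes to ProcessData, erasing
// whatever was consumed and keeping the remainder for a later call.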
uint32 ReliableQuicStream::ProcessHeaderData() {
  if (decompressed_headers_.empty()) {
    return 0;
  }

  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
                                       decompressed_headers_.length());
  if (bytes_processed == decompressed_headers_.length()) {
    decompressed_headers_.clear();
  } else {
    decompressed_headers_.erase(0, bytes_processed);
  }
  return bytes_processed;
}

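// Invoked once the shared decompressor has reached this stream's header id
// after an earlier head-of-line block.  Drains buffered frames from the
// sequencer through the decompressor, then processes the completed headers and
// either terminates the read side or flushes the remaining buffered frames.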
void ReliableQuicStream::OnDecompressorAvailable() {
  DCHECK_EQ(headers_id_,
            session_->decompressor()->current_header_id());
  DCHECK(!headers_decompressed_);
  DCHECK(!decompression_failed_);
  DCHECK_EQ(0u, decompressed_headers_.length());

  while (!headers_decompressed_) {
    struct iovec iovec;
    if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
      return;
    }

    size_t bytes_consumed = session_->decompressor()->DecompressData(
        StringPiece(static_cast<char*>(iovec.iov_base),
                    iovec.iov_len),
        this);
    DCHECK_LE(bytes_consumed, iovec.iov_len);
    if (decompression_failed_) {
      return;
    }
    sequencer_.MarkConsumed(bytes_consumed);

    headers_decompressed_ =
        session_->decompressor()->current_header_id() != headers_id_;
  }

  // Either the headers are complete, or all of the data has been consumed.
  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
  if (IsHalfClosed()) {
    TerminateFromPeer(true);
  } else if (headers_decompressed_ && decompressed_headers_.empty()) {
    sequencer_.FlushBufferedFrames();
  }
}

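// Decompressor callback: appends newly decompressed header bytes to
// decompressed_headers_ until ProcessHeaderData hands them to the subclass.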
bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
  data.AppendToString(&decompressed_headers_);
  return true;
}

void ReliableQuicStream::OnDecompressionError() {
  DCHECK(!decompression_failed_);
  decompression_failed_ = true;
  session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
}

void ReliableQuicStream::CloseWriteSide() {
  if (write_side_closed_) {
    return;
  }
  DLOG(INFO) << ENDPOINT << "Done writing to stream " << id();

  write_side_closed_ = true;
  if (read_side_closed_) {
    DLOG(INFO) << ENDPOINT << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

bool ReliableQuicStream::HasBufferedData() {
  return !queued_data_.empty();
}

void ReliableQuicStream::OnClose() {
  CloseReadSide();
  CloseWriteSide();

  if (visitor_) {
    Visitor* visitor = visitor_;
    // Calling Visitor::OnClose() may result in the destruction of the visitor,
    // so we need to ensure we don't call it again.
    visitor_ = NULL;
    visitor->OnClose(this);
  }
}

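// On the server this first strips the 4-byte priority from the front of the
// stream; both endpoints then strip the 4-byte headers id.  Values split
// across frames are accumulated in headers_id_and_priority_buffer_.  Returns
// the number of bytes consumed from data.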
uint32 ReliableQuicStream::StripPriorityAndHeaderId(
    const char* data, uint32 data_len) {
  uint32 total_bytes_parsed = 0;

  if (!priority_parsed_ && session_->connection()->is_server()) {
    QuicPriority temporary_priority = priority_;
    total_bytes_parsed = StripUint32(
        data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
    if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
      priority_parsed_ = true;

      // Spdy priorities are inverted, so the highest numerical value is the
      // lowest legal priority.
      if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) {
        session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
        return 0;
      }
      priority_ = temporary_priority;
    }
    data += total_bytes_parsed;
    data_len -= total_bytes_parsed;
  }
  if (data_len > 0 && headers_id_ == 0u) {
    // The headers ID has not yet been read.  Strip it from the beginning of
    // the data stream.
    total_bytes_parsed += StripUint32(
        data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
  }
  return total_bytes_parsed;
}

}  // namespace net