reliable_quic_stream.cc revision 58537e28ecd584eab876aee8be7156509866d23a
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/quic/reliable_quic_stream.h"

#include "base/logging.h"
#include "net/quic/quic_session.h"
#include "net/quic/quic_spdy_decompressor.h"

using base::StringPiece;
using std::min;
using std::string;

namespace net {

namespace {

// This is somewhat arbitrary.  It's possible, but unlikely, that we will
// either fail to set a priority client-side, or cancel a stream before
// stripping the priority from the wire server-side.  In either case, start
// out with a priority in the middle.
QuicPriority kDefaultPriority = 3;

// Appends bytes from data into partial_data_buffer.  Once partial_data_buffer
// reaches 4 bytes, copies the data into 'result' and clears
// partial_data_buffer.
// Returns the number of bytes consumed.
uint32 StripUint32(const char* data, uint32 data_len,
                   string* partial_data_buffer,
                   uint32* result) {
  DCHECK_GT(4u, partial_data_buffer->length());
  size_t missing_size = 4 - partial_data_buffer->length();
  if (data_len < missing_size) {
    StringPiece(data, data_len).AppendToString(partial_data_buffer);
    return data_len;
  }
  StringPiece(data, missing_size).AppendToString(partial_data_buffer);
  DCHECK_EQ(4u, partial_data_buffer->length());
  memcpy(result, partial_data_buffer->data(), 4);
  partial_data_buffer->clear();
  return missing_size;
}

}  // namespace

ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
                                       QuicSession* session)
    : sequencer_(this),
      id_(id),
      session_(session),
      visitor_(NULL),
      stream_bytes_read_(0),
      stream_bytes_written_(0),
      headers_decompressed_(false),
      priority_(kDefaultPriority),
      headers_id_(0),
      decompression_failed_(false),
      stream_error_(QUIC_STREAM_NO_ERROR),
      connection_error_(QUIC_NO_ERROR),
      read_side_closed_(false),
      write_side_closed_(false),
      priority_parsed_(false),
      fin_buffered_(false),
      fin_sent_(false) {
}

ReliableQuicStream::~ReliableQuicStream() {
}

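// Returns true if the sequencer can accept |frame|.  Frames that arrive after
// the read side has been closed are accepted and silently dropped.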
bool ReliableQuicStream::WillAcceptStreamFrame(
    const QuicStreamFrame& frame) const {
  if (read_side_closed_) {
    return true;
  }
  if (frame.stream_id != id_) {
    LOG(ERROR) << "Received frame for stream " << frame.stream_id
               << " on stream " << id_;
    return false;
  }
  return sequencer_.WillAcceptStreamFrame(frame);
}

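// Handles an incoming stream frame.  Data arriving after the read side has
// been closed is discarded; otherwise the frame is handed to the sequencer
// for in-order delivery.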
bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
  DCHECK_EQ(frame.stream_id, id_);
  if (read_side_closed_) {
    DLOG(INFO) << "Ignoring frame " << frame.stream_id;
    // We don't want to be reading: blackhole the data.
    return true;
  }
  // Note: This count includes duplicate data received.
  stream_bytes_read_ += frame.data.length();

  bool accepted = sequencer_.OnStreamFrame(frame);

  return accepted;
}

void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  TerminateFromPeer(false);  // Full close.
}

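// Called when the connection that owns this stream is closed.  Records the
// connection error and closes both the read and write sides of the stream.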
void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) {
  if (read_side_closed_ && write_side_closed_) {
    return;
  }
  if (error != QUIC_NO_ERROR) {
    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
    connection_error_ = error;
  }

  if (from_peer) {
    TerminateFromPeer(false);
  } else {
    CloseWriteSide();
    CloseReadSide();
  }
}

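// Called when the peer is finished with this stream.  A half close shuts down
// only the read side; a full close shuts down the write side as well.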
void ReliableQuicStream::TerminateFromPeer(bool half_close) {
  if (!half_close) {
    CloseWriteSide();
  }
  CloseReadSide();
}

void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  if (error != QUIC_STREAM_NO_ERROR) {
    // Sending a RstStream results in calling CloseStream.
    session()->SendRstStream(id(), error);
  } else {
    session_->CloseStream(id());
  }
}

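// Reads into the supplied iovec array.  Until the headers have been
// decompressed and drained, reads are served from decompressed_headers_;
// after that they are served by the sequencer.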
size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.Readv(iov, iov_len);
  }
  size_t bytes_consumed = 0;
  size_t iov_index = 0;
  while (iov_index < iov_len &&
         decompressed_headers_.length() > bytes_consumed) {
    size_t bytes_to_read = min(iov[iov_index].iov_len,
                               decompressed_headers_.length() - bytes_consumed);
    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
    memcpy(iov_ptr,
           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
    bytes_consumed += bytes_to_read;
    ++iov_index;
  }
  decompressed_headers_.erase(0, bytes_consumed);
  return bytes_consumed;
}

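// Fills in up to |iov_len| iovecs describing readable data without consuming
// it.  While decompressed header data is pending, a single region pointing at
// decompressed_headers_ is returned.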
int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.GetReadableRegions(iov, iov_len);
  }
  if (iov_len == 0) {
    return 0;
  }
  iov[0].iov_base = static_cast<void*>(
      const_cast<char*>(decompressed_headers_.data()));
  iov[0].iov_len = decompressed_headers_.length();
  return 1;
}

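// The stream is not considered half closed until the headers have been
// decompressed and fully consumed, even if the sequencer has seen the fin.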
bool ReliableQuicStream::IsHalfClosed() const {
  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return false;
  }
  return sequencer_.IsHalfClosed();
}

bool ReliableQuicStream::HasBytesToRead() const {
  return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
}

const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
  return session_->peer_address();
}

QuicSpdyCompressor* ReliableQuicStream::compressor() {
  return session_->compressor();
}

bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
  return session_->GetSSLInfo(ssl_info);
}

QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
  DCHECK(data.size() > 0 || fin);
  return WriteOrBuffer(data, fin);
}

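// Writes as much of |data| as the session will currently accept and buffers
// the rest (and any unconsumed fin) until OnCanWrite().  Callers always see
// the full length reported as consumed.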
QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
  DCHECK(!fin_buffered_);

  QuicConsumedData consumed_data(0, false);
  fin_buffered_ = fin;

  if (queued_data_.empty()) {
    consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
    DCHECK_LE(consumed_data.bytes_consumed, data.length());
  }

  // If there's unconsumed data or an unconsumed fin, queue it.
  if (consumed_data.bytes_consumed < data.length() ||
      (fin && !consumed_data.fin_consumed)) {
    queued_data_.push_back(
        string(data.data() + consumed_data.bytes_consumed,
               data.length() - consumed_data.bytes_consumed));
  }

  return QuicConsumedData(data.size(), true);
}

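// Called when the connection becomes writable again.  Drains as much queued
// data as possible, sending the buffered fin along with the final chunk.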
void ReliableQuicStream::OnCanWrite() {
  bool fin = false;
  while (!queued_data_.empty()) {
    const string& data = queued_data_.front();
    if (queued_data_.size() == 1 && fin_buffered_) {
      fin = true;
    }
    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
    if (consumed_data.bytes_consumed == data.size() &&
        fin == consumed_data.fin_consumed) {
      queued_data_.pop_front();
    } else {
      queued_data_.front().erase(0, consumed_data.bytes_consumed);
      break;
    }
  }
}

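// Hands |data| to the session for writing.  If the session does not consume
// all of the data (or the fin), the stream is marked write blocked; once the
// fin has actually been sent, the write side is closed.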
QuicConsumedData ReliableQuicStream::WriteDataInternal(
    StringPiece data, bool fin) {
  if (write_side_closed_) {
    DLOG(ERROR) << "Attempt to write when the write side is closed";
    return QuicConsumedData(0, false);
  }

  QuicConsumedData consumed_data =
      session()->WriteData(id(), data, stream_bytes_written_, fin);
  stream_bytes_written_ += consumed_data.bytes_consumed;
  if (consumed_data.bytes_consumed == data.length()) {
    if (fin && consumed_data.fin_consumed) {
      fin_sent_ = true;
      CloseWriteSide();
    } else if (fin && !consumed_data.fin_consumed) {
      session_->MarkWriteBlocked(id());
    }
  } else {
    session_->MarkWriteBlocked(id());
  }
  return consumed_data;
}

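// Stops reading from the stream.  Once both sides are closed, the stream is
// removed from the session.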
void ReliableQuicStream::CloseReadSide() {
  if (read_side_closed_) {
    return;
  }
  DLOG(INFO) << "Done reading from stream " << id();

  read_side_closed_ = true;
  if (write_side_closed_) {
    DLOG(INFO) << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

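// Entry point for data delivered in order by the sequencer.  For non-crypto
// streams this strips the priority and headers id, runs header bytes through
// the session's decompressor, and hands everything after the headers to
// ProcessData().  Returns the number of bytes consumed.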
uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
  if (id() == kCryptoStreamId) {
    if (data_len == 0) {
      return 0;
    }
    // The crypto stream does not use compression.
    return ProcessData(data, data_len);
  }

  uint32 total_bytes_consumed = 0;
  if (headers_id_ == 0u) {
    total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
    data += total_bytes_consumed;
    data_len -= total_bytes_consumed;
    if (data_len == 0) {
      return total_bytes_consumed;
    }
  }
  DCHECK_NE(0u, headers_id_);

  // Once the headers are finished, we simply pass the data through.
  if (headers_decompressed_) {
    // Some buffered header data remains.
    if (!decompressed_headers_.empty()) {
      ProcessHeaderData();
    }
    if (decompressed_headers_.empty()) {
      DVLOG(1) << "Delegating processing to ProcessData";
      total_bytes_consumed += ProcessData(data, data_len);
    }
    return total_bytes_consumed;
  }

  QuicHeaderId current_header_id =
      session_->decompressor()->current_header_id();
  // Ensure that this header id looks sane.
  if (headers_id_ < current_header_id ||
      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    DVLOG(1) << "Invalid headers for stream: " << id()
             << " header_id: " << headers_id_
             << " current_header_id: " << current_header_id;
    session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    return total_bytes_consumed;
  }

  // If we are head-of-line blocked on decompression, then back up.
  if (current_header_id != headers_id_) {
    session_->MarkDecompressionBlocked(headers_id_, id());
    DVLOG(1) << "Unable to decompress header data for stream: " << id()
             << " header_id: " << headers_id_;
    return total_bytes_consumed;
  }

  // Decompressed data will be delivered to decompressed_headers_.
  size_t bytes_consumed = session_->decompressor()->DecompressData(
      StringPiece(data, data_len), this);
  DCHECK_NE(0u, bytes_consumed);
  if (bytes_consumed > data_len) {
    DCHECK(false) << "DecompressData returned illegal value";
    OnDecompressionError();
    return total_bytes_consumed;
  }
  total_bytes_consumed += bytes_consumed;
  data += bytes_consumed;
  data_len -= bytes_consumed;

  if (decompression_failed_) {
    // The session will have been closed in OnDecompressionError.
    return total_bytes_consumed;
  }

  // Headers are complete if the decompressor has moved on to the
  // next stream.
  headers_decompressed_ =
      session_->decompressor()->current_header_id() != headers_id_;
  if (!headers_decompressed_) {
    DCHECK_EQ(0u, data_len);
  }

  ProcessHeaderData();

  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return total_bytes_consumed;
  }

  // We have processed all of the decompressed data but we might
  // have some more raw data to process.
  if (data_len > 0) {
    total_bytes_consumed += ProcessData(data, data_len);
  }

  // The sequencer will push any additional buffered frames if this data
  // has been completely consumed.
  return total_bytes_consumed;
}

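// Passes buffered decompressed header data to ProcessData(), keeping whatever
// the subclass does not consume.  Returns the number of bytes processed.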
uint32 ReliableQuicStream::ProcessHeaderData() {
  if (decompressed_headers_.empty()) {
    return 0;
  }

  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
                                       decompressed_headers_.length());
  if (bytes_processed == decompressed_headers_.length()) {
    decompressed_headers_.clear();
  } else {
    decompressed_headers_.erase(0, bytes_processed);
  }
  return bytes_processed;
}

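// Called when the decompressor has reached this stream's headers id.
// Decompresses buffered frames from the sequencer until the headers are
// complete or the buffered data runs out.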
void ReliableQuicStream::OnDecompressorAvailable() {
  DCHECK_EQ(headers_id_,
            session_->decompressor()->current_header_id());
  DCHECK(!headers_decompressed_);
  DCHECK(!decompression_failed_);
  DCHECK_EQ(0u, decompressed_headers_.length());

  while (!headers_decompressed_) {
    struct iovec iovec;
    if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
      return;
    }

    size_t bytes_consumed = session_->decompressor()->DecompressData(
        StringPiece(static_cast<char*>(iovec.iov_base),
                    iovec.iov_len),
        this);
    DCHECK_LE(bytes_consumed, iovec.iov_len);
    if (decompression_failed_) {
      return;
    }
    sequencer_.MarkConsumed(bytes_consumed);

    headers_decompressed_ =
        session_->decompressor()->current_header_id() != headers_id_;
  }

  // Either the headers are complete, or all of the data has been consumed.
  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
  if (IsHalfClosed()) {
    TerminateFromPeer(true);
  } else if (headers_decompressed_ && decompressed_headers_.empty()) {
    sequencer_.FlushBufferedFrames();
  }
}

bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
  data.AppendToString(&decompressed_headers_);
  return true;
}

void ReliableQuicStream::OnDecompressionError() {
  DCHECK(!decompression_failed_);
  decompression_failed_ = true;
  session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
}

void ReliableQuicStream::CloseWriteSide() {
  if (write_side_closed_) {
    return;
  }
  DLOG(INFO) << "Done writing to stream " << id();

  write_side_closed_ = true;
  if (read_side_closed_) {
    DLOG(INFO) << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

bool ReliableQuicStream::HasBufferedData() {
  return !queued_data_.empty();
}

void ReliableQuicStream::OnClose() {
  CloseReadSide();
  CloseWriteSide();

  if (visitor_) {
    Visitor* visitor = visitor_;
    // Calling Visitor::OnClose() may result in the destruction of the visitor,
    // so we need to ensure we don't call it again.
    visitor_ = NULL;
    visitor->OnClose(this);
  }
}

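// Strips the 4-byte priority (server side, QUIC_VERSION_9 and later) and the
// 4-byte headers id from the front of the stream, buffering partial values
// across calls.  Returns the number of bytes parsed.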
uint32 ReliableQuicStream::StripPriorityAndHeaderId(
    const char* data, uint32 data_len) {
  uint32 total_bytes_parsed = 0;

  if (!priority_parsed_ &&
      session_->connection()->version() >= QUIC_VERSION_9 &&
      session_->connection()->is_server()) {
    total_bytes_parsed = StripUint32(
        data, data_len, &headers_id_and_priority_buffer_, &priority_);
    if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
      // TODO(alyssar) check for priority out of bounds.
      priority_parsed_ = true;
    }
    data += total_bytes_parsed;
    data_len -= total_bytes_parsed;
  }
  if (data_len > 0 && headers_id_ == 0u) {
    // The headers ID has not yet been read.  Strip it from the beginning of
    // the data stream.
    total_bytes_parsed += StripUint32(
        data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
  }
  return total_bytes_parsed;
}

}  // namespace net