reliable_quic_stream.cc revision 68043e1e95eeb07d5cae7aca370b26518b0867d6
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/quic/reliable_quic_stream.h"

#include "net/quic/quic_session.h"
#include "net/quic/quic_spdy_decompressor.h"
#include "net/spdy/write_blocked_list.h"

using base::StringPiece;
using std::min;
using std::string;

namespace net {

namespace {

// This is somewhat arbitrary.  It's possible, but unlikely, we will either fail
// to set a priority client-side, or cancel a stream before stripping the
// priority from the wire server-side.  In either case, start out with a
// priority in the middle.
QuicPriority kDefaultPriority = 3;

// Appends bytes from data into partial_data_buffer.  Once partial_data_buffer
// reaches 4 bytes, copies the data into 'result' and clears
// partial_data_buffer.
// Returns the number of bytes consumed.
uint32 StripUint32(const char* data, uint32 data_len,
                   string* partial_data_buffer,
                   uint32* result) {
  DCHECK_GT(4u, partial_data_buffer->length());
  size_t missing_size = 4 - partial_data_buffer->length();
  if (data_len < missing_size) {
    StringPiece(data, data_len).AppendToString(partial_data_buffer);
    return data_len;
  }
  StringPiece(data, missing_size).AppendToString(partial_data_buffer);
  DCHECK_EQ(4u, partial_data_buffer->length());
  memcpy(result, partial_data_buffer->data(), 4);
  partial_data_buffer->clear();
  return missing_size;
}

}  // namespace

ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
                                       QuicSession* session)
    : sequencer_(this),
      id_(id),
      session_(session),
      visitor_(NULL),
      stream_bytes_read_(0),
      stream_bytes_written_(0),
      headers_decompressed_(false),
      priority_(kDefaultPriority),
      headers_id_(0),
      decompression_failed_(false),
      stream_error_(QUIC_STREAM_NO_ERROR),
      connection_error_(QUIC_NO_ERROR),
      read_side_closed_(false),
      write_side_closed_(false),
      priority_parsed_(false),
      fin_buffered_(false),
      fin_sent_(false) {
}

ReliableQuicStream::~ReliableQuicStream() {
}

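// Returns true if the sequencer can accept |frame|.  Frames that arrive after
// the read side has been closed are reported as acceptable so they can be
// silently discarded; frames carrying a different stream id are rejected.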
bool ReliableQuicStream::WillAcceptStreamFrame(
    const QuicStreamFrame& frame) const {
  if (read_side_closed_) {
    return true;
  }
  if (frame.stream_id != id_) {
    LOG(ERROR) << "Received frame for stream " << frame.stream_id
               << " on stream " << id_;
    return false;
  }
  return sequencer_.WillAcceptStreamFrame(frame);
}

bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
  DCHECK_EQ(frame.stream_id, id_);
  if (read_side_closed_) {
    DLOG(INFO) << "Ignoring frame " << frame.stream_id;
    // We don't want to be reading: blackhole the data.
    return true;
  }
  // Note: This count includes duplicate data received.
  stream_bytes_read_ += frame.data.length();

  bool accepted = sequencer_.OnStreamFrame(frame);

  return accepted;
}

void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  TerminateFromPeer(false);  // Full close.
}

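// Called when the connection is going away.  Records the error (if any) and
// closes both sides of the stream; a close initiated by the peer is routed
// through TerminateFromPeer.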
void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) {
  if (read_side_closed_ && write_side_closed_) {
    return;
  }
  if (error != QUIC_NO_ERROR) {
    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
    connection_error_ = error;
  }

  if (from_peer) {
    TerminateFromPeer(false);
  } else {
    CloseWriteSide();
    CloseReadSide();
  }
}

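// Handles a termination initiated by the peer.  A half close only shuts down
// the read side; a full close shuts down the write side as well.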
void ReliableQuicStream::TerminateFromPeer(bool half_close) {
  if (!half_close) {
    CloseWriteSide();
  }
  CloseReadSide();
}

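// Closes the stream locally.  Any error other than QUIC_STREAM_NO_ERROR is
// signalled to the peer with a RstStream frame; otherwise the stream is
// simply removed from the session.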
void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  if (error != QUIC_STREAM_NO_ERROR) {
    // Sending a RstStream results in calling CloseStream.
    session()->SendRstStream(id(), error);
  } else {
    session_->CloseStream(id());
  }
}

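// Reads from the buffered decompressed headers while any remain; once the
// header block has been fully delivered, reads come straight from the
// sequencer.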
size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.Readv(iov, iov_len);
  }
  size_t bytes_consumed = 0;
  size_t iov_index = 0;
  while (iov_index < iov_len &&
         decompressed_headers_.length() > bytes_consumed) {
    size_t bytes_to_read = min(iov[iov_index].iov_len,
                               decompressed_headers_.length() - bytes_consumed);
    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
    memcpy(iov_ptr,
           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
    bytes_consumed += bytes_to_read;
    ++iov_index;
  }
  decompressed_headers_.erase(0, bytes_consumed);
  return bytes_consumed;
}

int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.GetReadableRegions(iov, iov_len);
  }
  if (iov_len == 0) {
    return 0;
  }
  iov[0].iov_base = static_cast<void*>(
      const_cast<char*>(decompressed_headers_.data()));
  iov[0].iov_len = decompressed_headers_.length();
  return 1;
}

bool ReliableQuicStream::IsHalfClosed() const {
  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return false;
  }
  return sequencer_.IsHalfClosed();
}

bool ReliableQuicStream::HasBytesToRead() const {
  return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
}

const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
  return session_->peer_address();
}

QuicSpdyCompressor* ReliableQuicStream::compressor() {
  return session_->compressor();
}

bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
  return session_->GetSSLInfo(ssl_info);
}

QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
  DCHECK(data.size() > 0 || fin);
  return WriteOrBuffer(data, fin);
}

void ReliableQuicStream::set_priority(QuicPriority priority) {
  DCHECK_EQ(0u, stream_bytes_written_);
  priority_ = priority;
}

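// Writes as much of |data| as the session will currently accept and buffers
// the remainder (along with an unconsumed fin) to be sent later from
// OnCanWrite.  From the caller's perspective the entire write is consumed.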
QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
  DCHECK(!fin_buffered_);

  QuicConsumedData consumed_data(0, false);
  fin_buffered_ = fin;

  if (queued_data_.empty()) {
    consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
    DCHECK_LE(consumed_data.bytes_consumed, data.length());
  }

  // If there's unconsumed data or an unconsumed fin, queue it.
  if (consumed_data.bytes_consumed < data.length() ||
      (fin && !consumed_data.fin_consumed)) {
    queued_data_.push_back(
        string(data.data() + consumed_data.bytes_consumed,
               data.length() - consumed_data.bytes_consumed));
  }

  return QuicConsumedData(data.size(), true);
}

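// Called when the session is able to send more data.  Drains the queue of
// buffered writes, stopping as soon as a write is only partially consumed.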
void ReliableQuicStream::OnCanWrite() {
  bool fin = false;
  while (!queued_data_.empty()) {
    const string& data = queued_data_.front();
    if (queued_data_.size() == 1 && fin_buffered_) {
      fin = true;
    }
    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
    if (consumed_data.bytes_consumed == data.size() &&
        fin == consumed_data.fin_consumed) {
      queued_data_.pop_front();
    } else {
      queued_data_.front().erase(0, consumed_data.bytes_consumed);
      break;
    }
  }
}

QuicConsumedData ReliableQuicStream::WriteDataInternal(
    StringPiece data, bool fin) {
  struct iovec iov = {const_cast<char*>(data.data()),
                      static_cast<size_t>(data.size())};
  return WritevDataInternal(&iov, 1, fin);
}

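// Hands the iovec to the session for transmission.  If the session consumes
// less than the full payload (or accepts the data but not the fin), the
// stream registers itself as write blocked at its effective priority.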
QuicConsumedData ReliableQuicStream::WritevDataInternal(const struct iovec* iov,
                                                        int iov_count,
                                                        bool fin) {
  if (write_side_closed_) {
    DLOG(ERROR) << "Attempt to write when the write side is closed";
    return QuicConsumedData(0, false);
  }

  size_t write_length = 0u;
  for (int i = 0; i < iov_count; ++i) {
    write_length += iov[i].iov_len;
  }
  QuicConsumedData consumed_data =
      session()->WritevData(id(), iov, iov_count, stream_bytes_written_, fin);
  stream_bytes_written_ += consumed_data.bytes_consumed;
  if (consumed_data.bytes_consumed == write_length) {
    if (fin && consumed_data.fin_consumed) {
      fin_sent_ = true;
      CloseWriteSide();
    } else if (fin && !consumed_data.fin_consumed) {
      session_->MarkWriteBlocked(id(), EffectivePriority());
    }
  } else {
    session_->MarkWriteBlocked(id(), EffectivePriority());
  }
  return consumed_data;
}

QuicPriority ReliableQuicStream::EffectivePriority() const {
  return priority();
}

void ReliableQuicStream::CloseReadSide() {
  if (read_side_closed_) {
    return;
  }
  DLOG(INFO) << "Done reading from stream " << id();

  read_side_closed_ = true;
  if (write_side_closed_) {
    DLOG(INFO) << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

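// Entry point for data delivered by the sequencer.  Crypto stream data is
// passed straight to ProcessData; for other streams this strips the priority
// and header id, decompresses the SPDY header block (possibly marking the
// stream decompression blocked), and then forwards body data to ProcessData.
// Returns the number of bytes consumed.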
uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
  DCHECK_NE(0u, data_len);
  if (id() == kCryptoStreamId) {
    // The crypto stream does not use compression.
    return ProcessData(data, data_len);
  }

  uint32 total_bytes_consumed = 0;
  if (headers_id_ == 0u) {
    total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
    data += total_bytes_consumed;
    data_len -= total_bytes_consumed;
    if (data_len == 0 || !session_->connection()->connected()) {
      return total_bytes_consumed;
    }
  }
  DCHECK_NE(0u, headers_id_);

  // Once the headers are finished, we simply pass the data through.
  if (headers_decompressed_) {
    // Some buffered header data remains.
    if (!decompressed_headers_.empty()) {
      ProcessHeaderData();
    }
    if (decompressed_headers_.empty()) {
      DVLOG(1) << "Delegating processing to ProcessData";
      total_bytes_consumed += ProcessData(data, data_len);
    }
    return total_bytes_consumed;
  }

  QuicHeaderId current_header_id =
      session_->decompressor()->current_header_id();
  // Ensure that this header id looks sane.
  if (headers_id_ < current_header_id ||
      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    DVLOG(1) << "Invalid headers for stream: " << id()
             << " header_id: " << headers_id_
             << " current_header_id: " << current_header_id;
    session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    return total_bytes_consumed;
  }

  // If we are head-of-line blocked on decompression, then back up.
  if (current_header_id != headers_id_) {
    session_->MarkDecompressionBlocked(headers_id_, id());
    DVLOG(1) << "Unable to decompress header data for stream: " << id()
             << " header_id: " << headers_id_;
    return total_bytes_consumed;
  }

  // Decompressed data will be delivered to decompressed_headers_.
  size_t bytes_consumed = session_->decompressor()->DecompressData(
      StringPiece(data, data_len), this);
  DCHECK_NE(0u, bytes_consumed);
  if (bytes_consumed > data_len) {
    DCHECK(false) << "DecompressData returned illegal value";
    OnDecompressionError();
    return total_bytes_consumed;
  }
  total_bytes_consumed += bytes_consumed;
  data += bytes_consumed;
  data_len -= bytes_consumed;

  if (decompression_failed_) {
    // The session will have been closed in OnDecompressionError.
    return total_bytes_consumed;
  }

  // Headers are complete if the decompressor has moved on to the
  // next stream.
  headers_decompressed_ =
      session_->decompressor()->current_header_id() != headers_id_;
  if (!headers_decompressed_) {
    DCHECK_EQ(0u, data_len);
  }

  ProcessHeaderData();

  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return total_bytes_consumed;
  }

  // We have processed all of the decompressed data but we might
  // have some more raw data to process.
  if (data_len > 0) {
    total_bytes_consumed += ProcessData(data, data_len);
  }

  // The sequencer will push any additional buffered frames if this data
  // has been completely consumed.
  return total_bytes_consumed;
}

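// Feeds buffered decompressed header bytes to ProcessData, trimming whatever
// was consumed.  Returns the number of bytes processed.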
uint32 ReliableQuicStream::ProcessHeaderData() {
  if (decompressed_headers_.empty()) {
    return 0;
  }

  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
                                       decompressed_headers_.length());
  if (bytes_processed == decompressed_headers_.length()) {
    decompressed_headers_.clear();
  } else {
    decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
  }
  return bytes_processed;
}

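// Called once the shared decompressor reaches this stream's header id.  Pulls
// readable regions from the sequencer through the decompressor until the
// header block is complete, then delivers the decompressed headers.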
void ReliableQuicStream::OnDecompressorAvailable() {
  DCHECK_EQ(headers_id_,
            session_->decompressor()->current_header_id());
  DCHECK(!headers_decompressed_);
  DCHECK(!decompression_failed_);
  DCHECK_EQ(0u, decompressed_headers_.length());

  while (!headers_decompressed_) {
    struct iovec iovec;
    if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
      return;
    }

    size_t bytes_consumed = session_->decompressor()->DecompressData(
        StringPiece(static_cast<char*>(iovec.iov_base),
                    iovec.iov_len),
        this);
    DCHECK_LE(bytes_consumed, iovec.iov_len);
    if (decompression_failed_) {
      return;
    }
    sequencer_.MarkConsumed(bytes_consumed);

    headers_decompressed_ =
        session_->decompressor()->current_header_id() != headers_id_;
  }

  // Either the headers are complete, or all of the data has been consumed.
  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
  if (IsHalfClosed()) {
    TerminateFromPeer(true);
  } else if (headers_decompressed_ && decompressed_headers_.empty()) {
    sequencer_.FlushBufferedFrames();
  }
}

bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
  data.AppendToString(&decompressed_headers_);
  return true;
}

void ReliableQuicStream::OnDecompressionError() {
  DCHECK(!decompression_failed_);
  decompression_failed_ = true;
  session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
}

void ReliableQuicStream::CloseWriteSide() {
  if (write_side_closed_) {
    return;
  }
  DLOG(INFO) << "Done writing to stream " << id();

  write_side_closed_ = true;
  if (read_side_closed_) {
    DLOG(INFO) << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

bool ReliableQuicStream::HasBufferedData() {
  return !queued_data_.empty();
}

void ReliableQuicStream::OnClose() {
  CloseReadSide();
  CloseWriteSide();

  if (visitor_) {
    Visitor* visitor = visitor_;
    // Calling Visitor::OnClose() may result in the destruction of the visitor,
    // so we need to ensure we don't call it again.
    visitor_ = NULL;
    visitor->OnClose(this);
  }
}

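// Strips the leading priority (server-side only) and header id from |data|,
// buffering partial values across calls in headers_id_and_priority_buffer_.
// Returns the number of bytes parsed.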
uint32 ReliableQuicStream::StripPriorityAndHeaderId(
    const char* data, uint32 data_len) {
  uint32 total_bytes_parsed = 0;

  if (!priority_parsed_ && session_->connection()->is_server()) {
    QuicPriority temporary_priority = priority_;
    total_bytes_parsed = StripUint32(
        data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
    if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
      priority_parsed_ = true;
      // Spdy priorities are inverted, so the highest numerical value is the
      // lowest legal priority.
      if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) {
        session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
        return 0;
      }
      priority_ = temporary_priority;
    }
    data += total_bytes_parsed;
    data_len -= total_bytes_parsed;
  }
  if (data_len > 0 && headers_id_ == 0u) {
    // The headers ID has not yet been read.  Strip it from the beginning of
    // the data stream.
    total_bytes_parsed += StripUint32(
        data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
  }
  return total_bytes_parsed;
}

}  // namespace net