reliable_quic_stream.cc revision a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/quic/reliable_quic_stream.h"

7#include "net/quic/quic_session.h"
8#include "net/quic/quic_spdy_decompressor.h"
9#include "net/spdy/write_blocked_list.h"
10
11using base::StringPiece;
12using std::min;
13
14namespace net {
15
16namespace {

// This is somewhat arbitrary.  It's possible, but unlikely, we will either fail
// to set a priority client-side, or cancel a stream before stripping the
// priority from the wire server-side.  In either case, start out with a
// priority in the middle.
QuicPriority kDefaultPriority = 3;

// Appends bytes from data into partial_data_buffer.  Once partial_data_buffer
// reaches 4 bytes, copies the data into 'result' and clears
// partial_data_buffer.
// Returns the number of bytes consumed.
uint32 StripUint32(const char* data, uint32 data_len,
                   string* partial_data_buffer,
                   uint32* result) {
  DCHECK_GT(4u, partial_data_buffer->length());
  size_t missing_size = 4 - partial_data_buffer->length();
  if (data_len < missing_size) {
    StringPiece(data, data_len).AppendToString(partial_data_buffer);
    return data_len;
  }
  StringPiece(data, missing_size).AppendToString(partial_data_buffer);
  DCHECK_EQ(4u, partial_data_buffer->length());
  memcpy(result, partial_data_buffer->data(), 4);
  partial_data_buffer->clear();
  return missing_size;
}

}  // namespace

ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
                                       QuicSession* session)
    : sequencer_(this),
      id_(id),
      session_(session),
      visitor_(NULL),
      stream_bytes_read_(0),
      stream_bytes_written_(0),
      headers_decompressed_(false),
      priority_(kDefaultPriority),
      headers_id_(0),
      decompression_failed_(false),
      stream_error_(QUIC_STREAM_NO_ERROR),
      connection_error_(QUIC_NO_ERROR),
      read_side_closed_(false),
      write_side_closed_(false),
      priority_parsed_(false),
      fin_buffered_(false),
      fin_sent_(false),
      is_server_(session_->is_server()) {
}

ReliableQuicStream::~ReliableQuicStream() {
}

bool ReliableQuicStream::WillAcceptStreamFrame(
    const QuicStreamFrame& frame) const {
  if (read_side_closed_) {
    return true;
  }
  if (frame.stream_id != id_) {
    LOG(ERROR) << ENDPOINT << "Received a frame for stream " << frame.stream_id
               << " on stream " << id_;
    return false;
  }
  return sequencer_.WillAcceptStreamFrame(frame);
}

bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
  DCHECK_EQ(frame.stream_id, id_);
  if (read_side_closed_) {
    DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
    // We don't want to be reading: blackhole the data.
    return true;
  }
  // Note: This count includes duplicate data received.
  stream_bytes_read_ += frame.data.TotalBufferSize();

  bool accepted = sequencer_.OnStreamFrame(frame);

  return accepted;
}

void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  CloseWriteSide();
  CloseReadSide();
}

void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
                                            bool from_peer) {
  if (read_side_closed_ && write_side_closed_) {
    return;
  }
  if (error != QUIC_NO_ERROR) {
    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
    connection_error_ = error;
  }

  CloseWriteSide();
  CloseReadSide();
}

void ReliableQuicStream::OnFinRead() {
  DCHECK(sequencer_.IsClosed());
  CloseReadSide();
}

void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
  DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
  stream_error_ = error;
  // Sending a RstStream results in calling CloseStream.
  session()->SendRstStream(id(), error);
}

void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
  session()->connection()->SendConnectionClose(error);
}

void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
                                                    const string& details) {
  session()->connection()->SendConnectionCloseWithDetails(error, details);
}

size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.Readv(iov, iov_len);
  }
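  // Deliver any buffered decompressed header data first; body data is not
  // read from the sequencer until the headers have been fully consumed.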
  size_t bytes_consumed = 0;
  size_t iov_index = 0;
  while (iov_index < iov_len &&
         decompressed_headers_.length() > bytes_consumed) {
    size_t bytes_to_read = min(iov[iov_index].iov_len,
                               decompressed_headers_.length() - bytes_consumed);
    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
    memcpy(iov_ptr,
           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
    bytes_consumed += bytes_to_read;
    ++iov_index;
  }
  decompressed_headers_.erase(0, bytes_consumed);
  return bytes_consumed;
}

int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.GetReadableRegions(iov, iov_len);
  }
  if (iov_len == 0) {
    return 0;
  }
  iov[0].iov_base = static_cast<void*>(
      const_cast<char*>(decompressed_headers_.data()));
  iov[0].iov_len = decompressed_headers_.length();
  return 1;
}

bool ReliableQuicStream::IsDoneReading() const {
  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return false;
  }
  return sequencer_.IsClosed();
}

bool ReliableQuicStream::HasBytesToRead() const {
  return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
}

const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
  return session_->peer_address();
}

QuicSpdyCompressor* ReliableQuicStream::compressor() {
  return session_->compressor();
}

bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
  return session_->GetSSLInfo(ssl_info);
}

QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
  DCHECK(data.size() > 0 || fin);
  return WriteOrBuffer(data, fin);
}

void ReliableQuicStream::set_priority(QuicPriority priority) {
  DCHECK_EQ(0u, stream_bytes_written_);
  priority_ = priority;
}

QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
  DCHECK(!fin_buffered_);

  QuicConsumedData consumed_data(0, false);
  fin_buffered_ = fin;

  if (queued_data_.empty()) {
    consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
    DCHECK_LE(consumed_data.bytes_consumed, data.length());
  }

  // If there's unconsumed data or an unconsumed fin, queue it.
  if (consumed_data.bytes_consumed < data.length() ||
      (fin && !consumed_data.fin_consumed)) {
    queued_data_.push_back(
        string(data.data() + consumed_data.bytes_consumed,
               data.length() - consumed_data.bytes_consumed));
  }

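  // Any data (or fin) not consumed by the connection was queued above, so
  // report the entire write as consumed to the caller.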
  return QuicConsumedData(data.size(), true);
}

void ReliableQuicStream::OnCanWrite() {
  bool fin = false;
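  // Flush as much queued data as the connection will accept, setting the fin
  // only with the last queued block.  On a partial write, keep the unconsumed
  // remainder queued and wait for the next OnCanWrite.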
  while (!queued_data_.empty()) {
    const string& data = queued_data_.front();
    if (queued_data_.size() == 1 && fin_buffered_) {
      fin = true;
    }
    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
    if (consumed_data.bytes_consumed == data.size() &&
        fin == consumed_data.fin_consumed) {
      queued_data_.pop_front();
    } else {
      queued_data_.front().erase(0, consumed_data.bytes_consumed);
      break;
    }
  }
}

QuicConsumedData ReliableQuicStream::WriteDataInternal(
    StringPiece data, bool fin) {
  struct iovec iov = {const_cast<char*>(data.data()),
                      static_cast<size_t>(data.size())};
  return WritevDataInternal(&iov, 1, fin, NULL);
}

QuicConsumedData ReliableQuicStream::WritevDataInternal(
    const struct iovec* iov,
    int iov_count,
    bool fin,
    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
  if (write_side_closed_) {
    DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
    return QuicConsumedData(0, false);
  }

  size_t write_length = 0u;
  for (int i = 0; i < iov_count; ++i) {
    write_length += iov[i].iov_len;
  }
  QuicConsumedData consumed_data = session()->WritevData(
      id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate);
  stream_bytes_written_ += consumed_data.bytes_consumed;
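  // If the data or the fin could not be fully consumed, register this stream
  // as write blocked so the session will call OnCanWrite again later.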
  if (consumed_data.bytes_consumed == write_length) {
    if (fin && consumed_data.fin_consumed) {
      fin_sent_ = true;
      CloseWriteSide();
    } else if (fin && !consumed_data.fin_consumed) {
      session_->MarkWriteBlocked(id(), EffectivePriority());
    }
  } else {
    session_->MarkWriteBlocked(id(), EffectivePriority());
  }
  return consumed_data;
}

QuicPriority ReliableQuicStream::EffectivePriority() const {
  return priority();
}

void ReliableQuicStream::CloseReadSide() {
  if (read_side_closed_) {
    return;
  }
  DVLOG(1) << ENDPOINT << "Done reading from stream " << id();

  read_side_closed_ = true;
  if (write_side_closed_) {
    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
  DCHECK_NE(0u, data_len);
  if (id() == kCryptoStreamId) {
    // The crypto stream does not use compression.
    return ProcessData(data, data_len);
  }

  uint32 total_bytes_consumed = 0;
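  // Data streams begin with a 4-byte priority (sent only by the client)
  // followed by a 4-byte header id; strip them before decompressing headers.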
  if (headers_id_ == 0u) {
    total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
    data += total_bytes_consumed;
    data_len -= total_bytes_consumed;
    if (data_len == 0 || total_bytes_consumed == 0) {
      return total_bytes_consumed;
    }
  }
  DCHECK_NE(0u, headers_id_);

  // Once the headers are finished, we simply pass the data through.
  if (headers_decompressed_) {
    // Some buffered header data remains.
    if (!decompressed_headers_.empty()) {
      ProcessHeaderData();
    }
    if (decompressed_headers_.empty()) {
      DVLOG(1) << "Delegating processing to ProcessData";
      total_bytes_consumed += ProcessData(data, data_len);
    }
    return total_bytes_consumed;
  }

  QuicHeaderId current_header_id =
      session_->decompressor()->current_header_id();
  // Ensure that this header id looks sane.
  if (headers_id_ < current_header_id ||
      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    DVLOG(1) << ENDPOINT
             << "Invalid headers for stream: " << id()
             << " header_id: " << headers_id_
             << " current_header_id: " << current_header_id;
    session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    return total_bytes_consumed;
  }

  // If we are head-of-line blocked on decompression, then back up.
  if (current_header_id != headers_id_) {
    session_->MarkDecompressionBlocked(headers_id_, id());
    DVLOG(1) << ENDPOINT
             << "Unable to decompress header data for stream: " << id()
             << " header_id: " << headers_id_;
    return total_bytes_consumed;
  }

  // Decompressed data will be delivered to decompressed_headers_.
  size_t bytes_consumed = session_->decompressor()->DecompressData(
      StringPiece(data, data_len), this);
  DCHECK_NE(0u, bytes_consumed);
  if (bytes_consumed > data_len) {
    DCHECK(false) << "DecompressData returned illegal value";
    OnDecompressionError();
    return total_bytes_consumed;
  }
  total_bytes_consumed += bytes_consumed;
  data += bytes_consumed;
  data_len -= bytes_consumed;

  if (decompression_failed_) {
    // The session will have been closed in OnDecompressionError.
    return total_bytes_consumed;
  }

  // Headers are complete if the decompressor has moved on to the
  // next stream.
  headers_decompressed_ =
      session_->decompressor()->current_header_id() != headers_id_;
  if (!headers_decompressed_) {
    DCHECK_EQ(0u, data_len);
  }

  ProcessHeaderData();

  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return total_bytes_consumed;
  }

  // We have processed all of the decompressed data but we might
  // have some more raw data to process.
  if (data_len > 0) {
    total_bytes_consumed += ProcessData(data, data_len);
  }

  // The sequencer will push any additional buffered frames if this data
  // has been completely consumed.
  return total_bytes_consumed;
}

uint32 ReliableQuicStream::ProcessHeaderData() {
  if (decompressed_headers_.empty()) {
    return 0;
  }

  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
                                       decompressed_headers_.length());
  if (bytes_processed == decompressed_headers_.length()) {
    decompressed_headers_.clear();
  } else {
    decompressed_headers_.erase(0, bytes_processed);
  }
  return bytes_processed;
}

void ReliableQuicStream::OnDecompressorAvailable() {
  DCHECK_EQ(headers_id_,
            session_->decompressor()->current_header_id());
  DCHECK(!headers_decompressed_);
  DCHECK(!decompression_failed_);
  DCHECK_EQ(0u, decompressed_headers_.length());

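  // Feed contiguous data from the sequencer through the decompressor until
  // this stream's headers are complete or no more contiguous data is ready.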
  while (!headers_decompressed_) {
    struct iovec iovec;
    if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
      return;
    }

    size_t bytes_consumed = session_->decompressor()->DecompressData(
        StringPiece(static_cast<char*>(iovec.iov_base),
                    iovec.iov_len),
        this);
    DCHECK_LE(bytes_consumed, iovec.iov_len);
    if (decompression_failed_) {
      return;
    }
    sequencer_.MarkConsumed(bytes_consumed);

    headers_decompressed_ =
        session_->decompressor()->current_header_id() != headers_id_;
  }

  // Either the headers are complete, or all the data has been consumed.
  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
  if (IsDoneReading()) {
    OnFinRead();
  } else if (headers_decompressed_ && decompressed_headers_.empty()) {
    sequencer_.FlushBufferedFrames();
  }
}

bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
  data.AppendToString(&decompressed_headers_);
  return true;
}

void ReliableQuicStream::OnDecompressionError() {
  DCHECK(!decompression_failed_);
  decompression_failed_ = true;
  session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
}

void ReliableQuicStream::CloseWriteSide() {
  if (write_side_closed_) {
    return;
  }
  DVLOG(1) << ENDPOINT << "Done writing to stream " << id();

  write_side_closed_ = true;
  if (read_side_closed_) {
    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

bool ReliableQuicStream::HasBufferedData() {
  return !queued_data_.empty();
}

void ReliableQuicStream::OnClose() {
  CloseReadSide();
  CloseWriteSide();

  if (visitor_) {
    Visitor* visitor = visitor_;
    // Calling Visitor::OnClose() may result in the destruction of the visitor,
    // so we need to ensure we don't call it again.
    visitor_ = NULL;
    visitor->OnClose(this);
  }
}

uint32 ReliableQuicStream::StripPriorityAndHeaderId(
    const char* data, uint32 data_len) {
  uint32 total_bytes_parsed = 0;

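  // The 4-byte priority (parsed only on the server) and the 4-byte header id
  // may arrive split across frames, so partial reads are accumulated in
  // headers_id_and_priority_buffer_.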
  if (!priority_parsed_ && session_->connection()->is_server()) {
    QuicPriority temporary_priority = priority_;
    total_bytes_parsed = StripUint32(
        data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
    if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.empty()) {
      priority_parsed_ = true;

      // Spdy priorities are inverted, so the highest numerical value is the
      // lowest legal priority.
      if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) {
        session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
        return 0;
      }
      priority_ = temporary_priority;
    }
    data += total_bytes_parsed;
    data_len -= total_bytes_parsed;
  }
  if (data_len > 0 && headers_id_ == 0u) {
    // The headers ID has not yet been read.  Strip it from the beginning of
    // the data stream.
    total_bytes_parsed += StripUint32(
        data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
  }
  return total_bytes_parsed;
}

}  // namespace net