// reliable_quic_stream.cc revision 7d4cd473f85ac64c3747c96c277f9e506a0d2246
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
5#include "net/quic/reliable_quic_stream.h"
6
7#include "net/quic/quic_session.h"
8#include "net/quic/quic_spdy_decompressor.h"
9
10using base::StringPiece;
11using std::min;
12
13namespace net {
14
// Constructs a stream owned by |session|.  All bookkeeping starts empty:
// no bytes read or written, headers not yet decompressed, no errors
// recorded, and both the read and write sides open.
ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
                                       QuicSession* session)
    : sequencer_(this),  // The sequencer delivers in-order data back to us.
      id_(id),
      session_(session),
      visitor_(NULL),
      stream_bytes_read_(0),
      stream_bytes_written_(0),
      headers_decompressed_(false),
      headers_id_(0),  // 0 means the 4-byte headers id is still unread.
      stream_error_(QUIC_STREAM_NO_ERROR),
      connection_error_(QUIC_NO_ERROR),
      read_side_closed_(false),
      write_side_closed_(false),
      fin_buffered_(false),
      fin_sent_(false) {
}
32
// The stream owns no raw resources; all members clean themselves up.
ReliableQuicStream::~ReliableQuicStream() {
}
35
36bool ReliableQuicStream::WillAcceptStreamFrame(
37    const QuicStreamFrame& frame) const {
38  if (read_side_closed_) {
39    return true;
40  }
41  if (frame.stream_id != id_) {
42    LOG(ERROR) << "Error!";
43    return false;
44  }
45  return sequencer_.WillAcceptStreamFrame(frame);
46}
47
// Handles an incoming stream frame.  Returns true if the frame was
// accepted (or deliberately discarded because the read side is closed).
bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
  DCHECK_EQ(frame.stream_id, id_);
  if (read_side_closed_) {
    DLOG(INFO) << "Ignoring frame " << frame.stream_id;
    // We don't want to be reading: blackhole the data.
    return true;
  }
  // Note: This count includes duplicate data received.
  stream_bytes_read_ += frame.data.length();

  bool accepted = sequencer_.OnStreamFrame(frame);

  // A fin fixes the final stream offset; tell the sequencer so it can
  // detect when all data up to that offset has arrived.
  if (frame.fin) {
    sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size());
  }

  return accepted;
}
66
// Called when the peer sends a RST_STREAM: record the error and fully
// close both directions of the stream.
void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  TerminateFromPeer(false);  // Full close.
}
71
// Called when the whole connection is going away.  |error| is the
// connection-level error (QUIC_NO_ERROR for a clean shutdown) and
// |from_peer| says whether the close was initiated remotely.
void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) {
  if (read_side_closed_ && write_side_closed_) {
    // Already fully closed; nothing more to record or tear down.
    return;
  }
  if (error != QUIC_NO_ERROR) {
    // Remember that this stream died because of a connection-level error.
    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
    connection_error_ = error;
  }

  if (from_peer) {
    TerminateFromPeer(false);
  } else {
    // Close write before read: CloseReadSide() tears the stream down only
    // once it sees both sides closed.
    CloseWriteSide();
    CloseReadSide();
  }
}
88
// Terminates the stream at the peer's request.  A half close shuts only
// the read side; otherwise the write side is closed first so that
// CloseReadSide() observes both sides closed and releases the stream.
void ReliableQuicStream::TerminateFromPeer(bool half_close) {
  if (!half_close) {
    CloseWriteSide();
  }
  CloseReadSide();
}
95
96void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
97  stream_error_ = error;
98  if (error != QUIC_STREAM_NO_ERROR)  {
99    // Sending a RstStream results in calling CloseStream.
100    session()->SendRstStream(id(), error);
101  } else {
102    session_->CloseStream(id());
103  }
104}
105
106size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
107  if (headers_decompressed_ && decompressed_headers_.empty()) {
108    return sequencer_.Readv(iov, iov_len);
109  }
110  size_t bytes_consumed = 0;
111  size_t iov_index = 0;
112  while (iov_index < iov_len &&
113         decompressed_headers_.length() > bytes_consumed) {
114    size_t bytes_to_read = min(iov[iov_index].iov_len,
115                               decompressed_headers_.length() - bytes_consumed);
116    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
117    memcpy(iov_ptr,
118           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
119    bytes_consumed += bytes_to_read;
120    ++iov_index;
121  }
122  decompressed_headers_.erase(0, bytes_consumed);
123  return bytes_consumed;
124}
125
126int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
127  if (headers_decompressed_ && decompressed_headers_.empty()) {
128    return sequencer_.GetReadableRegions(iov, iov_len);
129  }
130  if (iov_len == 0) {
131    return 0;
132  }
133  iov[0].iov_base = static_cast<void*>(
134      const_cast<char*>(decompressed_headers_.data()));
135  iov[0].iov_len = decompressed_headers_.length();
136  return 1;
137}
138
139bool ReliableQuicStream::IsHalfClosed() const {
140  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
141    return false;
142  }
143  return sequencer_.IsHalfClosed();
144}
145
146bool ReliableQuicStream::HasBytesToRead() const {
147  return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
148}
149
// Returns the peer's address as known by the owning session.
const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
  return session_->peer_address();
}
153
// Returns the session-wide SPDY header compressor (shared by all streams).
QuicSpdyCompressor* ReliableQuicStream::compressor() {
  return session_->compressor();
}
157
// Public write entry point.  Callers must supply data, a fin, or both;
// the actual writing/buffering is delegated to WriteOrBuffer().
QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
  DCHECK(data.size() > 0 || fin);
  return WriteOrBuffer(data, fin);
}
162
163QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
164  DCHECK(!fin_buffered_);
165
166  QuicConsumedData consumed_data(0, false);
167  fin_buffered_ = fin;
168
169  if (queued_data_.empty()) {
170    consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
171    DCHECK_LE(consumed_data.bytes_consumed, data.length());
172  }
173
174  // If there's unconsumed data or an unconsumed fin, queue it.
175  if (consumed_data.bytes_consumed < data.length() ||
176      (fin && !consumed_data.fin_consumed)) {
177    queued_data_.push_back(
178        string(data.data() + consumed_data.bytes_consumed,
179               data.length() - consumed_data.bytes_consumed));
180  }
181
182  return QuicConsumedData(data.size(), true);
183}
184
// Called when the connection becomes writable again: flushes queued_data_
// in FIFO order until it is empty or the session stops accepting bytes.
void ReliableQuicStream::OnCanWrite() {
  bool fin = false;
  while (!queued_data_.empty()) {
    const string& data = queued_data_.front();
    // Only the last queued chunk may carry the buffered fin.
    if (queued_data_.size() == 1 && fin_buffered_) {
      fin = true;
    }
    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
    if (consumed_data.bytes_consumed == data.size() &&
        fin == consumed_data.fin_consumed) {
      // Chunk (and its fin, if any) fully written; move to the next one.
      queued_data_.pop_front();
    } else {
      // Partial write: drop the consumed prefix and stop until the
      // connection signals writability again.
      queued_data_.front().erase(0, consumed_data.bytes_consumed);
      break;
    }
  }
}
202
// Writes |data| to the session at this stream's current write offset.
// Returns what the session actually consumed; marks the stream as
// write-blocked whenever some data (or the fin) could not be taken.
QuicConsumedData ReliableQuicStream::WriteDataInternal(
    StringPiece data, bool fin) {
  if (write_side_closed_) {
    DLOG(ERROR) << "Attempt to write when the write side is closed";
    return QuicConsumedData(0, false);
  }

  QuicConsumedData consumed_data =
      session()->WriteData(id(), data, stream_bytes_written_, fin);
  stream_bytes_written_ += consumed_data.bytes_consumed;
  if (consumed_data.bytes_consumed == data.length()) {
    if (fin && consumed_data.fin_consumed) {
      // Everything including the fin went out: the write side is done.
      fin_sent_ = true;
      CloseWriteSide();
    } else if (fin && !consumed_data.fin_consumed) {
      // The data fit but the fin did not; retry from OnCanWrite().
      session_->MarkWriteBlocked(id());
    }
  } else {
    // Partial write; retry from OnCanWrite().
    session_->MarkWriteBlocked(id());
  }
  return consumed_data;
}
225
226void ReliableQuicStream::CloseReadSide() {
227  if (read_side_closed_) {
228    return;
229  }
230  DLOG(INFO) << "Done reading from stream " << id();
231
232  read_side_closed_ = true;
233  if (write_side_closed_) {
234    DLOG(INFO) << "Closing stream: " << id();
235    session_->CloseStream(id());
236  }
237}
238
// Consumes raw in-order stream bytes.  For non-crypto streams the first
// four bytes are the SPDY headers id, then header bytes are routed through
// the shared decompressor before any remaining payload is handed to
// ProcessData().  Returns the number of raw bytes consumed.
uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
  if (id() == kCryptoStreamId) {
    // The crypto stream does not use compression.
    return ProcessData(data, data_len);
  }
  uint32 total_bytes_consumed = 0;
  if (headers_id_ == 0u) {
    // The headers ID has not yet been read.  Strip it from the beginning of
    // the data stream.
    DCHECK_GT(4u, headers_id_buffer_.length());
    size_t missing_size = 4 - headers_id_buffer_.length();
    if (data_len < missing_size) {
      // Not enough bytes yet to complete the 4-byte id; buffer and wait.
      StringPiece(data, data_len).AppendToString(&headers_id_buffer_);
      return data_len;
    }
    total_bytes_consumed += missing_size;
    StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
    DCHECK_EQ(4u, headers_id_buffer_.length());
    // NOTE(review): the id is copied in host byte order — assumes the peer
    // serialized it with matching endianness; confirm against the
    // compressor's wire format.
    memcpy(&headers_id_, headers_id_buffer_.data(), 4);
    headers_id_buffer_.clear();
    data += missing_size;
    data_len -= missing_size;
  }
  DCHECK_NE(0u, headers_id_);

  // Once the headers are finished, we simply pass the data through.
  if (headers_decompressed_) {
    // Some buffered header data remains.
    if (!decompressed_headers_.empty()) {
      ProcessHeaderData();
    }
    if (decompressed_headers_.empty()) {
      DVLOG(1) << "Delegating procesing to ProcessData";
      total_bytes_consumed += ProcessData(data, data_len);
    }
    return total_bytes_consumed;
  }

  QuicHeaderId current_header_id =
      session_->decompressor()->current_header_id();
  // Ensure that this header id looks sane.
  if (headers_id_ < current_header_id ||
      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    DVLOG(1) << "Invalid headers for stream: " << id()
             << " header_id: " << headers_id_
             << " current_header_id: " << current_header_id;
    session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    return total_bytes_consumed;
  }

  // If we are head-of-line blocked on decompression, then back up.
  if (current_header_id != headers_id_) {
    session_->MarkDecompressionBlocked(headers_id_, id());
    DVLOG(1) << "Unable to decompress header data for stream: " << id()
             << " header_id: " << headers_id_;
    return total_bytes_consumed;
  }

  // Decompressed data will be delivered to decompressed_headers_.
  size_t bytes_consumed = session_->decompressor()->DecompressData(
      StringPiece(data, data_len), this);
  total_bytes_consumed += bytes_consumed;

  // Headers are complete if the decompressor has moved on to the
  // next stream.
  headers_decompressed_ =
      session_->decompressor()->current_header_id() != headers_id_;

  ProcessHeaderData();

  // We have processed all of the decompressed data but we might
  // have some more raw data to process.
  // NOTE(review): total_bytes_consumed may include the 4 stripped id bytes
  // while data_len was already reduced by them, so the comparison below
  // mixes pre- and post-strip counts on the first frame; also, when
  // decompressed_headers_ is non-empty, this forwards trailing raw bytes
  // ahead of the still-buffered header bytes — confirm both are intended.
  if (decompressed_headers_.empty() || total_bytes_consumed < data_len) {
    total_bytes_consumed += ProcessData(data + bytes_consumed,
                                        data_len - bytes_consumed);
  }

  // The sequencer will push any additional buffered frames if this data
  // has been completely consumed.
  return total_bytes_consumed;
}
320
321uint32 ReliableQuicStream::ProcessHeaderData() {
322  if (decompressed_headers_.empty()) {
323    return 0;
324  }
325
326  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
327                                       decompressed_headers_.length());
328  if (bytes_processed == decompressed_headers_.length()) {
329    decompressed_headers_.clear();
330  } else {
331    decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
332  }
333  return bytes_processed;
334}
335
// Called when it becomes this stream's turn to use the shared header
// decompressor.  Runs the sequencer's buffered regions through the
// decompressor until the headers are complete or the data runs out.
void ReliableQuicStream::OnDecompressorAvailable() {
  DCHECK_EQ(headers_id_,
            session_->decompressor()->current_header_id());
  DCHECK(!headers_decompressed_);
  DCHECK_EQ(0u, decompressed_headers_.length());

  size_t total_bytes_consumed = 0;
  struct iovec iovecs[5];
  while (!headers_decompressed_) {
    // NOTE(review): regions are only MarkConsumed() after the loop, so a
    // second GetReadableRegions() call here would return the same spans —
    // verify the decompressor/sequencer contract makes a second iteration
    // with unconsumed data impossible (or harmless).
    size_t num_iovecs =
        sequencer_.GetReadableRegions(iovecs, arraysize(iovecs));

    if (num_iovecs == 0) {
      return;
    }
    for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) {
      total_bytes_consumed += session_->decompressor()->DecompressData(
          StringPiece(static_cast<char*>(iovecs[i].iov_base),
                      iovecs[i].iov_len), this);

      // The decompressor advancing to the next header id means this
      // stream's headers are complete.
      headers_decompressed_ =
          session_->decompressor()->current_header_id() != headers_id_;
    }
  }

  // Either the headers are complete, or all data has been consumed.
  sequencer_.MarkConsumed(total_bytes_consumed);
  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
  if (IsHalfClosed()) {
    TerminateFromPeer(true);
  } else if (headers_decompressed_ && decompressed_headers_.empty()) {
    sequencer_.FlushBufferedFrames();
  }
}
370
// QuicSpdyDecompressor callback: buffers decompressed header bytes until
// ProcessHeaderData() can hand them to the subclass.
bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
  data.AppendToString(&decompressed_headers_);
  return true;
}
375
// QuicSpdyDecompressor callback: decompression failure is fatal to the
// whole connection because the compression context is shared by streams.
void ReliableQuicStream::OnDecompressionError() {
  session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
}
379
380
381void ReliableQuicStream::CloseWriteSide() {
382  if (write_side_closed_) {
383    return;
384  }
385  DLOG(INFO) << "Done writing to stream " << id();
386
387  write_side_closed_ = true;
388  if (read_side_closed_) {
389    DLOG(INFO) << "Closing stream: " << id();
390    session_->CloseStream(id());
391  }
392}
393
// Final teardown hook: ensures both directions are closed and notifies the
// visitor exactly once.
void ReliableQuicStream::OnClose() {
  CloseReadSide();
  CloseWriteSide();

  if (visitor_) {
    Visitor* visitor = visitor_;
    // Calling Visitor::OnClose() may result in the destruction of the
    // visitor, so we need to ensure we don't call it again.
    visitor_ = NULL;
    visitor->OnClose(this);
  }
}
406
407}  // namespace net
408