reliable_quic_stream.cc revision 5e3f23d412006dc4db4e659864679f29341e113f
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/reliable_quic_stream.h"
6
7#include "net/quic/quic_session.h"
8#include "net/quic/quic_spdy_decompressor.h"
9
10using base::StringPiece;
11using std::min;
12
13namespace net {
14
// Binds the stream to its owning session. Both directions start open, no
// bytes have been transferred, and header decompression has not begun.
ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
                                       QuicSession* session)
    : sequencer_(this),  // Sequencer delivers in-order frame data back to us.
      id_(id),
      session_(session),
      visitor_(NULL),
      stream_bytes_read_(0),
      stream_bytes_written_(0),
      headers_decompressed_(false),
      headers_id_(0),  // 0 == the 4-byte headers ID has not been read yet.
      stream_error_(QUIC_STREAM_NO_ERROR),
      connection_error_(QUIC_NO_ERROR),
      read_side_closed_(false),
      write_side_closed_(false),
      fin_buffered_(false),
      fin_sent_(false) {
}
32
// Out-of-line destructor; members release their own resources.
ReliableQuicStream::~ReliableQuicStream() {
}
35
36bool ReliableQuicStream::WillAcceptStreamFrame(
37    const QuicStreamFrame& frame) const {
38  if (read_side_closed_) {
39    return true;
40  }
41  if (frame.stream_id != id_) {
42    LOG(ERROR) << "Error!";
43    return false;
44  }
45  return sequencer_.WillAcceptStreamFrame(frame);
46}
47
// Feeds an incoming frame to the sequencer for in-order delivery. Returns
// true if the frame was accepted (or deliberately discarded because the
// read side is already closed).
bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
  DCHECK_EQ(frame.stream_id, id_);
  if (read_side_closed_) {
    DLOG(INFO) << "Ignoring frame " << frame.stream_id;
    // We don't want to be reading: blackhole the data.
    return true;
  }
  // Note: this count includes duplicate data received.
  stream_bytes_read_ += frame.data.length();

  bool accepted = sequencer_.OnStreamFrame(frame);

  if (frame.fin) {
    // The peer will send nothing past this offset; let the sequencer close
    // the read side once everything up to it has been consumed.
    sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size(), true);
  }

  return accepted;
}
66
// Peer sent RST_STREAM: record the error and close both directions.
void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  TerminateFromPeer(false);  // Full close.
}
71
72void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) {
73  if (IsClosed()) {
74    return;
75  }
76  if (error != QUIC_NO_ERROR) {
77    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
78    connection_error_ = error;
79  }
80
81  if (from_peer) {
82    TerminateFromPeer(false);
83  } else {
84    CloseWriteSide();
85    CloseReadSide();
86  }
87}
88
89void ReliableQuicStream::TerminateFromPeer(bool half_close) {
90  if (!half_close) {
91    CloseWriteSide();
92  }
93  CloseReadSide();
94}
95
96void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
97  stream_error_ = error;
98  if (error != QUIC_STREAM_NO_ERROR)  {
99    // Sending a RstStream results in calling CloseStream.
100    session()->SendRstStream(id(), error);
101  } else {
102    session_->CloseStream(id());
103  }
104}
105
// Reads stream data into the caller's iovecs. Buffered decompressed header
// bytes are drained first; only after the headers are fully decompressed
// and drained does body data flow from the sequencer. Returns the number
// of bytes copied.
size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.Readv(iov, iov_len);
  }
  // Copy buffered header bytes into successive iovecs until either the
  // iovecs or the buffered headers run out.
  size_t bytes_consumed = 0;
  size_t iov_index = 0;
  while (iov_index < iov_len &&
         decompressed_headers_.length() > bytes_consumed) {
    size_t bytes_to_read = min(iov[iov_index].iov_len,
                               decompressed_headers_.length() - bytes_consumed);
    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
    memcpy(iov_ptr,
           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
    bytes_consumed += bytes_to_read;
    ++iov_index;
  }
  // Drop what was handed out; any remainder waits for the next Readv.
  decompressed_headers_.erase(0, bytes_consumed);
  return bytes_consumed;
}
125
126int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
127  if (headers_decompressed_ && decompressed_headers_.empty()) {
128    return sequencer_.GetReadableRegions(iov, iov_len);
129  }
130  if (iov_len == 0) {
131    return 0;
132  }
133  iov[0].iov_base = static_cast<void*>(
134      const_cast<char*>(decompressed_headers_.data()));
135  iov[0].iov_len = decompressed_headers_.length();
136  return 1;
137}
138
139bool ReliableQuicStream::IsHalfClosed() const {
140  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
141    return false;
142  }
143  return sequencer_.IsHalfClosed();
144}
145
146bool ReliableQuicStream::IsClosed() const {
147  return write_side_closed_ && (read_side_closed_ || IsHalfClosed());
148}
149
150bool ReliableQuicStream::HasBytesToRead() const {
151  return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
152}
153
// Returns the remote endpoint of the owning session's connection.
const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
  return session_->peer_address();
}
157
// Returns the session-wide SPDY header compressor (shared across streams).
QuicSpdyCompressor* ReliableQuicStream::compressor() {
  return session_->compressor();
}
161
// Writes |data| (and optionally a FIN) to the stream, buffering whatever
// the connection cannot send immediately. A write must carry either bytes
// or a FIN.
QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
  DCHECK(data.size() > 0 || fin);
  return WriteOrBuffer(data, fin);
}
166
167QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
168  DCHECK(!fin_buffered_);
169
170  QuicConsumedData consumed_data(0, false);
171  fin_buffered_ = fin;
172
173  if (queued_data_.empty()) {
174    consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
175    DCHECK_LE(consumed_data.bytes_consumed, data.length());
176  }
177
178  // If there's unconsumed data or an unconsumed fin, queue it.
179  if (consumed_data.bytes_consumed < data.length() ||
180      (fin && !consumed_data.fin_consumed)) {
181    queued_data_.push_back(
182        string(data.data() + consumed_data.bytes_consumed,
183               data.length() - consumed_data.bytes_consumed));
184  }
185
186  return QuicConsumedData(data.size(), true);
187}
188
// Flushes queued writes now that the connection can accept more data.
// Stops as soon as a chunk is only partially consumed (still blocked).
void ReliableQuicStream::OnCanWrite() {
  bool fin = false;
  while (!queued_data_.empty()) {
    const string& data = queued_data_.front();
    if (queued_data_.size() == 1 && fin_buffered_) {
      // Last queued chunk, so the buffered FIN rides along with it.
      fin = true;
    }
    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
    if (consumed_data.bytes_consumed == data.size() &&
        fin == consumed_data.fin_consumed) {
      queued_data_.pop_front();
    } else {
      // Partial write: trim what was sent and wait for the next
      // OnCanWrite notification.
      queued_data_.front().erase(0, consumed_data.bytes_consumed);
      break;
    }
  }
}
206
// Hands |data| to the session for transmission at the stream's current
// write offset. Updates the offset, closes the write side once the FIN has
// gone out, and marks the stream write-blocked whenever the session could
// not consume everything it was given.
QuicConsumedData ReliableQuicStream::WriteDataInternal(
    StringPiece data, bool fin) {
  if (write_side_closed_) {
    DLOG(ERROR) << "Attempt to write when the write side is closed";
    return QuicConsumedData(0, false);
  }

  QuicConsumedData consumed_data =
      session()->WriteData(id(), data, stream_bytes_written_, fin);
  stream_bytes_written_ += consumed_data.bytes_consumed;
  if (consumed_data.bytes_consumed == data.length()) {
    if (fin && consumed_data.fin_consumed) {
      // All data and the FIN went out: nothing more will ever be written.
      fin_sent_ = true;
      CloseWriteSide();
    } else if (fin && !consumed_data.fin_consumed) {
      // The data fit but the FIN did not; retry it when writable again.
      session_->MarkWriteBlocked(id());
    }
  } else {
    // Data was only partially consumed; wait for OnCanWrite.
    session_->MarkWriteBlocked(id());
  }
  return consumed_data;
}
229
230void ReliableQuicStream::CloseReadSide() {
231  if (read_side_closed_) {
232    return;
233  }
234  DLOG(INFO) << "Done reading from stream " << id();
235
236  read_side_closed_ = true;
237  if (write_side_closed_) {
238    DLOG(INFO) << "Closing stream: " << id();
239    session_->CloseStream(id());
240  }
241}
242
// Consumes raw in-order bytes handed up by the sequencer. The crypto
// stream's bytes go straight to ProcessData; for all other streams the
// leading 4-byte headers ID is stripped, the compressed headers are fed to
// the session's shared decompressor (possibly across several calls), and
// only then does body data flow through. Returns the number of input bytes
// consumed; unconsumed bytes remain buffered in the sequencer.
uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
  if (id() == kCryptoStreamId) {
    // The crypto stream does not use compression.
    return ProcessData(data, data_len);
  }
  uint32 total_bytes_consumed = 0;
  if (headers_id_ == 0u) {
    // The headers ID has not yet been read.  Strip it from the beginning of
    // the data stream.
    DCHECK_GT(4u, headers_id_buffer_.length());
    size_t missing_size = 4 - headers_id_buffer_.length();
    if (data_len < missing_size) {
      // Not enough bytes yet for a complete ID; buffer and wait for more.
      StringPiece(data, data_len).AppendToString(&headers_id_buffer_);
      return data_len;
    }
    total_bytes_consumed += missing_size;
    StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
    DCHECK_EQ(4u, headers_id_buffer_.length());
    // NOTE(review): raw memcpy reads the ID in host byte order --
    // presumably the peer encodes it the same way; confirm against the
    // framer's serialization.
    memcpy(&headers_id_, headers_id_buffer_.data(), 4);
    headers_id_buffer_.clear();
    data += missing_size;
    data_len -= missing_size;
  }
  DCHECK_NE(0u, headers_id_);

  // Once the headers are finished, we simply pass the data through.
  if (headers_decompressed_) {
    // Some buffered header data remains: deliver it before any body bytes
    // so the subclass sees headers and body in order.
    if (!decompressed_headers_.empty()) {
      ProcessHeaderData();
    }
    if (decompressed_headers_.empty()) {
      DVLOG(1) << "Delegating procesing to ProcessData";
      total_bytes_consumed += ProcessData(data, data_len);
    }
    return total_bytes_consumed;
  }

  QuicHeaderId current_header_id =
      session_->decompressor()->current_header_id();
  // Ensure that this header id looks sane.
  if (headers_id_ < current_header_id ||
      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    DVLOG(1) << "Invalid headers for stream: " << id()
             << " header_id: " << headers_id_
             << " current_header_id: " << current_header_id;
    session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    return total_bytes_consumed;
  }

  // If we are head-of-line blocked on decompression, then back up.
  if (current_header_id != headers_id_) {
    session_->MarkDecompressionBlocked(headers_id_, id());
    DVLOG(1) << "Unable to decompress header data for stream: " << id()
             << " header_id: " << headers_id_;
    return total_bytes_consumed;
  }

  // Decompressed data will be delivered to decompressed_headers_ via the
  // OnDecompressedData callback.
  size_t bytes_consumed = session_->decompressor()->DecompressData(
      StringPiece(data, data_len), this);
  total_bytes_consumed += bytes_consumed;

  // Headers are complete if the decompressor has moved on to the
  // next stream.
  headers_decompressed_ =
      session_->decompressor()->current_header_id() != headers_id_;

  ProcessHeaderData();

  // We have processed all of the decompressed data but we might
  // have some more raw data to process.
  // NOTE(review): with '||' this also forwards trailing raw bytes while
  // decompressed_headers_ is still non-empty -- looks intentional (the
  // sequencer redelivers unconsumed bytes) but verify ordering guarantees.
  if (decompressed_headers_.empty() || total_bytes_consumed < data_len) {
    total_bytes_consumed += ProcessData(data + bytes_consumed,
                                        data_len - bytes_consumed);
  }

  // The sequencer will push any additional buffered frames if this data
  // has been completely consumed.
  return total_bytes_consumed;
}
324
325uint32 ReliableQuicStream::ProcessHeaderData() {
326  if (decompressed_headers_.empty()) {
327    return 0;
328  }
329
330  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
331                                       decompressed_headers_.length());
332  if (bytes_processed == decompressed_headers_.length()) {
333    decompressed_headers_.clear();
334  } else {
335    decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
336  }
337  return bytes_processed;
338}
339
// Called when this stream's header block is next in line for the shared
// decompressor (after being head-of-line blocked). Pulls readable regions
// from the sequencer and decompresses them until the headers are complete
// or the buffered data runs out.
void ReliableQuicStream::OnDecompressorAvailable() {
  DCHECK_EQ(headers_id_,
            session_->decompressor()->current_header_id());
  DCHECK(!headers_decompressed_);
  DCHECK_EQ(0u, decompressed_headers_.length());

  size_t total_bytes_consumed = 0;
  struct iovec iovecs[5];
  while (!headers_decompressed_) {
    size_t num_iovecs =
        sequencer_.GetReadableRegions(iovecs, arraysize(iovecs));

    if (num_iovecs == 0) {
      // No contiguous data buffered; wait for more frames to arrive.
      return;
    }
    for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) {
      total_bytes_consumed += session_->decompressor()->DecompressData(
          StringPiece(static_cast<char*>(iovecs[i].iov_base),
                      iovecs[i].iov_len), this);

      // The decompressor advances to the next header id once this
      // stream's headers are complete.
      headers_decompressed_ =
          session_->decompressor()->current_header_id() != headers_id_;
    }
  }

  // Either the headers are complete, or all buffered data has been consumed.
  sequencer_.MarkConsumed(total_bytes_consumed);

  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.

  if (headers_decompressed_ && decompressed_headers_.empty()) {
    // Headers fully delivered; let the sequencer resume pushing body data.
    sequencer_.FlushBufferedFrames();
  }
}
374
// QuicSpdyDecompressor callback: accumulates decompressed header bytes in
// decompressed_headers_. Always returns true to keep decompression going.
bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
  data.AppendToString(&decompressed_headers_);
  return true;
}
379
// QuicSpdyDecompressor callback: a malformed header block is fatal to the
// whole connection because the compression context is shared by all streams.
void ReliableQuicStream::OnDecompressionError() {
  session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
}
383
384
385void ReliableQuicStream::CloseWriteSide() {
386  if (write_side_closed_) {
387    return;
388  }
389  DLOG(INFO) << "Done writing to stream " << id();
390
391  write_side_closed_ = true;
392  if (read_side_closed_) {
393    DLOG(INFO) << "Closing stream: " << id();
394    session_->CloseStream(id());
395  }
396}
397
398void ReliableQuicStream::OnClose() {
399  CloseReadSide();
400  CloseWriteSide();
401
402  if (visitor_) {
403    Visitor* visitor = visitor_;
404    // Calling Visitor::OnClose() may result the destruction of the visitor,
405    // so we need to ensure we don't call it again.
406    visitor_ = NULL;
407    visitor->OnClose(this);
408  }
409}
410
411}  // namespace net
412