reliable_quic_stream.cc revision b2df76ea8fec9e32f6f3718986dba0d95315b29c
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/quic/reliable_quic_stream.h"

#include "net/quic/quic_session.h"
#include "net/quic/quic_spdy_decompressor.h"

using base::StringPiece;
using std::min;
using std::string;

namespace net {

ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
                                       QuicSession* session)
    : sequencer_(this),
      id_(id),
      session_(session),
      visitor_(NULL),
      stream_bytes_read_(0),
      stream_bytes_written_(0),
      headers_decompressed_(false),
      headers_id_(0),
      stream_error_(QUIC_STREAM_NO_ERROR),
      connection_error_(QUIC_NO_ERROR),
      read_side_closed_(false),
      write_side_closed_(false),
      fin_buffered_(false),
      fin_sent_(false) {
}

ReliableQuicStream::~ReliableQuicStream() {
}

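// Returns true if the sequencer is willing to buffer this frame. Frames
// arriving after the read side has closed are accepted so they can be
// discarded later, rather than rejected.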
bool ReliableQuicStream::WillAcceptStreamFrame(
    const QuicStreamFrame& frame) const {
  if (read_side_closed_) {
    return true;
  }
  if (frame.stream_id != id_) {
    LOG(ERROR) << "Frame for stream " << frame.stream_id
               << " received by stream " << id_;
    return false;
  }
  return sequencer_.WillAcceptStreamFrame(frame);
}

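// Passes an incoming frame to the sequencer. Frames that arrive after the
// read side has closed are silently discarded; a frame carrying a FIN also
// closes the stream at the offset just past the frame's data.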
bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
  DCHECK_EQ(frame.stream_id, id_);
  if (read_side_closed_) {
    DLOG(INFO) << "Ignoring frame " << frame.stream_id;
    // We don't want to be reading: blackhole the data.
    return true;
  }
  // Note: This count includes duplicate data received.
  stream_bytes_read_ += frame.data.length();

  bool accepted = sequencer_.OnStreamFrame(frame);

  if (frame.fin) {
    sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size(), true);
  }

  return accepted;
}

void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  TerminateFromPeer(false);  // Full close.
}

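// Records the connection-level error, if any, and closes the stream. A close
// initiated by the peer is treated like a stream termination from the peer.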
void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) {
  if (IsClosed()) {
    return;
  }
  if (error != QUIC_NO_ERROR) {
    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
    connection_error_ = error;
  }

  if (from_peer) {
    TerminateFromPeer(false);
  } else {
    CloseWriteSide();
    CloseReadSide();
  }
}

void ReliableQuicStream::TerminateFromPeer(bool half_close) {
  if (!half_close) {
    CloseWriteSide();
  }
  CloseReadSide();
}

void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  session()->SendRstStream(id(), error);
}

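// Copies buffered decompressed header data into |iov| first; once the headers
// have been fully drained, reads body data directly from the sequencer.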
int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.Readv(iov, iov_len);
  }
  size_t bytes_consumed = 0;
  int iov_index = 0;
  while (iov_index < iov_len &&
         decompressed_headers_.length() > bytes_consumed) {
    size_t bytes_to_read =
        min(iov[iov_index].iov_len,
            decompressed_headers_.length() - bytes_consumed);
    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
    memcpy(iov_ptr,
           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
    bytes_consumed += bytes_to_read;
    ++iov_index;
  }
  decompressed_headers_.erase(0, bytes_consumed);
  return bytes_consumed;
}

int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.GetReadableRegions(iov, iov_len);
  }
  if (iov_len == 0) {
    return 0;
  }
  iov[0].iov_base = static_cast<void*>(
      const_cast<char*>(decompressed_headers_.data()));
  iov[0].iov_len = decompressed_headers_.length();
  return 1;
}

bool ReliableQuicStream::IsHalfClosed() const {
  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return false;
  }
  return sequencer_.IsHalfClosed();
}

bool ReliableQuicStream::IsClosed() const {
  return write_side_closed_ && (read_side_closed_ || IsHalfClosed());
}

bool ReliableQuicStream::HasBytesToRead() const {
  return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
}

const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
  return session_->peer_address();
}

QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
  return WriteOrBuffer(data, fin);
}

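// Writes as much of |data| as the session will currently accept and queues the
// remainder (along with any unconsumed FIN). Always reports the full length as
// consumed, since queued data is flushed later from OnCanWrite().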
QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
  DCHECK(!fin_buffered_);

  QuicConsumedData consumed_data(0, false);
  fin_buffered_ = fin;

  if (queued_data_.empty()) {
    consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
    DCHECK_LE(consumed_data.bytes_consumed, data.length());
  }

  // If there's unconsumed data or an unconsumed fin, queue it.
  if (consumed_data.bytes_consumed < data.length() ||
      (fin && !consumed_data.fin_consumed)) {
    queued_data_.push_back(
        string(data.data() + consumed_data.bytes_consumed,
               data.length() - consumed_data.bytes_consumed));
  }

  return QuicConsumedData(data.size(), true);
}

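// Flushes queued data now that the session can accept more writes. The
// buffered FIN is only sent along with the last queued chunk.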
void ReliableQuicStream::OnCanWrite() {
  bool fin = false;
  while (!queued_data_.empty()) {
    const string& data = queued_data_.front();
    if (queued_data_.size() == 1 && fin_buffered_) {
      fin = true;
    }
    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
    if (consumed_data.bytes_consumed == data.size() &&
        fin == consumed_data.fin_consumed) {
      queued_data_.pop_front();
    } else {
      queued_data_.front().erase(0, consumed_data.bytes_consumed);
      break;
    }
  }
}

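// Writes |data| to the session. If the session cannot consume everything, the
// stream is marked write blocked; if a FIN is consumed, the write side is
// closed.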
QuicConsumedData ReliableQuicStream::WriteDataInternal(
    StringPiece data, bool fin) {
  if (write_side_closed_) {
    DLOG(ERROR) << "Attempt to write when the write side is closed";
    return QuicConsumedData(0, false);
  }

  QuicConsumedData consumed_data =
      session()->WriteData(id(), data, stream_bytes_written_, fin);
  stream_bytes_written_ += consumed_data.bytes_consumed;
  if (consumed_data.bytes_consumed == data.length()) {
    if (fin && consumed_data.fin_consumed) {
      fin_sent_ = true;
      CloseWriteSide();
    }
  } else {
    session_->MarkWriteBlocked(id());
  }
  return consumed_data;
}

void ReliableQuicStream::CloseReadSide() {
  if (read_side_closed_) {
    return;
  }
  DLOG(INFO) << "Done reading from stream " << id();

  read_side_closed_ = true;
  if (write_side_closed_) {
    DLOG(INFO) << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

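// Consumes raw stream data from the sequencer. For non-crypto streams the
// first four bytes carry the headers ID; header data is routed through the
// session's decompressor and then ProcessData(), while data after the headers
// is passed straight to ProcessData().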
uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
  if (id() == kCryptoStreamId) {
    // The crypto stream does not use compression.
    return ProcessData(data, data_len);
  }
  uint32 total_bytes_consumed = 0;
  if (headers_id_ == 0u) {
    // The headers ID has not yet been read.  Strip it from the beginning of
    // the data stream.
    DCHECK_GT(4u, headers_id_buffer_.length());
    size_t missing_size = 4 - headers_id_buffer_.length();
    if (data_len < missing_size) {
      StringPiece(data, data_len).AppendToString(&headers_id_buffer_);
      return data_len;
    }
    total_bytes_consumed += missing_size;
    StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
    DCHECK_EQ(4u, headers_id_buffer_.length());
    memcpy(&headers_id_, headers_id_buffer_.data(), 4);
    headers_id_buffer_.clear();
    data += missing_size;
    data_len -= missing_size;
  }
  DCHECK_NE(0u, headers_id_);

  // Once the headers are finished, we simply pass the data through.
  if (headers_decompressed_) {
    // Some buffered header data remains.
    if (!decompressed_headers_.empty()) {
      ProcessHeaderData();
    }
    if (decompressed_headers_.empty()) {
      DVLOG(1) << "Delegating processing to ProcessData";
      total_bytes_consumed += ProcessData(data, data_len);
    }
    return total_bytes_consumed;
  }

  QuicHeaderId current_header_id =
      session_->decompressor()->current_header_id();
  // Ensure that this header id looks sane.
  if (headers_id_ < current_header_id ||
      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    DVLOG(1) << "Invalid headers for stream: " << id()
             << " header_id: " << headers_id_
             << " current_header_id: " << current_header_id;
    session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    return total_bytes_consumed;
  }

  // If we are head-of-line blocked on decompression, then back up.
  if (current_header_id != headers_id_) {
    session_->MarkDecompressionBlocked(headers_id_, id());
    DVLOG(1) << "Unable to decompress header data for stream: " << id()
             << " header_id: " << headers_id_;
    return total_bytes_consumed;
  }

  // Decompressed data will be delivered to decompressed_headers_.
  size_t bytes_consumed = session_->decompressor()->DecompressData(
      StringPiece(data, data_len), this);
  total_bytes_consumed += bytes_consumed;

  // Headers are complete if the decompressor has moved on to the
  // next stream.
  headers_decompressed_ =
      session_->decompressor()->current_header_id() != headers_id_;

  ProcessHeaderData();

  // We have processed all of the decompressed data but we might
  // have some more raw data to process.
  if (decompressed_headers_.empty() || total_bytes_consumed < data_len) {
    total_bytes_consumed += ProcessData(data + bytes_consumed,
                                        data_len - bytes_consumed);
  }

  // The sequencer will push any additional buffered frames if this data
  // has been completely consumed.
  return total_bytes_consumed;
}

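// Delivers buffered decompressed header data to ProcessData(), keeping any
// bytes the subclass does not consume for a later attempt.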
uint32 ReliableQuicStream::ProcessHeaderData() {
  if (decompressed_headers_.empty()) {
    return 0;
  }

  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
                                       decompressed_headers_.length());
  if (bytes_processed == decompressed_headers_.length()) {
    decompressed_headers_.clear();
  } else {
    decompressed_headers_.erase(0, bytes_processed);
  }
  return bytes_processed;
}

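// Called when this stream's turn with the shared header decompressor arrives
// after having been head-of-line blocked. Drains readable regions from the
// sequencer through the decompressor until the headers are complete or no
// more data is available.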
void ReliableQuicStream::OnDecompressorAvailable() {
  DCHECK_EQ(headers_id_,
            session_->decompressor()->current_header_id());
  DCHECK(!headers_decompressed_);
  DCHECK_EQ(0u, decompressed_headers_.length());

  size_t total_bytes_consumed = 0;
  struct iovec iovecs[5];
  while (!headers_decompressed_) {
    size_t num_iovecs =
        sequencer_.GetReadableRegions(iovecs, arraysize(iovecs));

    if (num_iovecs == 0) {
      return;
    }
    for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) {
      total_bytes_consumed += session_->decompressor()->DecompressData(
          StringPiece(static_cast<char*>(iovecs[i].iov_base),
                      iovecs[i].iov_len), this);

      headers_decompressed_ =
          session_->decompressor()->current_header_id() != headers_id_;
    }
  }

  // Either the headers are complete, or all the data has been consumed.
  sequencer_.MarkConsumed(total_bytes_consumed);

  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.

  if (headers_decompressed_ && decompressed_headers_.empty()) {
    sequencer_.FlushBufferedFrames();
  }
}

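// Receives decompressed header bytes from the decompressor and buffers them
// until ProcessHeaderData() can deliver them.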
bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
  data.AppendToString(&decompressed_headers_);
  return true;
}

void ReliableQuicStream::CloseWriteSide() {
  if (write_side_closed_) {
    return;
  }
  DLOG(INFO) << "Done writing to stream " << id();

  write_side_closed_ = true;
  if (read_side_closed_) {
    DLOG(INFO) << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

void ReliableQuicStream::OnClose() {
  CloseReadSide();
  CloseWriteSide();

  if (visitor_) {
    Visitor* visitor = visitor_;
    // Calling Visitor::OnClose() may result in the destruction of the
    // visitor, so we need to ensure we don't call it again.
    visitor_ = NULL;
    visitor->OnClose(this);
  }
}

}  // namespace net