reliable_quic_stream.cc revision 90dce4d38c5ff5333bea97d859d4e484e27edf0c
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/quic/reliable_quic_stream.h"

#include "net/quic/quic_session.h"
#include "net/quic/quic_spdy_decompressor.h"

using base::StringPiece;
using std::min;
using std::string;

namespace net {

ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
                                       QuicSession* session)
    : sequencer_(this),
      id_(id),
      session_(session),
      visitor_(NULL),
      stream_bytes_read_(0),
      stream_bytes_written_(0),
      headers_decompressed_(false),
      headers_id_(0),
      stream_error_(QUIC_STREAM_NO_ERROR),
      connection_error_(QUIC_NO_ERROR),
      read_side_closed_(false),
      write_side_closed_(false),
      fin_buffered_(false),
      fin_sent_(false) {
}

ReliableQuicStream::~ReliableQuicStream() {
}

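// Returns true if this stream is willing to accept |frame|.  Frames that
// arrive after the read side has been closed are accepted (and later
// discarded), frames for a different stream id are rejected, and otherwise
// the decision is delegated to the sequencer.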
bool ReliableQuicStream::WillAcceptStreamFrame(
    const QuicStreamFrame& frame) const {
  if (read_side_closed_) {
    return true;
  }
  if (frame.stream_id != id_) {
    LOG(ERROR) << "Frame for stream " << frame.stream_id
               << " received by stream " << id_;
    return false;
  }
  return sequencer_.WillAcceptStreamFrame(frame);
}

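// Handles an incoming frame for this stream.  Data that arrives after the
// read side has been closed is silently dropped.  Otherwise the frame is
// handed to the sequencer, and a FIN records the final offset of the stream.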
bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
  DCHECK_EQ(frame.stream_id, id_);
  if (read_side_closed_) {
    DLOG(INFO) << "Ignoring frame " << frame.stream_id;
    // We don't want to be reading: blackhole the data.
    return true;
  }
  // Note: This count includes duplicate data received.
  stream_bytes_read_ += frame.data.length();

  bool accepted = sequencer_.OnStreamFrame(frame);

  if (frame.fin) {
    sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size(), true);
  }

  return accepted;
}

void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  TerminateFromPeer(false);  // Full close.
}

void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) {
  if (IsClosed()) {
    return;
  }
  if (error != QUIC_NO_ERROR) {
    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
    connection_error_ = error;
  }

  if (from_peer) {
    TerminateFromPeer(false);
  } else {
    CloseWriteSide();
    CloseReadSide();
  }
}

void ReliableQuicStream::TerminateFromPeer(bool half_close) {
  if (!half_close) {
    CloseWriteSide();
  }
  CloseReadSide();
}

void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  session()->SendRstStream(id(), error);
}

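// Copies readable data into |iov|.  Buffered decompressed header data is
// drained first; only once that buffer is empty does reading fall through
// to the sequencer.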
int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.Readv(iov, iov_len);
  }
  size_t bytes_consumed = 0;
  int iov_index = 0;
  while (iov_index < iov_len &&
         decompressed_headers_.length() > bytes_consumed) {
    size_t bytes_to_read = min(iov[iov_index].iov_len,
                               decompressed_headers_.length() - bytes_consumed);
    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
    memcpy(iov_ptr,
           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
    bytes_consumed += bytes_to_read;
    ++iov_index;
  }
  decompressed_headers_.erase(0, bytes_consumed);
  return bytes_consumed;
}

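// Fills |iov| with pointers to readable data without copying it.  As with
// Readv, buffered decompressed header data takes precedence over data held
// by the sequencer.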
int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) {
  if (headers_decompressed_ && decompressed_headers_.empty()) {
    return sequencer_.GetReadableRegions(iov, iov_len);
  }
  if (iov_len == 0) {
    return 0;
  }
  iov[0].iov_base = static_cast<void*>(
      const_cast<char*>(decompressed_headers_.data()));
  iov[0].iov_len = decompressed_headers_.length();
  return 1;
}

bool ReliableQuicStream::IsHalfClosed() const {
  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return false;
  }
  return sequencer_.IsHalfClosed();
}

bool ReliableQuicStream::IsClosed() const {
  return write_side_closed_ && (read_side_closed_ || IsHalfClosed());
}

bool ReliableQuicStream::HasBytesToRead() const {
  return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
}

const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
  return session_->peer_address();
}

QuicSpdyCompressor* ReliableQuicStream::compressor() {
  return session_->compressor();
}

QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
  return WriteOrBuffer(data, fin);
}

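// Writes |data| to the connection, queueing whatever cannot be consumed
// immediately.  The full length (and fin) is always reported as consumed to
// the caller, since the remainder is buffered and flushed from OnCanWrite().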
QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
  DCHECK(!fin_buffered_);

  QuicConsumedData consumed_data(0, false);
  fin_buffered_ = fin;

  if (queued_data_.empty()) {
    consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
    DCHECK_LE(consumed_data.bytes_consumed, data.length());
  }

  // If there's unconsumed data or an unconsumed fin, queue it.
  if (consumed_data.bytes_consumed < data.length() ||
      (fin && !consumed_data.fin_consumed)) {
    queued_data_.push_back(
        string(data.data() + consumed_data.bytes_consumed,
               data.length() - consumed_data.bytes_consumed));
  }

  return QuicConsumedData(data.size(), true);
}

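// Called when the connection becomes writable again.  Drains queued_data_ in
// order, sending the buffered fin with the final chunk, and stops as soon as
// a write is only partially consumed.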
void ReliableQuicStream::OnCanWrite() {
  bool fin = false;
  while (!queued_data_.empty()) {
    const string& data = queued_data_.front();
    if (queued_data_.size() == 1 && fin_buffered_) {
      fin = true;
    }
    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
    if (consumed_data.bytes_consumed == data.size() &&
        fin == consumed_data.fin_consumed) {
      queued_data_.pop_front();
    } else {
      queued_data_.front().erase(0, consumed_data.bytes_consumed);
      break;
    }
  }
}

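// Hands |data| to the session for framing and transmission.  Updates
// stream_bytes_written_, closes the write side once a fin has been consumed,
// and marks the stream write blocked if the session could not take all of
// the data.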
QuicConsumedData ReliableQuicStream::WriteDataInternal(
    StringPiece data, bool fin) {
  if (write_side_closed_) {
    DLOG(ERROR) << "Attempt to write when the write side is closed";
    return QuicConsumedData(0, false);
  }

  QuicConsumedData consumed_data =
      session()->WriteData(id(), data, stream_bytes_written_, fin);
  stream_bytes_written_ += consumed_data.bytes_consumed;
  if (consumed_data.bytes_consumed == data.length()) {
    if (fin && consumed_data.fin_consumed) {
      fin_sent_ = true;
      CloseWriteSide();
    }
  } else {
    session_->MarkWriteBlocked(id());
  }
  return consumed_data;
}

void ReliableQuicStream::CloseReadSide() {
  if (read_side_closed_) {
    return;
  }
  DLOG(INFO) << "Done reading from stream " << id();

  read_side_closed_ = true;
  if (write_side_closed_) {
    DLOG(INFO) << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

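// Entry point for data delivered in order by the sequencer.  The crypto
// stream passes data straight to ProcessData().  Other streams first strip
// the 4-byte headers id, then route header data through the session's shared
// SPDY decompressor (possibly blocking on head-of-line decompression) before
// handing any remaining body bytes to ProcessData().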
uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
  if (id() == kCryptoStreamId) {
    // The crypto stream does not use compression.
    return ProcessData(data, data_len);
  }
  uint32 total_bytes_consumed = 0;
  if (headers_id_ == 0u) {
    // The headers ID has not yet been read.  Strip it from the beginning of
    // the data stream.
    DCHECK_GT(4u, headers_id_buffer_.length());
    size_t missing_size = 4 - headers_id_buffer_.length();
    if (data_len < missing_size) {
      StringPiece(data, data_len).AppendToString(&headers_id_buffer_);
      return data_len;
    }
    total_bytes_consumed += missing_size;
    StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
    DCHECK_EQ(4u, headers_id_buffer_.length());
    memcpy(&headers_id_, headers_id_buffer_.data(), 4);
    headers_id_buffer_.clear();
    data += missing_size;
    data_len -= missing_size;
  }
  DCHECK_NE(0u, headers_id_);

  // Once the headers are finished, we simply pass the data through.
  if (headers_decompressed_) {
    // Some buffered header data remains.
    if (!decompressed_headers_.empty()) {
      ProcessHeaderData();
    }
    if (decompressed_headers_.empty()) {
      DVLOG(1) << "Delegating processing to ProcessData";
      total_bytes_consumed += ProcessData(data, data_len);
    }
    return total_bytes_consumed;
  }

  QuicHeaderId current_header_id =
      session_->decompressor()->current_header_id();
  // Ensure that this header id looks sane.
  if (headers_id_ < current_header_id ||
      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    DVLOG(1) << "Invalid headers for stream: " << id()
             << " header_id: " << headers_id_
             << " current_header_id: " << current_header_id;
    session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    return total_bytes_consumed;
  }

  // If we are head-of-line blocked on decompression, then back up.
  if (current_header_id != headers_id_) {
    session_->MarkDecompressionBlocked(headers_id_, id());
    DVLOG(1) << "Unable to decompress header data for stream: " << id()
             << " header_id: " << headers_id_;
    return total_bytes_consumed;
  }

  // Decompressed data will be delivered to decompressed_headers_.
  size_t bytes_consumed = session_->decompressor()->DecompressData(
      StringPiece(data, data_len), this);
  total_bytes_consumed += bytes_consumed;

  // Headers are complete if the decompressor has moved on to the
  // next stream.
  headers_decompressed_ =
      session_->decompressor()->current_header_id() != headers_id_;

  ProcessHeaderData();

  // We have processed all of the decompressed data but we might
  // have some more raw data to process.
  if (decompressed_headers_.empty() || total_bytes_consumed < data_len) {
    total_bytes_consumed += ProcessData(data + bytes_consumed,
                                        data_len - bytes_consumed);
  }

  // The sequencer will push any additional buffered frames if this data
  // has been completely consumed.
  return total_bytes_consumed;
}

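// Delivers buffered decompressed header data to ProcessData(), erasing
// whatever the subclass consumed.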
uint32 ReliableQuicStream::ProcessHeaderData() {
  if (decompressed_headers_.empty()) {
    return 0;
  }

  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
                                       decompressed_headers_.length());
  if (bytes_processed == decompressed_headers_.length()) {
    decompressed_headers_.clear();
  } else {
    decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
  }
  return bytes_processed;
}

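// Called when the shared decompressor reaches this stream's header id.
// Feeds readable regions from the sequencer through the decompressor until
// the headers are complete or the buffered data runs out, then processes
// the resulting header data.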
void ReliableQuicStream::OnDecompressorAvailable() {
  DCHECK_EQ(headers_id_,
            session_->decompressor()->current_header_id());
  DCHECK(!headers_decompressed_);
  DCHECK_EQ(0u, decompressed_headers_.length());

  size_t total_bytes_consumed = 0;
  struct iovec iovecs[5];
  while (!headers_decompressed_) {
    size_t num_iovecs =
        sequencer_.GetReadableRegions(iovecs, arraysize(iovecs));

    if (num_iovecs == 0) {
      return;
    }
    for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) {
      total_bytes_consumed += session_->decompressor()->DecompressData(
          StringPiece(static_cast<char*>(iovecs[i].iov_base),
                      iovecs[i].iov_len), this);

      headers_decompressed_ =
          session_->decompressor()->current_header_id() != headers_id_;
    }
  }

  // Either the headers are complete, or all of the data has been consumed.
  sequencer_.MarkConsumed(total_bytes_consumed);

  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.

  if (headers_decompressed_ && decompressed_headers_.empty()) {
    sequencer_.FlushBufferedFrames();
  }
}

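// Delegate callback from the decompressor; accumulates decompressed output
// in decompressed_headers_.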
bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
  data.AppendToString(&decompressed_headers_);
  return true;
}

void ReliableQuicStream::OnDecompressionError() {
  session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
}

void ReliableQuicStream::CloseWriteSide() {
  if (write_side_closed_) {
    return;
  }
  DLOG(INFO) << "Done writing to stream " << id();

  write_side_closed_ = true;
  if (read_side_closed_) {
    DLOG(INFO) << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}

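// Closes both sides of the stream and notifies the visitor exactly once.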
void ReliableQuicStream::OnClose() {
  CloseReadSide();
  CloseWriteSide();

  if (visitor_) {
    Visitor* visitor = visitor_;
    // Calling Visitor::OnClose() may result in the destruction of the visitor,
    // so we need to ensure we don't call it again.
    visitor_ = NULL;
    visitor->OnClose(this);
  }
}

}  // namespace net