reliable_quic_stream.cc revision 558790d6acca3451cf3a6b497803a5f07d0bec58
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/reliable_quic_stream.h"
6
7#include "net/quic/quic_session.h"
8#include "net/quic/quic_spdy_decompressor.h"
9
10using base::StringPiece;
11using std::min;
12
13namespace net {
14
// Constructs a stream owned by |session|.  All bookkeeping starts zeroed:
// no bytes read/written, headers not yet decompressed, both sides open.
// Note: member initializers must stay in member declaration order.
ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
                                       QuicSession* session)
    : sequencer_(this),  // The sequencer delivers in-order data back to us.
      id_(id),
      session_(session),
      visitor_(NULL),
      stream_bytes_read_(0),
      stream_bytes_written_(0),
      headers_decompressed_(false),
      headers_id_(0),  // 0 means the on-wire headers ID has not been read yet.
      decompression_failed_(false),
      stream_error_(QUIC_STREAM_NO_ERROR),
      connection_error_(QUIC_NO_ERROR),
      read_side_closed_(false),
      write_side_closed_(false),
      fin_buffered_(false),
      fin_sent_(false) {
}
33
// No owned resources beyond members with their own destructors.
ReliableQuicStream::~ReliableQuicStream() {
}
36
37bool ReliableQuicStream::WillAcceptStreamFrame(
38    const QuicStreamFrame& frame) const {
39  if (read_side_closed_) {
40    return true;
41  }
42  if (frame.stream_id != id_) {
43    LOG(ERROR) << "Error!";
44    return false;
45  }
46  return sequencer_.WillAcceptStreamFrame(frame);
47}
48
49bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
50  DCHECK_EQ(frame.stream_id, id_);
51  if (read_side_closed_) {
52    DLOG(INFO) << "Ignoring frame " << frame.stream_id;
53    // We don't want to be reading: blackhole the data.
54    return true;
55  }
56  // Note: This count include duplicate data received.
57  stream_bytes_read_ += frame.data.length();
58
59  bool accepted = sequencer_.OnStreamFrame(frame);
60
61  return accepted;
62}
63
// Called when the peer sends RST_STREAM: record the error and fully close
// both sides (false == not a half close).
void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
  stream_error_ = error;
  TerminateFromPeer(false);  // Full close.
}
68
69void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) {
70  if (read_side_closed_ && write_side_closed_) {
71    return;
72  }
73  if (error != QUIC_NO_ERROR) {
74    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
75    connection_error_ = error;
76  }
77
78  if (from_peer) {
79    TerminateFromPeer(false);
80  } else {
81    CloseWriteSide();
82    CloseReadSide();
83  }
84}
85
// Peer-initiated termination.  A half close shuts only our read side; a
// full close shuts the write side first (order matters: CloseReadSide is
// the call that deregisters the stream once both sides are closed).
void ReliableQuicStream::TerminateFromPeer(bool half_close) {
  if (!half_close) {
    CloseWriteSide();
  }
  CloseReadSide();
}
92
93void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
94  stream_error_ = error;
95  if (error != QUIC_STREAM_NO_ERROR)  {
96    // Sending a RstStream results in calling CloseStream.
97    session()->SendRstStream(id(), error);
98  } else {
99    session_->CloseStream(id());
100  }
101}
102
103size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
104  if (headers_decompressed_ && decompressed_headers_.empty()) {
105    return sequencer_.Readv(iov, iov_len);
106  }
107  size_t bytes_consumed = 0;
108  size_t iov_index = 0;
109  while (iov_index < iov_len &&
110         decompressed_headers_.length() > bytes_consumed) {
111    size_t bytes_to_read = min(iov[iov_index].iov_len,
112                               decompressed_headers_.length() - bytes_consumed);
113    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
114    memcpy(iov_ptr,
115           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
116    bytes_consumed += bytes_to_read;
117    ++iov_index;
118  }
119  decompressed_headers_.erase(0, bytes_consumed);
120  return bytes_consumed;
121}
122
123int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
124  if (headers_decompressed_ && decompressed_headers_.empty()) {
125    return sequencer_.GetReadableRegions(iov, iov_len);
126  }
127  if (iov_len == 0) {
128    return 0;
129  }
130  iov[0].iov_base = static_cast<void*>(
131      const_cast<char*>(decompressed_headers_.data()));
132  iov[0].iov_len = decompressed_headers_.length();
133  return 1;
134}
135
136bool ReliableQuicStream::IsHalfClosed() const {
137  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
138    return false;
139  }
140  return sequencer_.IsHalfClosed();
141}
142
143bool ReliableQuicStream::HasBytesToRead() const {
144  return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
145}
146
// Convenience accessor: the peer address lives on the owning session.
const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
  return session_->peer_address();
}
150
// The SPDY header compressor is shared session-wide, not per stream.
QuicSpdyCompressor* ReliableQuicStream::compressor() {
  return session_->compressor();
}
154
// Delegates to the session; returns whatever the session reports.
bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
  return session_->GetSSLInfo(ssl_info);
}
158
// Public write entry point.  Callers must supply payload bytes, a FIN, or
// both; an empty non-FIN write is a programming error.
QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
  DCHECK(data.size() > 0 || fin);
  return WriteOrBuffer(data, fin);
}
163
// Writes |data| immediately if nothing is already queued, and buffers any
// remainder (and any unconsumed FIN) for OnCanWrite.  Because buffered data
// is guaranteed to be sent eventually, the caller is always told that
// everything was consumed.
QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
  // Once a FIN has been buffered no further writes are legal.
  DCHECK(!fin_buffered_);

  QuicConsumedData consumed_data(0, false);
  // Record the FIN before attempting the write; if the write below consumes
  // it, the write side is closed and fin_buffered_ is never read again.
  fin_buffered_ = fin;

  if (queued_data_.empty()) {
    // Copy into a string so the bytes outlive this call if partially sent.
    consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
    DCHECK_LE(consumed_data.bytes_consumed, data.length());
  }

  // If there's unconsumed data or an unconsumed fin, queue it.
  if (consumed_data.bytes_consumed < data.length() ||
      (fin && !consumed_data.fin_consumed)) {
    queued_data_.push_back(
        string(data.data() + consumed_data.bytes_consumed,
               data.length() - consumed_data.bytes_consumed));
  }

  // Report full consumption: the unsent tail is now our responsibility.
  return QuicConsumedData(data.size(), true);
}
185
186void ReliableQuicStream::OnCanWrite() {
187  bool fin = false;
188  while (!queued_data_.empty()) {
189    const string& data = queued_data_.front();
190    if (queued_data_.size() == 1 && fin_buffered_) {
191      fin = true;
192    }
193    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
194    if (consumed_data.bytes_consumed == data.size() &&
195        fin == consumed_data.fin_consumed) {
196      queued_data_.pop_front();
197    } else {
198      queued_data_.front().erase(0, consumed_data.bytes_consumed);
199      break;
200    }
201  }
202}
203
204QuicConsumedData ReliableQuicStream::WriteDataInternal(
205    StringPiece data, bool fin) {
206  if (write_side_closed_) {
207    DLOG(ERROR) << "Attempt to write when the write side is closed";
208    return QuicConsumedData(0, false);
209  }
210
211  QuicConsumedData consumed_data =
212      session()->WriteData(id(), data, stream_bytes_written_, fin);
213  stream_bytes_written_ += consumed_data.bytes_consumed;
214  if (consumed_data.bytes_consumed == data.length()) {
215    if (fin && consumed_data.fin_consumed) {
216      fin_sent_ = true;
217      CloseWriteSide();
218    } else if (fin && !consumed_data.fin_consumed) {
219      session_->MarkWriteBlocked(id());
220    }
221  } else {
222    session_->MarkWriteBlocked(id());
223  }
224  return consumed_data;
225}
226
227void ReliableQuicStream::CloseReadSide() {
228  if (read_side_closed_) {
229    return;
230  }
231  DLOG(INFO) << "Done reading from stream " << id();
232
233  read_side_closed_ = true;
234  if (write_side_closed_) {
235    DLOG(INFO) << "Closing stream: " << id();
236    session_->CloseStream(id());
237  }
238}
239
// Consumes raw in-order stream bytes from the sequencer.  For non-crypto
// streams the stream starts with a 4-byte headers ID, followed by
// compressed SPDY headers which must be decompressed (possibly blocking on
// other streams, since header blocks are decompressed in ID order), and
// only then by payload bytes that are passed straight to ProcessData.
// Returns the number of bytes consumed; unconsumed bytes remain buffered
// in the sequencer.
uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
  if (id() == kCryptoStreamId) {
    if (data_len == 0) {
      return 0;
    }
    // The crypto stream does not use compression.
    return ProcessData(data, data_len);
  }
  uint32 total_bytes_consumed = 0;
  if (headers_id_ == 0u) {
    // The headers ID has not yet been read.  Strip it from the beginning of
    // the data stream.
    DCHECK_GT(4u, headers_id_buffer_.length());
    size_t missing_size = 4 - headers_id_buffer_.length();
    if (data_len < missing_size) {
      // Still not enough bytes for the full ID; stash what we have.
      StringPiece(data, data_len).AppendToString(&headers_id_buffer_);
      return data_len;
    }
    total_bytes_consumed += missing_size;
    StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
    DCHECK_EQ(4u, headers_id_buffer_.length());
    // Raw in-memory copy of the 4 ID bytes (host byte order as sent).
    memcpy(&headers_id_, headers_id_buffer_.data(), 4);
    headers_id_buffer_.clear();
    data += missing_size;
    data_len -= missing_size;
  }
  DCHECK_NE(0u, headers_id_);
  if (data_len == 0) {
    return total_bytes_consumed;
  }

  // Once the headers are finished, we simply pass the data through.
  if (headers_decompressed_) {
    // Some buffered header data remains.
    if (!decompressed_headers_.empty()) {
      ProcessHeaderData();
    }
    // Only deliver body bytes after all header bytes have been consumed,
    // so the visitor always sees headers strictly before payload.
    if (decompressed_headers_.empty()) {
      DVLOG(1) << "Delegating procesing to ProcessData";
      total_bytes_consumed += ProcessData(data, data_len);
    }
    return total_bytes_consumed;
  }

  QuicHeaderId current_header_id =
      session_->decompressor()->current_header_id();
  // Ensure that this header id looks sane.
  if (headers_id_ < current_header_id ||
      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    DVLOG(1) << "Invalid headers for stream: " << id()
             << " header_id: " << headers_id_
             << " current_header_id: " << current_header_id;
    session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    return total_bytes_consumed;
  }

  // If we are head-of-line blocked on decompression, then back up.
  // The session will call OnDecompressorAvailable when it is our turn.
  if (current_header_id != headers_id_) {
    session_->MarkDecompressionBlocked(headers_id_, id());
    DVLOG(1) << "Unable to decompress header data for stream: " << id()
             << " header_id: " << headers_id_;
    return total_bytes_consumed;
  }

  // Decompressed data will be delivered to decompressed_headers_
  // (via our OnDecompressedData callback).
  size_t bytes_consumed = session_->decompressor()->DecompressData(
      StringPiece(data, data_len), this);
  DCHECK_NE(0u, bytes_consumed);
  if (bytes_consumed > data_len) {
    DCHECK(false) << "DecompressData returned illegal value";
    OnDecompressionError();
    return total_bytes_consumed;
  }
  total_bytes_consumed += bytes_consumed;
  data += bytes_consumed;
  data_len -= bytes_consumed;

  if (decompression_failed_) {
    // The session will have been closed in OnDecompressionError.
    return total_bytes_consumed;
  }

  // Headers are complete if the decompressor has moved on to the
  // next stream.
  headers_decompressed_ =
      session_->decompressor()->current_header_id() != headers_id_;
  if (!headers_decompressed_) {
    // The decompressor should have consumed everything we gave it.
    DCHECK_EQ(0u, data_len);
  }

  ProcessHeaderData();

  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return total_bytes_consumed;
  }

  // We have processed all of the decompressed data but we might
  // have some more raw data to process.
  if (data_len > 0) {
    total_bytes_consumed += ProcessData(data, data_len);
  }

  // The sequencer will push any additional buffered frames if this data
  // has been completely consumed.
  return total_bytes_consumed;
}
346
347uint32 ReliableQuicStream::ProcessHeaderData() {
348  if (decompressed_headers_.empty()) {
349    return 0;
350  }
351
352  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
353                                       decompressed_headers_.length());
354  if (bytes_processed == decompressed_headers_.length()) {
355    decompressed_headers_.clear();
356  } else {
357    decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
358  }
359  return bytes_processed;
360}
361
// Called by the session when this stream's header block is next in the
// shared decompressor's ID order (we were previously head-of-line blocked).
// Feeds buffered sequencer data through the decompressor until the headers
// are complete or the buffered data runs out.
void ReliableQuicStream::OnDecompressorAvailable() {
  DCHECK_EQ(headers_id_,
            session_->decompressor()->current_header_id());
  DCHECK(!headers_decompressed_);
  DCHECK(!decompression_failed_);
  DCHECK_EQ(0u, decompressed_headers_.length());

  while (!headers_decompressed_) {
    struct iovec iovec;
    if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
      // No contiguous data buffered; wait for more frames.
      return;
    }

    // Decompressed output arrives via our OnDecompressedData callback.
    size_t bytes_consumed = session_->decompressor()->DecompressData(
        StringPiece(static_cast<char*>(iovec.iov_base),
                    iovec.iov_len),
        this);
    DCHECK_LE(bytes_consumed, iovec.iov_len);
    if (decompression_failed_) {
      // OnDecompressionError has already closed the connection.
      return;
    }
    sequencer_.MarkConsumed(bytes_consumed);

    // Headers are complete once the decompressor moves to the next ID.
    headers_decompressed_ =
        session_->decompressor()->current_header_id() != headers_id_;
  }

  // Either the headers are complete, or all buffered data has been consumed.
  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
  if (IsHalfClosed()) {
    TerminateFromPeer(true);
  } else if (headers_decompressed_ && decompressed_headers_.empty()) {
    // Headers fully delivered: let the sequencer replay any body frames it
    // buffered while we were decompressing.
    sequencer_.FlushBufferedFrames();
  }
}
397
398bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
399  data.AppendToString(&decompressed_headers_);
400  return true;
401}
402
// Decompressor callback for corrupt header data: latch the failure flag
// (checked by callers after DecompressData) and kill the connection, since
// the shared compression context is now unrecoverable.
void ReliableQuicStream::OnDecompressionError() {
  DCHECK(!decompression_failed_);
  decompression_failed_ = true;
  session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
}
408
409
410void ReliableQuicStream::CloseWriteSide() {
411  if (write_side_closed_) {
412    return;
413  }
414  DLOG(INFO) << "Done writing to stream " << id();
415
416  write_side_closed_ = true;
417  if (read_side_closed_) {
418    DLOG(INFO) << "Closing stream: " << id();
419    session_->CloseStream(id());
420  }
421}
422
423void ReliableQuicStream::OnClose() {
424  CloseReadSide();
425  CloseWriteSide();
426
427  if (visitor_) {
428    Visitor* visitor = visitor_;
429    // Calling Visitor::OnClose() may result the destruction of the visitor,
430    // so we need to ensure we don't call it again.
431    visitor_ = NULL;
432    visitor->OnClose(this);
433  }
434}
435
436}  // namespace net
437