reliable_quic_stream.cc revision 5d1f7b1de12d16ceb2c938c56701a3e8bfa558f7
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/reliable_quic_stream.h"
6
7#include "base/logging.h"
8#include "net/quic/quic_session.h"
9#include "net/quic/quic_spdy_decompressor.h"
10#include "net/quic/quic_write_blocked_list.h"
11
12using base::StringPiece;
13using std::min;
14
15namespace net {
16
// Log-line prefix identifying which side of the connection produced the
// message. Expands in the context of a ReliableQuicStream member function,
// since it reads |is_server_|. Both prefixes use the same layout (no leading
// space) so that client and server log lines align.
#define ENDPOINT (is_server_ ? "Server: " : "Client: ")
18
19namespace {
20
21struct iovec MakeIovec(StringPiece data) {
22  struct iovec iov = {const_cast<char*>(data.data()),
23                      static_cast<size_t>(data.size())};
24  return iov;
25}
26
27}  // namespace
28
29ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
30                                       QuicSession* session)
31    : sequencer_(this),
32      id_(id),
33      session_(session),
34      stream_bytes_read_(0),
35      stream_bytes_written_(0),
36      stream_error_(QUIC_STREAM_NO_ERROR),
37      connection_error_(QUIC_NO_ERROR),
38      read_side_closed_(false),
39      write_side_closed_(false),
40      fin_buffered_(false),
41      fin_sent_(false),
42      rst_sent_(false),
43      is_server_(session_->is_server()) {
44}
45
46ReliableQuicStream::~ReliableQuicStream() {
47}
48
49bool ReliableQuicStream::WillAcceptStreamFrame(
50    const QuicStreamFrame& frame) const {
51  if (read_side_closed_) {
52    return true;
53  }
54  if (frame.stream_id != id_) {
55    LOG(ERROR) << "Error!";
56    return false;
57  }
58  return sequencer_.WillAcceptStreamFrame(frame);
59}
60
61bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
62  DCHECK_EQ(frame.stream_id, id_);
63  if (read_side_closed_) {
64    DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
65    // We don't want to be reading: blackhole the data.
66    return true;
67  }
68  // Note: This count include duplicate data received.
69  stream_bytes_read_ += frame.data.TotalBufferSize();
70
71  bool accepted = sequencer_.OnStreamFrame(frame);
72
73  return accepted;
74}
75
76void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
77  stream_error_ = frame.error_code;
78  CloseWriteSide();
79  CloseReadSide();
80}
81
82void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
83                                            bool from_peer) {
84  if (read_side_closed_ && write_side_closed_) {
85    return;
86  }
87  if (error != QUIC_NO_ERROR) {
88    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
89    connection_error_ = error;
90  }
91
92  CloseWriteSide();
93  CloseReadSide();
94}
95
96void ReliableQuicStream::OnFinRead() {
97  DCHECK(sequencer_.IsClosed());
98  CloseReadSide();
99}
100
101void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
102  DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
103  stream_error_ = error;
104  // Sending a RstStream results in calling CloseStream.
105  session()->SendRstStream(id(), error, stream_bytes_written_);
106  rst_sent_ = true;
107}
108
109void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
110  session()->connection()->SendConnectionClose(error);
111}
112
113void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
114                                                    const string& details) {
115  session()->connection()->SendConnectionCloseWithDetails(error, details);
116}
117
118QuicVersion ReliableQuicStream::version() {
119  return session()->connection()->version();
120}
121
122void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) {
123  if (data.empty() && !fin) {
124    LOG(DFATAL) << "data.empty() && !fin";
125    return;
126  }
127
128  if (fin_buffered_) {
129    LOG(DFATAL) << "Fin already buffered";
130    return;
131  }
132
133  QuicConsumedData consumed_data(0, false);
134  fin_buffered_ = fin;
135
136  if (queued_data_.empty()) {
137    struct iovec iov(MakeIovec(data));
138    consumed_data = WritevData(&iov, 1, fin, NULL);
139    DCHECK_LE(consumed_data.bytes_consumed, data.length());
140  }
141
142  // If there's unconsumed data or an unconsumed fin, queue it.
143  if (consumed_data.bytes_consumed < data.length() ||
144      (fin && !consumed_data.fin_consumed)) {
145    queued_data_.push_back(
146        string(data.data() + consumed_data.bytes_consumed,
147               data.length() - consumed_data.bytes_consumed));
148  }
149}
150
151void ReliableQuicStream::OnCanWrite() {
152  bool fin = false;
153  while (!queued_data_.empty()) {
154    const string& data = queued_data_.front();
155    if (queued_data_.size() == 1 && fin_buffered_) {
156      fin = true;
157    }
158    struct iovec iov(MakeIovec(data));
159    QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL);
160    if (consumed_data.bytes_consumed == data.size() &&
161        fin == consumed_data.fin_consumed) {
162      queued_data_.pop_front();
163    } else {
164      queued_data_.front().erase(0, consumed_data.bytes_consumed);
165      break;
166    }
167  }
168}
169
170QuicConsumedData ReliableQuicStream::WritevData(
171    const struct iovec* iov,
172    int iov_count,
173    bool fin,
174    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
175  if (write_side_closed_) {
176    DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
177    return QuicConsumedData(0, false);
178  }
179
180  size_t write_length = 0u;
181  for (int i = 0; i < iov_count; ++i) {
182    write_length += iov[i].iov_len;
183  }
184  QuicConsumedData consumed_data = session()->WritevData(
185      id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate);
186  stream_bytes_written_ += consumed_data.bytes_consumed;
187  if (consumed_data.bytes_consumed == write_length) {
188    if (fin && consumed_data.fin_consumed) {
189      fin_sent_ = true;
190      CloseWriteSide();
191    } else if (fin && !consumed_data.fin_consumed) {
192      session_->MarkWriteBlocked(id(), EffectivePriority());
193    }
194  } else {
195    session_->MarkWriteBlocked(id(), EffectivePriority());
196  }
197  return consumed_data;
198}
199
200void ReliableQuicStream::CloseReadSide() {
201  if (read_side_closed_) {
202    return;
203  }
204  DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
205
206  read_side_closed_ = true;
207  if (write_side_closed_) {
208    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
209    session_->CloseStream(id());
210  }
211}
212
213void ReliableQuicStream::CloseWriteSide() {
214  if (write_side_closed_) {
215    return;
216  }
217  DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
218
219  write_side_closed_ = true;
220  if (read_side_closed_) {
221    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
222    session_->CloseStream(id());
223  }
224}
225
226bool ReliableQuicStream::HasBufferedData() {
227  return !queued_data_.empty();
228}
229
230void ReliableQuicStream::OnClose() {
231  CloseReadSide();
232  CloseWriteSide();
233
234  if (version() > QUIC_VERSION_13 &&
235      !fin_sent_ && !rst_sent_) {
236    // For flow control accounting, we must tell the peer how many bytes we have
237    // written on this stream before termination. Done here if needed, using a
238    // RST frame.
239    DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
240    session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_);
241    rst_sent_ = true;
242  }
243}
244
245}  // namespace net
246