reliable_quic_stream.cc revision 0529e5d033099cbfc42635f6f6183833b09dff6e
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/reliable_quic_stream.h"
6
7#include "base/logging.h"
8#include "net/quic/iovector.h"
9#include "net/quic/quic_flow_controller.h"
10#include "net/quic/quic_session.h"
11#include "net/quic/quic_write_blocked_list.h"
12
13using base::StringPiece;
14using std::min;
15
16namespace net {
17
18#define ENDPOINT (is_server_ ? "Server: " : " Client: ")
19
20namespace {
21
22struct iovec MakeIovec(StringPiece data) {
23  struct iovec iov = {const_cast<char*>(data.data()),
24                      static_cast<size_t>(data.size())};
25  return iov;
26}
27
28}  // namespace
29
30// Wrapper that aggregates OnAckNotifications for packets sent using
31// WriteOrBufferData and delivers them to the original
32// QuicAckNotifier::DelegateInterface after all bytes written using
33// WriteOrBufferData are acked.  This level of indirection is
34// necessary because the delegate interface provides no mechanism that
35// WriteOrBufferData can use to inform it that the write required
36// multiple WritevData calls or that only part of the data has been
37// sent out by the time ACKs start arriving.
38class ReliableQuicStream::ProxyAckNotifierDelegate
39    : public QuicAckNotifier::DelegateInterface {
40 public:
41  explicit ProxyAckNotifierDelegate(DelegateInterface* delegate)
42      : delegate_(delegate),
43        pending_acks_(0),
44        wrote_last_data_(false),
45        num_original_packets_(0),
46        num_original_bytes_(0),
47        num_retransmitted_packets_(0),
48        num_retransmitted_bytes_(0) {
49  }
50
51  virtual void OnAckNotification(int num_original_packets,
52                                 int num_original_bytes,
53                                 int num_retransmitted_packets,
54                                 int num_retransmitted_bytes,
55                                 QuicTime::Delta delta_largest_observed)
56      OVERRIDE {
57    DCHECK_LT(0, pending_acks_);
58    --pending_acks_;
59    num_original_packets_ += num_original_packets;
60    num_original_bytes_ += num_original_bytes;
61    num_retransmitted_packets_ += num_retransmitted_packets;
62    num_retransmitted_bytes_ += num_retransmitted_bytes;
63
64    if (wrote_last_data_ && pending_acks_ == 0) {
65      delegate_->OnAckNotification(num_original_packets_,
66                                   num_original_bytes_,
67                                   num_retransmitted_packets_,
68                                   num_retransmitted_bytes_,
69                                   delta_largest_observed);
70    }
71  }
72
73  void WroteData(bool last_data) {
74    DCHECK(!wrote_last_data_);
75    ++pending_acks_;
76    wrote_last_data_ = last_data;
77  }
78
79 protected:
80  // Delegates are ref counted.
81  virtual ~ProxyAckNotifierDelegate() {
82  }
83
84 private:
85  // Original delegate.  delegate_->OnAckNotification will be called when:
86  //   wrote_last_data_ == true and pending_acks_ == 0
87  scoped_refptr<DelegateInterface> delegate_;
88
89  // Number of outstanding acks.
90  int pending_acks_;
91
92  // True if no pending writes remain.
93  bool wrote_last_data_;
94
95  // Accumulators.
96  int num_original_packets_;
97  int num_original_bytes_;
98  int num_retransmitted_packets_;
99  int num_retransmitted_bytes_;
100
101  DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate);
102};
103
104ReliableQuicStream::PendingData::PendingData(
105    string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in)
106    : data(data_in), delegate(delegate_in) {
107}
108
109ReliableQuicStream::PendingData::~PendingData() {
110}
111
112ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
113    : sequencer_(this),
114      id_(id),
115      session_(session),
116      stream_bytes_read_(0),
117      stream_bytes_written_(0),
118      stream_error_(QUIC_STREAM_NO_ERROR),
119      connection_error_(QUIC_NO_ERROR),
120      read_side_closed_(false),
121      write_side_closed_(false),
122      fin_buffered_(false),
123      fin_sent_(false),
124      rst_sent_(false),
125      is_server_(session_->is_server()),
126      flow_controller_(
127          id_,
128          is_server_,
129          session_->config()->HasReceivedInitialFlowControlWindowBytes() ?
130              session_->config()->ReceivedInitialFlowControlWindowBytes() :
131              kDefaultFlowControlSendWindow,
132          session_->connection()->max_flow_control_receive_window_bytes(),
133          session_->connection()->max_flow_control_receive_window_bytes()) {
134  if (session_->connection()->version() < QUIC_VERSION_17) {
135    flow_controller_.Disable();
136  }
137}
138
139ReliableQuicStream::~ReliableQuicStream() {
140}
141
142bool ReliableQuicStream::WillAcceptStreamFrame(
143    const QuicStreamFrame& frame) const {
144  if (read_side_closed_) {
145    return true;
146  }
147  if (frame.stream_id != id_) {
148    LOG(ERROR) << "Error!";
149    return false;
150  }
151  return sequencer_.WillAcceptStreamFrame(frame);
152}
153
154bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
155  DCHECK_EQ(frame.stream_id, id_);
156  if (read_side_closed_) {
157    DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
158    // We don't want to be reading: blackhole the data.
159    return true;
160  }
161
162  // This count include duplicate data received.
163  stream_bytes_read_ += frame.data.TotalBufferSize();
164
165  bool accepted = sequencer_.OnStreamFrame(frame);
166
167  if (version() >= QUIC_VERSION_17) {
168    if (flow_controller_.FlowControlViolation()) {
169      session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR);
170      return false;
171    }
172    MaybeSendWindowUpdate();
173  }
174
175  return accepted;
176}
177
178void ReliableQuicStream::MaybeSendWindowUpdate() {
179  if (version() >= QUIC_VERSION_17) {
180    flow_controller_.MaybeSendWindowUpdate(session()->connection());
181  }
182}
183
184int ReliableQuicStream::num_frames_received() {
185  return sequencer_.num_frames_received();
186}
187
188int ReliableQuicStream::num_duplicate_frames_received() {
189  return sequencer_.num_duplicate_frames_received();
190}
191
192void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
193  stream_error_ = frame.error_code;
194  CloseWriteSide();
195  CloseReadSide();
196}
197
198void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
199                                            bool from_peer) {
200  if (read_side_closed_ && write_side_closed_) {
201    return;
202  }
203  if (error != QUIC_NO_ERROR) {
204    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
205    connection_error_ = error;
206  }
207
208  CloseWriteSide();
209  CloseReadSide();
210}
211
212void ReliableQuicStream::OnFinRead() {
213  DCHECK(sequencer_.IsClosed());
214  CloseReadSide();
215}
216
217void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
218  DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
219  stream_error_ = error;
220  // Sending a RstStream results in calling CloseStream.
221  session()->SendRstStream(id(), error, stream_bytes_written_);
222  rst_sent_ = true;
223}
224
225void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
226  session()->connection()->SendConnectionClose(error);
227}
228
229void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
230                                                    const string& details) {
231  session()->connection()->SendConnectionCloseWithDetails(error, details);
232}
233
234QuicVersion ReliableQuicStream::version() const {
235  return session()->connection()->version();
236}
237
238void ReliableQuicStream::WriteOrBufferData(
239    StringPiece data,
240    bool fin,
241    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
242  if (data.empty() && !fin) {
243    LOG(DFATAL) << "data.empty() && !fin";
244    return;
245  }
246
247  if (fin_buffered_) {
248    LOG(DFATAL) << "Fin already buffered";
249    return;
250  }
251
252  scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate;
253  if (ack_notifier_delegate != NULL) {
254    proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate);
255  }
256
257  QuicConsumedData consumed_data(0, false);
258  fin_buffered_ = fin;
259
260  if (queued_data_.empty()) {
261    struct iovec iov(MakeIovec(data));
262    consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get());
263    DCHECK_LE(consumed_data.bytes_consumed, data.length());
264  }
265
266  bool write_completed;
267  // If there's unconsumed data or an unconsumed fin, queue it.
268  if (consumed_data.bytes_consumed < data.length() ||
269      (fin && !consumed_data.fin_consumed)) {
270    StringPiece remainder(data.substr(consumed_data.bytes_consumed));
271    queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate));
272    write_completed = false;
273  } else {
274    write_completed = true;
275  }
276
277  if ((proxy_delegate.get() != NULL) &&
278      (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) {
279    proxy_delegate->WroteData(write_completed);
280  }
281}
282
283void ReliableQuicStream::OnCanWrite() {
284  bool fin = false;
285  while (!queued_data_.empty()) {
286    PendingData* pending_data = &queued_data_.front();
287    ProxyAckNotifierDelegate* delegate = pending_data->delegate.get();
288    if (queued_data_.size() == 1 && fin_buffered_) {
289      fin = true;
290    }
291    struct iovec iov(MakeIovec(pending_data->data));
292    QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate);
293    if (consumed_data.bytes_consumed == pending_data->data.size() &&
294        fin == consumed_data.fin_consumed) {
295      queued_data_.pop_front();
296      if (delegate != NULL) {
297        delegate->WroteData(true);
298      }
299    } else {
300      if (consumed_data.bytes_consumed > 0) {
301        pending_data->data.erase(0, consumed_data.bytes_consumed);
302        if (delegate != NULL) {
303          delegate->WroteData(false);
304        }
305      }
306      break;
307    }
308  }
309}
310
311QuicConsumedData ReliableQuicStream::WritevData(
312    const struct iovec* iov,
313    int iov_count,
314    bool fin,
315    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
316  if (write_side_closed_) {
317    DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
318    return QuicConsumedData(0, false);
319  }
320
321  // How much data we want to write.
322  size_t write_length = TotalIovecLength(iov, iov_count);
323
324  // How much data we are allowed to write from flow control.
325  size_t send_window = flow_controller_.SendWindowSize();
326
327  // A FIN with zero data payload should not be flow control blocked.
328  bool fin_with_zero_data = (fin && write_length == 0);
329
330  if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) {
331    if (send_window == 0 && !fin_with_zero_data) {
332      // Quick return if we can't send anything.
333      flow_controller_.MaybeSendBlocked(session()->connection());
334      return QuicConsumedData(0, false);
335    }
336
337    if (write_length > send_window) {
338      // Don't send the FIN if we aren't going to send all the data.
339      fin = false;
340
341      // Writing more data would be a violation of flow control.
342      write_length = send_window;
343    }
344  }
345
346  // Fill an IOVector with bytes from the iovec.
347  IOVector data;
348  data.AppendIovecAtMostBytes(iov, iov_count, write_length);
349
350  QuicConsumedData consumed_data = session()->WritevData(
351      id(), data, stream_bytes_written_, fin, ack_notifier_delegate);
352  stream_bytes_written_ += consumed_data.bytes_consumed;
353
354  if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) {
355    flow_controller_.AddBytesSent(consumed_data.bytes_consumed);
356  }
357
358  if (consumed_data.bytes_consumed == write_length) {
359    if (!fin_with_zero_data) {
360      if (version() >= QUIC_VERSION_17) {
361        flow_controller_.MaybeSendBlocked(session()->connection());
362      }
363    }
364    if (fin && consumed_data.fin_consumed) {
365      fin_sent_ = true;
366      CloseWriteSide();
367    } else if (fin && !consumed_data.fin_consumed) {
368      session_->MarkWriteBlocked(id(), EffectivePriority());
369    }
370  } else {
371    session_->MarkWriteBlocked(id(), EffectivePriority());
372  }
373  return consumed_data;
374}
375
376void ReliableQuicStream::CloseReadSide() {
377  if (read_side_closed_) {
378    return;
379  }
380  DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
381
382  read_side_closed_ = true;
383  if (write_side_closed_) {
384    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
385    session_->CloseStream(id());
386  }
387}
388
389void ReliableQuicStream::CloseWriteSide() {
390  if (write_side_closed_) {
391    return;
392  }
393  DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
394
395  write_side_closed_ = true;
396  if (read_side_closed_) {
397    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
398    session_->CloseStream(id());
399  }
400}
401
402bool ReliableQuicStream::HasBufferedData() {
403  return !queued_data_.empty();
404}
405
406void ReliableQuicStream::OnClose() {
407  CloseReadSide();
408  CloseWriteSide();
409
410  if (version() > QUIC_VERSION_13 &&
411      !fin_sent_ && !rst_sent_) {
412    // For flow control accounting, we must tell the peer how many bytes we have
413    // written on this stream before termination. Done here if needed, using a
414    // RST frame.
415    DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
416    session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING,
417                            stream_bytes_written_);
418    rst_sent_ = true;
419  }
420}
421
422void ReliableQuicStream::OnWindowUpdateFrame(
423    const QuicWindowUpdateFrame& frame) {
424  if (!flow_controller_.IsEnabled()) {
425    DLOG(DFATAL) << "Flow control not enabled! " << version();
426    return;
427  }
428
429  if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
430    // We can write again!
431    // TODO(rjshade): This does not respect priorities (e.g. multiple
432    //                outstanding POSTs are unblocked on arrival of
433    //                SHLO with initial window).
434    OnCanWrite();
435  }
436}
437
438}  // namespace net
439