reliable_quic_stream.cc revision cedac228d2dd51db4b79ea1e72c7f249408ee061
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/reliable_quic_stream.h"
6
7#include "base/logging.h"
8#include "net/quic/iovector.h"
9#include "net/quic/quic_flow_controller.h"
10#include "net/quic/quic_session.h"
11#include "net/quic/quic_write_blocked_list.h"
12
13using base::StringPiece;
14using std::min;
15
16namespace net {
17
// Log-line prefix naming which side of the connection this stream lives on.
// Expands to an expression that reads |is_server_|, so it can only be used
// where that member (or a variable of that name) is in scope.
#define ENDPOINT (is_server_ ? "Server: " : "Client: ")
19
20namespace {
21
22struct iovec MakeIovec(StringPiece data) {
23  struct iovec iov = {const_cast<char*>(data.data()),
24                      static_cast<size_t>(data.size())};
25  return iov;
26}
27
28}  // namespace
29
30// Wrapper that aggregates OnAckNotifications for packets sent using
31// WriteOrBufferData and delivers them to the original
32// QuicAckNotifier::DelegateInterface after all bytes written using
33// WriteOrBufferData are acked.  This level of indirection is
34// necessary because the delegate interface provides no mechanism that
35// WriteOrBufferData can use to inform it that the write required
36// multiple WritevData calls or that only part of the data has been
37// sent out by the time ACKs start arriving.
38class ReliableQuicStream::ProxyAckNotifierDelegate
39    : public QuicAckNotifier::DelegateInterface {
40 public:
41  explicit ProxyAckNotifierDelegate(DelegateInterface* delegate)
42      : delegate_(delegate),
43        pending_acks_(0),
44        wrote_last_data_(false),
45        num_original_packets_(0),
46        num_original_bytes_(0),
47        num_retransmitted_packets_(0),
48        num_retransmitted_bytes_(0) {
49  }
50
51  virtual void OnAckNotification(int num_original_packets,
52                                 int num_original_bytes,
53                                 int num_retransmitted_packets,
54                                 int num_retransmitted_bytes,
55                                 QuicTime::Delta delta_largest_observed)
56      OVERRIDE {
57    DCHECK_LT(0, pending_acks_);
58    --pending_acks_;
59    num_original_packets_ += num_original_packets;
60    num_original_bytes_ += num_original_bytes;
61    num_retransmitted_packets_ += num_retransmitted_packets;
62    num_retransmitted_bytes_ += num_retransmitted_bytes;
63
64    if (wrote_last_data_ && pending_acks_ == 0) {
65      delegate_->OnAckNotification(num_original_packets_,
66                                   num_original_bytes_,
67                                   num_retransmitted_packets_,
68                                   num_retransmitted_bytes_,
69                                   delta_largest_observed);
70    }
71  }
72
73  void WroteData(bool last_data) {
74    DCHECK(!wrote_last_data_);
75    ++pending_acks_;
76    wrote_last_data_ = last_data;
77  }
78
79 protected:
80  // Delegates are ref counted.
81  virtual ~ProxyAckNotifierDelegate() {
82  }
83
84 private:
85  // Original delegate.  delegate_->OnAckNotification will be called when:
86  //   wrote_last_data_ == true and pending_acks_ == 0
87  scoped_refptr<DelegateInterface> delegate_;
88
89  // Number of outstanding acks.
90  int pending_acks_;
91
92  // True if no pending writes remain.
93  bool wrote_last_data_;
94
95  // Accumulators.
96  int num_original_packets_;
97  int num_original_bytes_;
98  int num_retransmitted_packets_;
99  int num_retransmitted_bytes_;
100
101  DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate);
102};
103
104ReliableQuicStream::PendingData::PendingData(
105    string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in)
106    : data(data_in), delegate(delegate_in) {
107}
108
109ReliableQuicStream::PendingData::~PendingData() {
110}
111
112ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
113    : sequencer_(this),
114      id_(id),
115      session_(session),
116      stream_bytes_read_(0),
117      stream_bytes_written_(0),
118      stream_error_(QUIC_STREAM_NO_ERROR),
119      connection_error_(QUIC_NO_ERROR),
120      read_side_closed_(false),
121      write_side_closed_(false),
122      fin_buffered_(false),
123      fin_sent_(false),
124      rst_sent_(false),
125      is_server_(session_->is_server()),
126      flow_controller_(
127          session_->connection()->version(),
128          id_,
129          is_server_,
130          session_->config()->HasReceivedInitialFlowControlWindowBytes() ?
131              session_->config()->ReceivedInitialFlowControlWindowBytes() :
132              kDefaultFlowControlSendWindow,
133          session_->max_flow_control_receive_window_bytes(),
134          session_->max_flow_control_receive_window_bytes()),
135      connection_flow_controller_(session_->flow_controller()) {
136}
137
138ReliableQuicStream::~ReliableQuicStream() {
139}
140
141bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
142  if (read_side_closed_) {
143    DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
144    // We don't want to be reading: blackhole the data.
145    return true;
146  }
147
148  if (frame.stream_id != id_) {
149    LOG(ERROR) << "Error!";
150    return false;
151  }
152
153  // This count include duplicate data received.
154  stream_bytes_read_ += frame.data.TotalBufferSize();
155
156  bool accepted = sequencer_.OnStreamFrame(frame);
157
158  if (flow_controller_.FlowControlViolation() ||
159      connection_flow_controller_->FlowControlViolation()) {
160    session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR);
161    return false;
162  }
163  MaybeSendWindowUpdate();
164
165  return accepted;
166}
167
168void ReliableQuicStream::MaybeSendWindowUpdate() {
169  flow_controller_.MaybeSendWindowUpdate(session()->connection());
170  connection_flow_controller_->MaybeSendWindowUpdate(session()->connection());
171}
172
173int ReliableQuicStream::num_frames_received() const {
174  return sequencer_.num_frames_received();
175}
176
177int ReliableQuicStream::num_duplicate_frames_received() const {
178  return sequencer_.num_duplicate_frames_received();
179}
180
181void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
182  stream_error_ = frame.error_code;
183  CloseWriteSide();
184  CloseReadSide();
185}
186
187void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
188                                            bool from_peer) {
189  if (read_side_closed_ && write_side_closed_) {
190    return;
191  }
192  if (error != QUIC_NO_ERROR) {
193    stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
194    connection_error_ = error;
195  }
196
197  CloseWriteSide();
198  CloseReadSide();
199}
200
201void ReliableQuicStream::OnFinRead() {
202  DCHECK(sequencer_.IsClosed());
203  CloseReadSide();
204}
205
206void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
207  DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
208  stream_error_ = error;
209  // Sending a RstStream results in calling CloseStream.
210  session()->SendRstStream(id(), error, stream_bytes_written_);
211  rst_sent_ = true;
212}
213
214void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
215  session()->connection()->SendConnectionClose(error);
216}
217
218void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
219                                                    const string& details) {
220  session()->connection()->SendConnectionCloseWithDetails(error, details);
221}
222
223QuicVersion ReliableQuicStream::version() const {
224  return session()->connection()->version();
225}
226
227void ReliableQuicStream::WriteOrBufferData(
228    StringPiece data,
229    bool fin,
230    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
231  if (data.empty() && !fin) {
232    LOG(DFATAL) << "data.empty() && !fin";
233    return;
234  }
235
236  if (fin_buffered_) {
237    LOG(DFATAL) << "Fin already buffered";
238    return;
239  }
240
241  scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate;
242  if (ack_notifier_delegate != NULL) {
243    proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate);
244  }
245
246  QuicConsumedData consumed_data(0, false);
247  fin_buffered_ = fin;
248
249  if (queued_data_.empty()) {
250    struct iovec iov(MakeIovec(data));
251    consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get());
252    DCHECK_LE(consumed_data.bytes_consumed, data.length());
253  }
254
255  bool write_completed;
256  // If there's unconsumed data or an unconsumed fin, queue it.
257  if (consumed_data.bytes_consumed < data.length() ||
258      (fin && !consumed_data.fin_consumed)) {
259    StringPiece remainder(data.substr(consumed_data.bytes_consumed));
260    queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate));
261    write_completed = false;
262  } else {
263    write_completed = true;
264  }
265
266  if ((proxy_delegate.get() != NULL) &&
267      (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) {
268    proxy_delegate->WroteData(write_completed);
269  }
270}
271
272void ReliableQuicStream::OnCanWrite() {
273  bool fin = false;
274  while (!queued_data_.empty()) {
275    PendingData* pending_data = &queued_data_.front();
276    ProxyAckNotifierDelegate* delegate = pending_data->delegate.get();
277    if (queued_data_.size() == 1 && fin_buffered_) {
278      fin = true;
279    }
280    struct iovec iov(MakeIovec(pending_data->data));
281    QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate);
282    if (consumed_data.bytes_consumed == pending_data->data.size() &&
283        fin == consumed_data.fin_consumed) {
284      queued_data_.pop_front();
285      if (delegate != NULL) {
286        delegate->WroteData(true);
287      }
288    } else {
289      if (consumed_data.bytes_consumed > 0) {
290        pending_data->data.erase(0, consumed_data.bytes_consumed);
291        if (delegate != NULL) {
292          delegate->WroteData(false);
293        }
294      }
295      break;
296    }
297  }
298}
299
300void ReliableQuicStream::MaybeSendBlocked() {
301  flow_controller_.MaybeSendBlocked(session()->connection());
302  connection_flow_controller_->MaybeSendBlocked(session()->connection());
303  // If we are connection level flow control blocked, then add the stream
304  // to the write blocked list. It will be given a chance to write when a
305  // connection level WINDOW_UPDATE arrives.
306  if (connection_flow_controller_->IsBlocked() &&
307      !flow_controller_.IsBlocked()) {
308    session_->MarkWriteBlocked(id(), EffectivePriority());
309  }
310}
311
312QuicConsumedData ReliableQuicStream::WritevData(
313    const struct iovec* iov,
314    int iov_count,
315    bool fin,
316    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
317  if (write_side_closed_) {
318    DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
319    return QuicConsumedData(0, false);
320  }
321
322  // How much data we want to write.
323  size_t write_length = TotalIovecLength(iov, iov_count);
324
325  // A FIN with zero data payload should not be flow control blocked.
326  bool fin_with_zero_data = (fin && write_length == 0);
327
328  if (flow_controller_.IsEnabled()) {
329    // How much data we are allowed to write from flow control.
330    uint64 send_window = flow_controller_.SendWindowSize();
331    if (connection_flow_controller_->IsEnabled()) {
332      send_window =
333          min(send_window, connection_flow_controller_->SendWindowSize());
334    }
335
336    if (send_window == 0 && !fin_with_zero_data) {
337      // Quick return if we can't send anything.
338      MaybeSendBlocked();
339      return QuicConsumedData(0, false);
340    }
341
342    if (write_length > send_window) {
343      // Don't send the FIN if we aren't going to send all the data.
344      fin = false;
345
346      // Writing more data would be a violation of flow control.
347      write_length = send_window;
348    }
349  }
350
351  // Fill an IOVector with bytes from the iovec.
352  IOVector data;
353  data.AppendIovecAtMostBytes(iov, iov_count, write_length);
354
355  QuicConsumedData consumed_data = session()->WritevData(
356      id(), data, stream_bytes_written_, fin, ack_notifier_delegate);
357  stream_bytes_written_ += consumed_data.bytes_consumed;
358
359  AddBytesSent(consumed_data.bytes_consumed);
360
361  if (consumed_data.bytes_consumed == write_length) {
362    if (!fin_with_zero_data) {
363      MaybeSendBlocked();
364    }
365    if (fin && consumed_data.fin_consumed) {
366      fin_sent_ = true;
367      CloseWriteSide();
368    } else if (fin && !consumed_data.fin_consumed) {
369      session_->MarkWriteBlocked(id(), EffectivePriority());
370    }
371  } else {
372    session_->MarkWriteBlocked(id(), EffectivePriority());
373  }
374  return consumed_data;
375}
376
377void ReliableQuicStream::CloseReadSide() {
378  if (read_side_closed_) {
379    return;
380  }
381  DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
382
383  read_side_closed_ = true;
384  if (write_side_closed_) {
385    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
386    session_->CloseStream(id());
387  }
388}
389
390void ReliableQuicStream::CloseWriteSide() {
391  if (write_side_closed_) {
392    return;
393  }
394  DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
395
396  write_side_closed_ = true;
397  if (read_side_closed_) {
398    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
399    session_->CloseStream(id());
400  }
401}
402
403bool ReliableQuicStream::HasBufferedData() const {
404  return !queued_data_.empty();
405}
406
407void ReliableQuicStream::OnClose() {
408  CloseReadSide();
409  CloseWriteSide();
410
411  if (!fin_sent_ && !rst_sent_) {
412    // For flow control accounting, we must tell the peer how many bytes we have
413    // written on this stream before termination. Done here if needed, using a
414    // RST frame.
415    DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
416    session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING,
417                            stream_bytes_written_);
418    rst_sent_ = true;
419  }
420}
421
422void ReliableQuicStream::OnWindowUpdateFrame(
423    const QuicWindowUpdateFrame& frame) {
424  if (!flow_controller_.IsEnabled()) {
425    DLOG(DFATAL) << "Flow control not enabled! " << version();
426    return;
427  }
428
429  if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
430    // We can write again!
431    // TODO(rjshade): This does not respect priorities (e.g. multiple
432    //                outstanding POSTs are unblocked on arrival of
433    //                SHLO with initial window).
434    // As long as the connection is not flow control blocked, we can write!
435    OnCanWrite();
436  }
437}
438
439void ReliableQuicStream::AddBytesBuffered(uint64 bytes) {
440  if (flow_controller_.IsEnabled()) {
441    flow_controller_.AddBytesBuffered(bytes);
442    connection_flow_controller_->AddBytesBuffered(bytes);
443  }
444}
445
446void ReliableQuicStream::RemoveBytesBuffered(uint64 bytes) {
447  if (flow_controller_.IsEnabled()) {
448    flow_controller_.RemoveBytesBuffered(bytes);
449    connection_flow_controller_->RemoveBytesBuffered(bytes);
450  }
451}
452
453void ReliableQuicStream::AddBytesSent(uint64 bytes) {
454  if (flow_controller_.IsEnabled()) {
455    flow_controller_.AddBytesSent(bytes);
456    connection_flow_controller_->AddBytesSent(bytes);
457  }
458}
459
460void ReliableQuicStream::AddBytesConsumed(uint64 bytes) {
461  if (flow_controller_.IsEnabled()) {
462    flow_controller_.AddBytesConsumed(bytes);
463    connection_flow_controller_->AddBytesConsumed(bytes);
464  }
465}
466
467bool ReliableQuicStream::IsFlowControlBlocked() {
468  return flow_controller_.IsBlocked() ||
469         connection_flow_controller_->IsBlocked();
470}
471
472}  // namespace net
473