reliable_quic_stream.cc revision 23730a6e56a168d1879203e4b3819bb36e3d8f1f
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "base/logging.h" 8#include "net/quic/quic_session.h" 9#include "net/quic/quic_write_blocked_list.h" 10 11using base::StringPiece; 12using std::min; 13 14namespace net { 15 16#define ENDPOINT (is_server_ ? "Server: " : " Client: ") 17 18namespace { 19 20struct iovec MakeIovec(StringPiece data) { 21 struct iovec iov = {const_cast<char*>(data.data()), 22 static_cast<size_t>(data.size())}; 23 return iov; 24} 25 26} // namespace 27 28// Wrapper that aggregates OnAckNotifications for packets sent using 29// WriteOrBufferData and delivers them to the original 30// QuicAckNotifier::DelegateInterface after all bytes written using 31// WriteOrBufferData are acked. This level of indirection is 32// necessary because the delegate interface provides no mechanism that 33// WriteOrBufferData can use to inform it that the write required 34// multiple WritevData calls or that only part of the data has been 35// sent out by the time ACKs start arriving. 36class ReliableQuicStream::ProxyAckNotifierDelegate 37 : public QuicAckNotifier::DelegateInterface { 38 public: 39 explicit ProxyAckNotifierDelegate(DelegateInterface* delegate) 40 : delegate_(delegate), 41 pending_acks_(0), 42 wrote_last_data_(false), 43 num_original_packets_(0), 44 num_original_bytes_(0), 45 num_retransmitted_packets_(0), 46 num_retransmitted_bytes_(0) { 47 } 48 49 virtual void OnAckNotification(int num_original_packets, 50 int num_original_bytes, 51 int num_retransmitted_packets, 52 int num_retransmitted_bytes) OVERRIDE { 53 DCHECK_LT(0, pending_acks_); 54 --pending_acks_; 55 num_original_packets_ += num_original_packets; 56 num_original_bytes_ += num_original_bytes; 57 num_retransmitted_packets_ += num_retransmitted_packets; 58 num_retransmitted_bytes_ += num_retransmitted_bytes; 59 60 if (wrote_last_data_ && pending_acks_ == 0) { 61 delegate_->OnAckNotification(num_original_packets_, 62 num_original_bytes_, 63 num_retransmitted_packets_, 64 num_retransmitted_bytes_); 65 } 66 } 67 68 void WroteData(bool last_data) { 69 DCHECK(!wrote_last_data_); 70 ++pending_acks_; 71 wrote_last_data_ = last_data; 72 } 73 74 protected: 75 // Delegates are ref counted. 76 virtual ~ProxyAckNotifierDelegate() { 77 } 78 79 private: 80 // Original delegate. delegate_->OnAckNotification will be called when: 81 // wrote_last_data_ == true and pending_acks_ == 0 82 scoped_refptr<DelegateInterface> delegate_; 83 84 // Number of outstanding acks. 85 int pending_acks_; 86 87 // True if no pending writes remain. 88 bool wrote_last_data_; 89 90 // Accumulators. 91 int num_original_packets_; 92 int num_original_bytes_; 93 int num_retransmitted_packets_; 94 int num_retransmitted_bytes_; 95 96 DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate); 97}; 98 99ReliableQuicStream::PendingData::PendingData( 100 string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in) 101 : data(data_in), delegate(delegate_in) { 102} 103 104ReliableQuicStream::PendingData::~PendingData() { 105} 106 107ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 108 QuicSession* session) 109 : sequencer_(this), 110 id_(id), 111 session_(session), 112 stream_bytes_read_(0), 113 stream_bytes_written_(0), 114 stream_error_(QUIC_STREAM_NO_ERROR), 115 connection_error_(QUIC_NO_ERROR), 116 read_side_closed_(false), 117 write_side_closed_(false), 118 fin_buffered_(false), 119 fin_sent_(false), 120 rst_sent_(false), 121 is_server_(session_->is_server()) { 122} 123 124ReliableQuicStream::~ReliableQuicStream() { 125} 126 127bool ReliableQuicStream::WillAcceptStreamFrame( 128 const QuicStreamFrame& frame) const { 129 if (read_side_closed_) { 130 return true; 131 } 132 if (frame.stream_id != id_) { 133 LOG(ERROR) << "Error!"; 134 return false; 135 } 136 return sequencer_.WillAcceptStreamFrame(frame); 137} 138 139bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 140 DCHECK_EQ(frame.stream_id, id_); 141 if (read_side_closed_) { 142 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; 143 // We don't want to be reading: blackhole the data. 144 return true; 145 } 146 // Note: This count include duplicate data received. 147 stream_bytes_read_ += frame.data.TotalBufferSize(); 148 149 bool accepted = sequencer_.OnStreamFrame(frame); 150 151 return accepted; 152} 153 154void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { 155 stream_error_ = frame.error_code; 156 CloseWriteSide(); 157 CloseReadSide(); 158} 159 160void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, 161 bool from_peer) { 162 if (read_side_closed_ && write_side_closed_) { 163 return; 164 } 165 if (error != QUIC_NO_ERROR) { 166 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 167 connection_error_ = error; 168 } 169 170 CloseWriteSide(); 171 CloseReadSide(); 172} 173 174void ReliableQuicStream::OnFinRead() { 175 DCHECK(sequencer_.IsClosed()); 176 CloseReadSide(); 177} 178 179void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { 180 DCHECK_NE(QUIC_STREAM_NO_ERROR, error); 181 stream_error_ = error; 182 // Sending a RstStream results in calling CloseStream. 183 session()->SendRstStream(id(), error, stream_bytes_written_); 184 rst_sent_ = true; 185} 186 187void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 188 session()->connection()->SendConnectionClose(error); 189} 190 191void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 192 const string& details) { 193 session()->connection()->SendConnectionCloseWithDetails(error, details); 194} 195 196QuicVersion ReliableQuicStream::version() const { 197 return session()->connection()->version(); 198} 199 200void ReliableQuicStream::WriteOrBufferData( 201 StringPiece data, 202 bool fin, 203 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 204 if (data.empty() && !fin) { 205 LOG(DFATAL) << "data.empty() && !fin"; 206 return; 207 } 208 209 if (fin_buffered_) { 210 LOG(DFATAL) << "Fin already buffered"; 211 return; 212 } 213 214 scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate; 215 if (ack_notifier_delegate != NULL) { 216 proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate); 217 } 218 219 QuicConsumedData consumed_data(0, false); 220 fin_buffered_ = fin; 221 222 if (queued_data_.empty()) { 223 struct iovec iov(MakeIovec(data)); 224 consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get()); 225 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 226 } 227 228 bool write_completed; 229 // If there's unconsumed data or an unconsumed fin, queue it. 230 if (consumed_data.bytes_consumed < data.length() || 231 (fin && !consumed_data.fin_consumed)) { 232 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); 233 queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate)); 234 write_completed = false; 235 } else { 236 write_completed = true; 237 } 238 239 if ((proxy_delegate.get() != NULL) && 240 (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) { 241 proxy_delegate->WroteData(write_completed); 242 } 243} 244 245void ReliableQuicStream::OnCanWrite() { 246 bool fin = false; 247 while (!queued_data_.empty()) { 248 PendingData* pending_data = &queued_data_.front(); 249 ProxyAckNotifierDelegate* delegate = pending_data->delegate.get(); 250 if (queued_data_.size() == 1 && fin_buffered_) { 251 fin = true; 252 } 253 struct iovec iov(MakeIovec(pending_data->data)); 254 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate); 255 if (consumed_data.bytes_consumed == pending_data->data.size() && 256 fin == consumed_data.fin_consumed) { 257 queued_data_.pop_front(); 258 if (delegate != NULL) { 259 delegate->WroteData(true); 260 } 261 } else { 262 if (consumed_data.bytes_consumed > 0) { 263 pending_data->data.erase(0, consumed_data.bytes_consumed); 264 if (delegate != NULL) { 265 delegate->WroteData(false); 266 } 267 } 268 break; 269 } 270 } 271} 272 273QuicConsumedData ReliableQuicStream::WritevData( 274 const struct iovec* iov, 275 int iov_count, 276 bool fin, 277 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 278 if (write_side_closed_) { 279 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 280 return QuicConsumedData(0, false); 281 } 282 283 size_t write_length = 0u; 284 for (int i = 0; i < iov_count; ++i) { 285 write_length += iov[i].iov_len; 286 // TODO(rjshade): Maybe block write based on available flow control window. 287 } 288 289 // Fill an IOVector with bytes from the iovec. 290 IOVector data; 291 data.AppendIovecAtMostBytes(iov, iov_count, write_length); 292 293 QuicConsumedData consumed_data = session()->WritevData( 294 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); 295 stream_bytes_written_ += consumed_data.bytes_consumed; 296 if (consumed_data.bytes_consumed == write_length) { 297 if (fin && consumed_data.fin_consumed) { 298 fin_sent_ = true; 299 CloseWriteSide(); 300 } else if (fin && !consumed_data.fin_consumed) { 301 session_->MarkWriteBlocked(id(), EffectivePriority()); 302 } 303 } else { 304 session_->MarkWriteBlocked(id(), EffectivePriority()); 305 } 306 return consumed_data; 307} 308 309void ReliableQuicStream::CloseReadSide() { 310 if (read_side_closed_) { 311 return; 312 } 313 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); 314 315 read_side_closed_ = true; 316 if (write_side_closed_) { 317 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 318 session_->CloseStream(id()); 319 } 320} 321 322void ReliableQuicStream::CloseWriteSide() { 323 if (write_side_closed_) { 324 return; 325 } 326 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); 327 328 write_side_closed_ = true; 329 if (read_side_closed_) { 330 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 331 session_->CloseStream(id()); 332 } 333} 334 335bool ReliableQuicStream::HasBufferedData() { 336 return !queued_data_.empty(); 337} 338 339void ReliableQuicStream::OnClose() { 340 CloseReadSide(); 341 CloseWriteSide(); 342 343 if (version() > QUIC_VERSION_13 && 344 !fin_sent_ && !rst_sent_) { 345 // For flow control accounting, we must tell the peer how many bytes we have 346 // written on this stream before termination. Done here if needed, using a 347 // RST frame. 348 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); 349 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); 350 rst_sent_ = true; 351 } 352} 353 354} // namespace net 355