// reliable_quic_stream.cc revision 0529e5d033099cbfc42635f6f6183833b09dff6e
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "base/logging.h" 8#include "net/quic/iovector.h" 9#include "net/quic/quic_flow_controller.h" 10#include "net/quic/quic_session.h" 11#include "net/quic/quic_write_blocked_list.h" 12 13using base::StringPiece; 14using std::min; 15 16namespace net { 17 18#define ENDPOINT (is_server_ ? "Server: " : " Client: ") 19 20namespace { 21 22struct iovec MakeIovec(StringPiece data) { 23 struct iovec iov = {const_cast<char*>(data.data()), 24 static_cast<size_t>(data.size())}; 25 return iov; 26} 27 28} // namespace 29 30// Wrapper that aggregates OnAckNotifications for packets sent using 31// WriteOrBufferData and delivers them to the original 32// QuicAckNotifier::DelegateInterface after all bytes written using 33// WriteOrBufferData are acked. This level of indirection is 34// necessary because the delegate interface provides no mechanism that 35// WriteOrBufferData can use to inform it that the write required 36// multiple WritevData calls or that only part of the data has been 37// sent out by the time ACKs start arriving. 
38class ReliableQuicStream::ProxyAckNotifierDelegate 39 : public QuicAckNotifier::DelegateInterface { 40 public: 41 explicit ProxyAckNotifierDelegate(DelegateInterface* delegate) 42 : delegate_(delegate), 43 pending_acks_(0), 44 wrote_last_data_(false), 45 num_original_packets_(0), 46 num_original_bytes_(0), 47 num_retransmitted_packets_(0), 48 num_retransmitted_bytes_(0) { 49 } 50 51 virtual void OnAckNotification(int num_original_packets, 52 int num_original_bytes, 53 int num_retransmitted_packets, 54 int num_retransmitted_bytes, 55 QuicTime::Delta delta_largest_observed) 56 OVERRIDE { 57 DCHECK_LT(0, pending_acks_); 58 --pending_acks_; 59 num_original_packets_ += num_original_packets; 60 num_original_bytes_ += num_original_bytes; 61 num_retransmitted_packets_ += num_retransmitted_packets; 62 num_retransmitted_bytes_ += num_retransmitted_bytes; 63 64 if (wrote_last_data_ && pending_acks_ == 0) { 65 delegate_->OnAckNotification(num_original_packets_, 66 num_original_bytes_, 67 num_retransmitted_packets_, 68 num_retransmitted_bytes_, 69 delta_largest_observed); 70 } 71 } 72 73 void WroteData(bool last_data) { 74 DCHECK(!wrote_last_data_); 75 ++pending_acks_; 76 wrote_last_data_ = last_data; 77 } 78 79 protected: 80 // Delegates are ref counted. 81 virtual ~ProxyAckNotifierDelegate() { 82 } 83 84 private: 85 // Original delegate. delegate_->OnAckNotification will be called when: 86 // wrote_last_data_ == true and pending_acks_ == 0 87 scoped_refptr<DelegateInterface> delegate_; 88 89 // Number of outstanding acks. 90 int pending_acks_; 91 92 // True if no pending writes remain. 93 bool wrote_last_data_; 94 95 // Accumulators. 
96 int num_original_packets_; 97 int num_original_bytes_; 98 int num_retransmitted_packets_; 99 int num_retransmitted_bytes_; 100 101 DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate); 102}; 103 104ReliableQuicStream::PendingData::PendingData( 105 string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in) 106 : data(data_in), delegate(delegate_in) { 107} 108 109ReliableQuicStream::PendingData::~PendingData() { 110} 111 112ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) 113 : sequencer_(this), 114 id_(id), 115 session_(session), 116 stream_bytes_read_(0), 117 stream_bytes_written_(0), 118 stream_error_(QUIC_STREAM_NO_ERROR), 119 connection_error_(QUIC_NO_ERROR), 120 read_side_closed_(false), 121 write_side_closed_(false), 122 fin_buffered_(false), 123 fin_sent_(false), 124 rst_sent_(false), 125 is_server_(session_->is_server()), 126 flow_controller_( 127 id_, 128 is_server_, 129 session_->config()->HasReceivedInitialFlowControlWindowBytes() ? 130 session_->config()->ReceivedInitialFlowControlWindowBytes() : 131 kDefaultFlowControlSendWindow, 132 session_->connection()->max_flow_control_receive_window_bytes(), 133 session_->connection()->max_flow_control_receive_window_bytes()) { 134 if (session_->connection()->version() < QUIC_VERSION_17) { 135 flow_controller_.Disable(); 136 } 137} 138 139ReliableQuicStream::~ReliableQuicStream() { 140} 141 142bool ReliableQuicStream::WillAcceptStreamFrame( 143 const QuicStreamFrame& frame) const { 144 if (read_side_closed_) { 145 return true; 146 } 147 if (frame.stream_id != id_) { 148 LOG(ERROR) << "Error!"; 149 return false; 150 } 151 return sequencer_.WillAcceptStreamFrame(frame); 152} 153 154bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 155 DCHECK_EQ(frame.stream_id, id_); 156 if (read_side_closed_) { 157 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; 158 // We don't want to be reading: blackhole the data. 
159 return true; 160 } 161 162 // This count include duplicate data received. 163 stream_bytes_read_ += frame.data.TotalBufferSize(); 164 165 bool accepted = sequencer_.OnStreamFrame(frame); 166 167 if (version() >= QUIC_VERSION_17) { 168 if (flow_controller_.FlowControlViolation()) { 169 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); 170 return false; 171 } 172 MaybeSendWindowUpdate(); 173 } 174 175 return accepted; 176} 177 178void ReliableQuicStream::MaybeSendWindowUpdate() { 179 if (version() >= QUIC_VERSION_17) { 180 flow_controller_.MaybeSendWindowUpdate(session()->connection()); 181 } 182} 183 184int ReliableQuicStream::num_frames_received() { 185 return sequencer_.num_frames_received(); 186} 187 188int ReliableQuicStream::num_duplicate_frames_received() { 189 return sequencer_.num_duplicate_frames_received(); 190} 191 192void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { 193 stream_error_ = frame.error_code; 194 CloseWriteSide(); 195 CloseReadSide(); 196} 197 198void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, 199 bool from_peer) { 200 if (read_side_closed_ && write_side_closed_) { 201 return; 202 } 203 if (error != QUIC_NO_ERROR) { 204 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 205 connection_error_ = error; 206 } 207 208 CloseWriteSide(); 209 CloseReadSide(); 210} 211 212void ReliableQuicStream::OnFinRead() { 213 DCHECK(sequencer_.IsClosed()); 214 CloseReadSide(); 215} 216 217void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { 218 DCHECK_NE(QUIC_STREAM_NO_ERROR, error); 219 stream_error_ = error; 220 // Sending a RstStream results in calling CloseStream. 
221 session()->SendRstStream(id(), error, stream_bytes_written_); 222 rst_sent_ = true; 223} 224 225void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 226 session()->connection()->SendConnectionClose(error); 227} 228 229void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 230 const string& details) { 231 session()->connection()->SendConnectionCloseWithDetails(error, details); 232} 233 234QuicVersion ReliableQuicStream::version() const { 235 return session()->connection()->version(); 236} 237 238void ReliableQuicStream::WriteOrBufferData( 239 StringPiece data, 240 bool fin, 241 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 242 if (data.empty() && !fin) { 243 LOG(DFATAL) << "data.empty() && !fin"; 244 return; 245 } 246 247 if (fin_buffered_) { 248 LOG(DFATAL) << "Fin already buffered"; 249 return; 250 } 251 252 scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate; 253 if (ack_notifier_delegate != NULL) { 254 proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate); 255 } 256 257 QuicConsumedData consumed_data(0, false); 258 fin_buffered_ = fin; 259 260 if (queued_data_.empty()) { 261 struct iovec iov(MakeIovec(data)); 262 consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get()); 263 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 264 } 265 266 bool write_completed; 267 // If there's unconsumed data or an unconsumed fin, queue it. 
268 if (consumed_data.bytes_consumed < data.length() || 269 (fin && !consumed_data.fin_consumed)) { 270 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); 271 queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate)); 272 write_completed = false; 273 } else { 274 write_completed = true; 275 } 276 277 if ((proxy_delegate.get() != NULL) && 278 (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) { 279 proxy_delegate->WroteData(write_completed); 280 } 281} 282 283void ReliableQuicStream::OnCanWrite() { 284 bool fin = false; 285 while (!queued_data_.empty()) { 286 PendingData* pending_data = &queued_data_.front(); 287 ProxyAckNotifierDelegate* delegate = pending_data->delegate.get(); 288 if (queued_data_.size() == 1 && fin_buffered_) { 289 fin = true; 290 } 291 struct iovec iov(MakeIovec(pending_data->data)); 292 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate); 293 if (consumed_data.bytes_consumed == pending_data->data.size() && 294 fin == consumed_data.fin_consumed) { 295 queued_data_.pop_front(); 296 if (delegate != NULL) { 297 delegate->WroteData(true); 298 } 299 } else { 300 if (consumed_data.bytes_consumed > 0) { 301 pending_data->data.erase(0, consumed_data.bytes_consumed); 302 if (delegate != NULL) { 303 delegate->WroteData(false); 304 } 305 } 306 break; 307 } 308 } 309} 310 311QuicConsumedData ReliableQuicStream::WritevData( 312 const struct iovec* iov, 313 int iov_count, 314 bool fin, 315 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 316 if (write_side_closed_) { 317 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 318 return QuicConsumedData(0, false); 319 } 320 321 // How much data we want to write. 322 size_t write_length = TotalIovecLength(iov, iov_count); 323 324 // How much data we are allowed to write from flow control. 
325 size_t send_window = flow_controller_.SendWindowSize(); 326 327 // A FIN with zero data payload should not be flow control blocked. 328 bool fin_with_zero_data = (fin && write_length == 0); 329 330 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) { 331 if (send_window == 0 && !fin_with_zero_data) { 332 // Quick return if we can't send anything. 333 flow_controller_.MaybeSendBlocked(session()->connection()); 334 return QuicConsumedData(0, false); 335 } 336 337 if (write_length > send_window) { 338 // Don't send the FIN if we aren't going to send all the data. 339 fin = false; 340 341 // Writing more data would be a violation of flow control. 342 write_length = send_window; 343 } 344 } 345 346 // Fill an IOVector with bytes from the iovec. 347 IOVector data; 348 data.AppendIovecAtMostBytes(iov, iov_count, write_length); 349 350 QuicConsumedData consumed_data = session()->WritevData( 351 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); 352 stream_bytes_written_ += consumed_data.bytes_consumed; 353 354 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) { 355 flow_controller_.AddBytesSent(consumed_data.bytes_consumed); 356 } 357 358 if (consumed_data.bytes_consumed == write_length) { 359 if (!fin_with_zero_data) { 360 if (version() >= QUIC_VERSION_17) { 361 flow_controller_.MaybeSendBlocked(session()->connection()); 362 } 363 } 364 if (fin && consumed_data.fin_consumed) { 365 fin_sent_ = true; 366 CloseWriteSide(); 367 } else if (fin && !consumed_data.fin_consumed) { 368 session_->MarkWriteBlocked(id(), EffectivePriority()); 369 } 370 } else { 371 session_->MarkWriteBlocked(id(), EffectivePriority()); 372 } 373 return consumed_data; 374} 375 376void ReliableQuicStream::CloseReadSide() { 377 if (read_side_closed_) { 378 return; 379 } 380 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); 381 382 read_side_closed_ = true; 383 if (write_side_closed_) { 384 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 385 
session_->CloseStream(id()); 386 } 387} 388 389void ReliableQuicStream::CloseWriteSide() { 390 if (write_side_closed_) { 391 return; 392 } 393 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); 394 395 write_side_closed_ = true; 396 if (read_side_closed_) { 397 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 398 session_->CloseStream(id()); 399 } 400} 401 402bool ReliableQuicStream::HasBufferedData() { 403 return !queued_data_.empty(); 404} 405 406void ReliableQuicStream::OnClose() { 407 CloseReadSide(); 408 CloseWriteSide(); 409 410 if (version() > QUIC_VERSION_13 && 411 !fin_sent_ && !rst_sent_) { 412 // For flow control accounting, we must tell the peer how many bytes we have 413 // written on this stream before termination. Done here if needed, using a 414 // RST frame. 415 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); 416 session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING, 417 stream_bytes_written_); 418 rst_sent_ = true; 419 } 420} 421 422void ReliableQuicStream::OnWindowUpdateFrame( 423 const QuicWindowUpdateFrame& frame) { 424 if (!flow_controller_.IsEnabled()) { 425 DLOG(DFATAL) << "Flow control not enabled! " << version(); 426 return; 427 } 428 429 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { 430 // We can write again! 431 // TODO(rjshade): This does not respect priorities (e.g. multiple 432 // outstanding POSTs are unblocked on arrival of 433 // SHLO with initial window). 434 OnCanWrite(); 435 } 436} 437 438} // namespace net 439