reliable_quic_stream.cc revision f8ee788a64d60abd8f2d742a5fdedde054ecd910
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "base/logging.h" 8#include "net/quic/iovector.h" 9#include "net/quic/quic_flow_controller.h" 10#include "net/quic/quic_session.h" 11#include "net/quic/quic_write_blocked_list.h" 12 13using base::StringPiece; 14using std::min; 15 16namespace net { 17 18#define ENDPOINT (is_server_ ? "Server: " : " Client: ") 19 20namespace { 21 22struct iovec MakeIovec(StringPiece data) { 23 struct iovec iov = {const_cast<char*>(data.data()), 24 static_cast<size_t>(data.size())}; 25 return iov; 26} 27 28} // namespace 29 30// Wrapper that aggregates OnAckNotifications for packets sent using 31// WriteOrBufferData and delivers them to the original 32// QuicAckNotifier::DelegateInterface after all bytes written using 33// WriteOrBufferData are acked. This level of indirection is 34// necessary because the delegate interface provides no mechanism that 35// WriteOrBufferData can use to inform it that the write required 36// multiple WritevData calls or that only part of the data has been 37// sent out by the time ACKs start arriving. 38class ReliableQuicStream::ProxyAckNotifierDelegate 39 : public QuicAckNotifier::DelegateInterface { 40 public: 41 explicit ProxyAckNotifierDelegate(DelegateInterface* delegate) 42 : delegate_(delegate), 43 pending_acks_(0), 44 wrote_last_data_(false), 45 num_original_packets_(0), 46 num_original_bytes_(0), 47 num_retransmitted_packets_(0), 48 num_retransmitted_bytes_(0) { 49 } 50 51 virtual void OnAckNotification(int num_original_packets, 52 int num_original_bytes, 53 int num_retransmitted_packets, 54 int num_retransmitted_bytes, 55 QuicTime::Delta delta_largest_observed) 56 OVERRIDE { 57 DCHECK_LT(0, pending_acks_); 58 --pending_acks_; 59 num_original_packets_ += num_original_packets; 60 num_original_bytes_ += num_original_bytes; 61 num_retransmitted_packets_ += num_retransmitted_packets; 62 num_retransmitted_bytes_ += num_retransmitted_bytes; 63 64 if (wrote_last_data_ && pending_acks_ == 0) { 65 delegate_->OnAckNotification(num_original_packets_, 66 num_original_bytes_, 67 num_retransmitted_packets_, 68 num_retransmitted_bytes_, 69 delta_largest_observed); 70 } 71 } 72 73 void WroteData(bool last_data) { 74 DCHECK(!wrote_last_data_); 75 ++pending_acks_; 76 wrote_last_data_ = last_data; 77 } 78 79 protected: 80 // Delegates are ref counted. 81 virtual ~ProxyAckNotifierDelegate() OVERRIDE { 82 } 83 84 private: 85 // Original delegate. delegate_->OnAckNotification will be called when: 86 // wrote_last_data_ == true and pending_acks_ == 0 87 scoped_refptr<DelegateInterface> delegate_; 88 89 // Number of outstanding acks. 90 int pending_acks_; 91 92 // True if no pending writes remain. 93 bool wrote_last_data_; 94 95 // Accumulators. 96 int num_original_packets_; 97 int num_original_bytes_; 98 int num_retransmitted_packets_; 99 int num_retransmitted_bytes_; 100 101 DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate); 102}; 103 104ReliableQuicStream::PendingData::PendingData( 105 string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in) 106 : data(data_in), delegate(delegate_in) { 107} 108 109ReliableQuicStream::PendingData::~PendingData() { 110} 111 112ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) 113 : sequencer_(this), 114 id_(id), 115 session_(session), 116 stream_bytes_read_(0), 117 stream_bytes_written_(0), 118 stream_error_(QUIC_STREAM_NO_ERROR), 119 connection_error_(QUIC_NO_ERROR), 120 read_side_closed_(false), 121 write_side_closed_(false), 122 fin_buffered_(false), 123 fin_sent_(false), 124 fin_received_(false), 125 rst_sent_(false), 126 rst_received_(false), 127 fec_policy_(FEC_PROTECT_OPTIONAL), 128 is_server_(session_->is_server()), 129 flow_controller_( 130 session_->connection(), 131 id_, 132 is_server_, 133 session_->config()->HasReceivedInitialFlowControlWindowBytes() ? 134 session_->config()->ReceivedInitialFlowControlWindowBytes() : 135 kDefaultFlowControlSendWindow, 136 session_->config()->GetInitialFlowControlWindowToSend(), 137 session_->config()->GetInitialFlowControlWindowToSend()), 138 connection_flow_controller_(session_->flow_controller()) { 139} 140 141ReliableQuicStream::~ReliableQuicStream() { 142} 143 144bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 145 if (read_side_closed_) { 146 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; 147 // We don't want to be reading: blackhole the data. 148 return true; 149 } 150 151 if (frame.stream_id != id_) { 152 LOG(ERROR) << "Error!"; 153 return false; 154 } 155 156 if (frame.fin) { 157 fin_received_ = true; 158 } 159 160 // This count include duplicate data received. 161 size_t frame_payload_size = frame.data.TotalBufferSize(); 162 stream_bytes_read_ += frame_payload_size; 163 164 // Flow control is interested in tracking highest received offset. 165 if (MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { 166 // As the highest received offset has changed, we should check to see if 167 // this is a violation of flow control. 168 if (flow_controller_.FlowControlViolation() || 169 connection_flow_controller_->FlowControlViolation()) { 170 session_->connection()->SendConnectionClose( 171 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); 172 return false; 173 } 174 } 175 176 return sequencer_.OnStreamFrame(frame); 177} 178 179int ReliableQuicStream::num_frames_received() const { 180 return sequencer_.num_frames_received(); 181} 182 183int ReliableQuicStream::num_duplicate_frames_received() const { 184 return sequencer_.num_duplicate_frames_received(); 185} 186 187void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { 188 rst_received_ = true; 189 MaybeIncreaseHighestReceivedOffset(frame.byte_offset); 190 191 stream_error_ = frame.error_code; 192 CloseWriteSide(); 193 CloseReadSide(); 194} 195 196void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, 197 bool from_peer) { 198 if (read_side_closed_ && write_side_closed_) { 199 return; 200 } 201 if (error != QUIC_NO_ERROR) { 202 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 203 connection_error_ = error; 204 } 205 206 CloseWriteSide(); 207 CloseReadSide(); 208} 209 210void ReliableQuicStream::OnFinRead() { 211 DCHECK(sequencer_.IsClosed()); 212 CloseReadSide(); 213} 214 215void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { 216 DCHECK_NE(QUIC_STREAM_NO_ERROR, error); 217 stream_error_ = error; 218 // Sending a RstStream results in calling CloseStream. 219 session()->SendRstStream(id(), error, stream_bytes_written_); 220 rst_sent_ = true; 221} 222 223void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 224 session()->connection()->SendConnectionClose(error); 225} 226 227void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 228 const string& details) { 229 session()->connection()->SendConnectionCloseWithDetails(error, details); 230} 231 232QuicVersion ReliableQuicStream::version() const { 233 return session()->connection()->version(); 234} 235 236void ReliableQuicStream::WriteOrBufferData( 237 StringPiece data, 238 bool fin, 239 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 240 if (data.empty() && !fin) { 241 LOG(DFATAL) << "data.empty() && !fin"; 242 return; 243 } 244 245 if (fin_buffered_) { 246 LOG(DFATAL) << "Fin already buffered"; 247 return; 248 } 249 250 scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate; 251 if (ack_notifier_delegate != NULL) { 252 proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate); 253 } 254 255 QuicConsumedData consumed_data(0, false); 256 fin_buffered_ = fin; 257 258 if (queued_data_.empty()) { 259 struct iovec iov(MakeIovec(data)); 260 consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get()); 261 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 262 } 263 264 bool write_completed; 265 // If there's unconsumed data or an unconsumed fin, queue it. 266 if (consumed_data.bytes_consumed < data.length() || 267 (fin && !consumed_data.fin_consumed)) { 268 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); 269 queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate)); 270 write_completed = false; 271 } else { 272 write_completed = true; 273 } 274 275 if ((proxy_delegate.get() != NULL) && 276 (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) { 277 proxy_delegate->WroteData(write_completed); 278 } 279} 280 281void ReliableQuicStream::OnCanWrite() { 282 bool fin = false; 283 while (!queued_data_.empty()) { 284 PendingData* pending_data = &queued_data_.front(); 285 ProxyAckNotifierDelegate* delegate = pending_data->delegate.get(); 286 if (queued_data_.size() == 1 && fin_buffered_) { 287 fin = true; 288 } 289 struct iovec iov(MakeIovec(pending_data->data)); 290 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate); 291 if (consumed_data.bytes_consumed == pending_data->data.size() && 292 fin == consumed_data.fin_consumed) { 293 queued_data_.pop_front(); 294 if (delegate != NULL) { 295 delegate->WroteData(true); 296 } 297 } else { 298 if (consumed_data.bytes_consumed > 0) { 299 pending_data->data.erase(0, consumed_data.bytes_consumed); 300 if (delegate != NULL) { 301 delegate->WroteData(false); 302 } 303 } 304 break; 305 } 306 } 307} 308 309void ReliableQuicStream::MaybeSendBlocked() { 310 flow_controller_.MaybeSendBlocked(); 311 connection_flow_controller_->MaybeSendBlocked(); 312 // If we are connection level flow control blocked, then add the stream 313 // to the write blocked list. It will be given a chance to write when a 314 // connection level WINDOW_UPDATE arrives. 315 if (connection_flow_controller_->IsBlocked() && 316 !flow_controller_.IsBlocked()) { 317 session_->MarkWriteBlocked(id(), EffectivePriority()); 318 } 319} 320 321QuicConsumedData ReliableQuicStream::WritevData( 322 const struct iovec* iov, 323 int iov_count, 324 bool fin, 325 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 326 if (write_side_closed_) { 327 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 328 return QuicConsumedData(0, false); 329 } 330 331 // How much data we want to write. 332 size_t write_length = TotalIovecLength(iov, iov_count); 333 334 // A FIN with zero data payload should not be flow control blocked. 335 bool fin_with_zero_data = (fin && write_length == 0); 336 337 if (flow_controller_.IsEnabled()) { 338 // How much data we are allowed to write from flow control. 339 uint64 send_window = flow_controller_.SendWindowSize(); 340 if (connection_flow_controller_->IsEnabled()) { 341 send_window = 342 min(send_window, connection_flow_controller_->SendWindowSize()); 343 } 344 345 if (send_window == 0 && !fin_with_zero_data) { 346 // Quick return if we can't send anything. 347 MaybeSendBlocked(); 348 return QuicConsumedData(0, false); 349 } 350 351 if (write_length > send_window) { 352 // Don't send the FIN if we aren't going to send all the data. 353 fin = false; 354 355 // Writing more data would be a violation of flow control. 356 write_length = send_window; 357 } 358 } 359 360 // Fill an IOVector with bytes from the iovec. 361 IOVector data; 362 data.AppendIovecAtMostBytes(iov, iov_count, write_length); 363 364 QuicConsumedData consumed_data = session()->WritevData( 365 id(), data, stream_bytes_written_, fin, GetFecProtection(), 366 ack_notifier_delegate); 367 stream_bytes_written_ += consumed_data.bytes_consumed; 368 369 AddBytesSent(consumed_data.bytes_consumed); 370 371 if (consumed_data.bytes_consumed == write_length) { 372 if (!fin_with_zero_data) { 373 MaybeSendBlocked(); 374 } 375 if (fin && consumed_data.fin_consumed) { 376 fin_sent_ = true; 377 CloseWriteSide(); 378 } else if (fin && !consumed_data.fin_consumed) { 379 session_->MarkWriteBlocked(id(), EffectivePriority()); 380 } 381 } else { 382 session_->MarkWriteBlocked(id(), EffectivePriority()); 383 } 384 return consumed_data; 385} 386 387FecProtection ReliableQuicStream::GetFecProtection() { 388 return fec_policy_ == FEC_PROTECT_ALWAYS ? MUST_FEC_PROTECT : MAY_FEC_PROTECT; 389} 390 391void ReliableQuicStream::CloseReadSide() { 392 if (read_side_closed_) { 393 return; 394 } 395 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); 396 397 read_side_closed_ = true; 398 if (write_side_closed_) { 399 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 400 session_->CloseStream(id()); 401 } 402} 403 404void ReliableQuicStream::CloseWriteSide() { 405 if (write_side_closed_) { 406 return; 407 } 408 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); 409 410 write_side_closed_ = true; 411 if (read_side_closed_) { 412 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 413 session_->CloseStream(id()); 414 } 415} 416 417bool ReliableQuicStream::HasBufferedData() const { 418 return !queued_data_.empty(); 419} 420 421void ReliableQuicStream::OnClose() { 422 CloseReadSide(); 423 CloseWriteSide(); 424 425 if (!fin_sent_ && !rst_sent_) { 426 // For flow control accounting, we must tell the peer how many bytes we have 427 // written on this stream before termination. Done here if needed, using a 428 // RST frame. 429 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); 430 session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING, 431 stream_bytes_written_); 432 rst_sent_ = true; 433 } 434 435 // We are closing the stream and will not process any further incoming bytes. 436 // As there may be more bytes in flight and we need to ensure that both 437 // endpoints have the same connection level flow control state, mark all 438 // unreceived or buffered bytes as consumed. 439 uint64 bytes_to_consume = flow_controller_.highest_received_byte_offset() - 440 flow_controller_.bytes_consumed(); 441 AddBytesConsumed(bytes_to_consume); 442} 443 444void ReliableQuicStream::OnWindowUpdateFrame( 445 const QuicWindowUpdateFrame& frame) { 446 if (!flow_controller_.IsEnabled()) { 447 DLOG(DFATAL) << "Flow control not enabled! " << version(); 448 return; 449 } 450 451 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { 452 // We can write again! 453 // TODO(rjshade): This does not respect priorities (e.g. multiple 454 // outstanding POSTs are unblocked on arrival of 455 // SHLO with initial window). 456 // As long as the connection is not flow control blocked, we can write! 457 OnCanWrite(); 458 } 459} 460 461bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset(uint64 new_offset) { 462 if (flow_controller_.IsEnabled()) { 463 uint64 increment = 464 new_offset - flow_controller_.highest_received_byte_offset(); 465 if (flow_controller_.UpdateHighestReceivedOffset(new_offset)) { 466 // If |new_offset| increased the stream flow controller's highest received 467 // offset, then we need to increase the connection flow controller's value 468 // by the incremental difference. 469 connection_flow_controller_->UpdateHighestReceivedOffset( 470 connection_flow_controller_->highest_received_byte_offset() + 471 increment); 472 return true; 473 } 474 } 475 return false; 476} 477 478void ReliableQuicStream::AddBytesSent(uint64 bytes) { 479 if (flow_controller_.IsEnabled()) { 480 flow_controller_.AddBytesSent(bytes); 481 connection_flow_controller_->AddBytesSent(bytes); 482 } 483} 484 485void ReliableQuicStream::AddBytesConsumed(uint64 bytes) { 486 if (flow_controller_.IsEnabled()) { 487 // Only adjust stream level flow controller if we are still reading. 488 if (!read_side_closed_) { 489 flow_controller_.AddBytesConsumed(bytes); 490 } 491 492 connection_flow_controller_->AddBytesConsumed(bytes); 493 } 494} 495 496bool ReliableQuicStream::IsFlowControlBlocked() { 497 return flow_controller_.IsBlocked() || 498 connection_flow_controller_->IsBlocked(); 499} 500 501} // namespace net 502