reliable_quic_stream.cc revision e5d81f57cb97b3b6b7fccc9c5610d21eb81db09d
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "base/logging.h" 8#include "net/quic/iovector.h" 9#include "net/quic/quic_session.h" 10#include "net/quic/quic_write_blocked_list.h" 11 12using base::StringPiece; 13using std::min; 14 15namespace net { 16 17#define ENDPOINT (is_server_ ? "Server: " : " Client: ") 18 19namespace { 20 21struct iovec MakeIovec(StringPiece data) { 22 struct iovec iov = {const_cast<char*>(data.data()), 23 static_cast<size_t>(data.size())}; 24 return iov; 25} 26 27} // namespace 28 29// Wrapper that aggregates OnAckNotifications for packets sent using 30// WriteOrBufferData and delivers them to the original 31// QuicAckNotifier::DelegateInterface after all bytes written using 32// WriteOrBufferData are acked. This level of indirection is 33// necessary because the delegate interface provides no mechanism that 34// WriteOrBufferData can use to inform it that the write required 35// multiple WritevData calls or that only part of the data has been 36// sent out by the time ACKs start arriving. 37class ReliableQuicStream::ProxyAckNotifierDelegate 38 : public QuicAckNotifier::DelegateInterface { 39 public: 40 explicit ProxyAckNotifierDelegate(DelegateInterface* delegate) 41 : delegate_(delegate), 42 pending_acks_(0), 43 wrote_last_data_(false), 44 num_original_packets_(0), 45 num_original_bytes_(0), 46 num_retransmitted_packets_(0), 47 num_retransmitted_bytes_(0) { 48 } 49 50 virtual void OnAckNotification(int num_original_packets, 51 int num_original_bytes, 52 int num_retransmitted_packets, 53 int num_retransmitted_bytes) OVERRIDE { 54 DCHECK_LT(0, pending_acks_); 55 --pending_acks_; 56 num_original_packets_ += num_original_packets; 57 num_original_bytes_ += num_original_bytes; 58 num_retransmitted_packets_ += num_retransmitted_packets; 59 num_retransmitted_bytes_ += num_retransmitted_bytes; 60 61 if (wrote_last_data_ && pending_acks_ == 0) { 62 delegate_->OnAckNotification(num_original_packets_, 63 num_original_bytes_, 64 num_retransmitted_packets_, 65 num_retransmitted_bytes_); 66 } 67 } 68 69 void WroteData(bool last_data) { 70 DCHECK(!wrote_last_data_); 71 ++pending_acks_; 72 wrote_last_data_ = last_data; 73 } 74 75 protected: 76 // Delegates are ref counted. 77 virtual ~ProxyAckNotifierDelegate() { 78 } 79 80 private: 81 // Original delegate. delegate_->OnAckNotification will be called when: 82 // wrote_last_data_ == true and pending_acks_ == 0 83 scoped_refptr<DelegateInterface> delegate_; 84 85 // Number of outstanding acks. 86 int pending_acks_; 87 88 // True if no pending writes remain. 89 bool wrote_last_data_; 90 91 // Accumulators. 92 int num_original_packets_; 93 int num_original_bytes_; 94 int num_retransmitted_packets_; 95 int num_retransmitted_bytes_; 96 97 DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate); 98}; 99 100ReliableQuicStream::PendingData::PendingData( 101 string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in) 102 : data(data_in), delegate(delegate_in) { 103} 104 105ReliableQuicStream::PendingData::~PendingData() { 106} 107 108ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) 109 : sequencer_(this), 110 id_(id), 111 session_(session), 112 stream_bytes_read_(0), 113 stream_bytes_written_(0), 114 stream_error_(QUIC_STREAM_NO_ERROR), 115 connection_error_(QUIC_NO_ERROR), 116 flow_control_send_limit_( 117 session_->config()->peer_initial_flow_control_window_bytes()), 118 max_flow_control_receive_window_bytes_( 119 session_->connection()->max_flow_control_receive_window_bytes()), 120 flow_control_receive_window_offset_bytes_( 121 session_->connection()->max_flow_control_receive_window_bytes()), 122 read_side_closed_(false), 123 write_side_closed_(false), 124 fin_buffered_(false), 125 fin_sent_(false), 126 rst_sent_(false), 127 is_server_(session_->is_server()) { 128 DVLOG(1) << ENDPOINT << "Created stream " << id_ 129 << ", setting initial receive window to: " 130 << flow_control_receive_window_offset_bytes_ 131 << ", setting send window to: " << flow_control_send_limit_; 132} 133 134ReliableQuicStream::~ReliableQuicStream() { 135} 136 137bool ReliableQuicStream::WillAcceptStreamFrame( 138 const QuicStreamFrame& frame) const { 139 if (read_side_closed_) { 140 return true; 141 } 142 if (frame.stream_id != id_) { 143 LOG(ERROR) << "Error!"; 144 return false; 145 } 146 return sequencer_.WillAcceptStreamFrame(frame); 147} 148 149bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 150 DCHECK_EQ(frame.stream_id, id_); 151 if (read_side_closed_) { 152 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; 153 // We don't want to be reading: blackhole the data. 154 return true; 155 } 156 157 // This count include duplicate data received. 158 stream_bytes_read_ += frame.data.TotalBufferSize(); 159 160 bool accepted = sequencer_.OnStreamFrame(frame); 161 162 if (IsFlowControlEnabled()) { 163 if (flow_control_receive_window_offset_bytes_ < TotalReceivedBytes()) { 164 // TODO(rjshade): Lower severity from DFATAL once we have established that 165 // flow control is working correctly. 166 LOG(DFATAL) 167 << ENDPOINT << "Flow control violation on stream: " << id() 168 << ", our receive offset is: " 169 << flow_control_receive_window_offset_bytes_ 170 << ", we have consumed: " << sequencer_.num_bytes_consumed() 171 << ", we have buffered: " << sequencer_.num_bytes_buffered() 172 << ", total: " << TotalReceivedBytes(); 173 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); 174 return false; 175 } 176 MaybeSendWindowUpdate(); 177 } 178 179 return accepted; 180} 181 182void ReliableQuicStream::MaybeSendWindowUpdate() { 183 if (!IsFlowControlEnabled()) { 184 return; 185 } 186 187 // Send WindowUpdate to increase receive window if 188 // (receive window offset - consumed bytes) < (max window / 2). 189 // This is behaviour copied from SPDY. 190 size_t consumed_window = flow_control_receive_window_offset_bytes_ - 191 sequencer_.num_bytes_consumed(); 192 size_t threshold = (max_flow_control_receive_window_bytes_ / 2); 193 if (consumed_window < threshold) { 194 // Update our receive window. 195 flow_control_receive_window_offset_bytes_ += 196 (max_flow_control_receive_window_bytes_ - consumed_window); 197 DVLOG(1) << ENDPOINT << "Stream: " << id() 198 << ", sending WindowUpdate frame. " 199 << "Consumed bytes: " << sequencer_.num_bytes_consumed() 200 << ", Receive window offset: " 201 << flow_control_receive_window_offset_bytes_ 202 << ", Consumed window: " << consumed_window 203 << ", and threshold: " << threshold 204 << ". New receive window offset is: " 205 << flow_control_receive_window_offset_bytes_; 206 207 // Inform the peer of our new receive window. 208 session()->connection()->SendWindowUpdate( 209 id(), flow_control_receive_window_offset_bytes_); 210 } 211} 212 213void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { 214 stream_error_ = frame.error_code; 215 CloseWriteSide(); 216 CloseReadSide(); 217} 218 219void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, 220 bool from_peer) { 221 if (read_side_closed_ && write_side_closed_) { 222 return; 223 } 224 if (error != QUIC_NO_ERROR) { 225 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 226 connection_error_ = error; 227 } 228 229 CloseWriteSide(); 230 CloseReadSide(); 231} 232 233void ReliableQuicStream::OnFinRead() { 234 DCHECK(sequencer_.IsClosed()); 235 CloseReadSide(); 236} 237 238void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { 239 DCHECK_NE(QUIC_STREAM_NO_ERROR, error); 240 stream_error_ = error; 241 // Sending a RstStream results in calling CloseStream. 242 session()->SendRstStream(id(), error, stream_bytes_written_); 243 rst_sent_ = true; 244} 245 246void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 247 session()->connection()->SendConnectionClose(error); 248} 249 250void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 251 const string& details) { 252 session()->connection()->SendConnectionCloseWithDetails(error, details); 253} 254 255QuicVersion ReliableQuicStream::version() const { 256 return session()->connection()->version(); 257} 258 259void ReliableQuicStream::WriteOrBufferData( 260 StringPiece data, 261 bool fin, 262 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 263 if (data.empty() && !fin) { 264 LOG(DFATAL) << "data.empty() && !fin"; 265 return; 266 } 267 268 if (fin_buffered_) { 269 LOG(DFATAL) << "Fin already buffered"; 270 return; 271 } 272 273 scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate; 274 if (ack_notifier_delegate != NULL) { 275 proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate); 276 } 277 278 QuicConsumedData consumed_data(0, false); 279 fin_buffered_ = fin; 280 281 if (queued_data_.empty()) { 282 struct iovec iov(MakeIovec(data)); 283 consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get()); 284 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 285 } 286 287 bool write_completed; 288 // If there's unconsumed data or an unconsumed fin, queue it. 289 if (consumed_data.bytes_consumed < data.length() || 290 (fin && !consumed_data.fin_consumed)) { 291 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); 292 queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate)); 293 write_completed = false; 294 } else { 295 write_completed = true; 296 } 297 298 if ((proxy_delegate.get() != NULL) && 299 (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) { 300 proxy_delegate->WroteData(write_completed); 301 } 302} 303 304void ReliableQuicStream::OnCanWrite() { 305 bool fin = false; 306 while (!queued_data_.empty()) { 307 PendingData* pending_data = &queued_data_.front(); 308 ProxyAckNotifierDelegate* delegate = pending_data->delegate.get(); 309 if (queued_data_.size() == 1 && fin_buffered_) { 310 fin = true; 311 } 312 struct iovec iov(MakeIovec(pending_data->data)); 313 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate); 314 if (consumed_data.bytes_consumed == pending_data->data.size() && 315 fin == consumed_data.fin_consumed) { 316 queued_data_.pop_front(); 317 if (delegate != NULL) { 318 delegate->WroteData(true); 319 } 320 } else { 321 if (consumed_data.bytes_consumed > 0) { 322 pending_data->data.erase(0, consumed_data.bytes_consumed); 323 if (delegate != NULL) { 324 delegate->WroteData(false); 325 } 326 } 327 break; 328 } 329 } 330} 331 332QuicConsumedData ReliableQuicStream::WritevData( 333 const struct iovec* iov, 334 int iov_count, 335 bool fin, 336 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 337 if (write_side_closed_) { 338 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 339 return QuicConsumedData(0, false); 340 } 341 342 // How much data we want to write. 343 size_t write_length = TotalIovecLength(iov, iov_count); 344 345 // How much data we are allowed to write from flow control. 346 size_t send_window = SendWindowSize(); 347 348 // A FIN with zero data payload should not be flow control blocked. 349 bool fin_with_zero_data = (fin && write_length == 0); 350 351 if (IsFlowControlEnabled()) { 352 if (send_window == 0 && !fin_with_zero_data) { 353 // Quick return if we can't send anything. 354 session()->connection()->SendBlocked(id()); 355 return QuicConsumedData(0, false); 356 } 357 358 if (write_length > send_window) { 359 // Don't send the FIN if we aren't going to send all the data. 360 fin = false; 361 362 // Writing more data would be a violation of flow control. 363 write_length = send_window; 364 } 365 } 366 367 // Fill an IOVector with bytes from the iovec. 368 IOVector data; 369 data.AppendIovecAtMostBytes(iov, iov_count, write_length); 370 371 QuicConsumedData consumed_data = session()->WritevData( 372 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); 373 stream_bytes_written_ += consumed_data.bytes_consumed; 374 375 if (consumed_data.bytes_consumed == write_length) { 376 if (IsFlowControlEnabled() && write_length == send_window && 377 !fin_with_zero_data) { 378 DVLOG(1) << ENDPOINT << "Stream " << id() 379 << " is flow control blocked. " 380 << "Send window: " << send_window 381 << ", stream_bytes_written: " << stream_bytes_written_ 382 << ", flow_control_send_limit: " 383 << flow_control_send_limit_; 384 // The entire send_window has been consumed, we are now flow control 385 // blocked. 386 session()->connection()->SendBlocked(id()); 387 } 388 if (fin && consumed_data.fin_consumed) { 389 fin_sent_ = true; 390 CloseWriteSide(); 391 } else if (fin && !consumed_data.fin_consumed) { 392 session_->MarkWriteBlocked(id(), EffectivePriority()); 393 } 394 } else { 395 session_->MarkWriteBlocked(id(), EffectivePriority()); 396 } 397 return consumed_data; 398} 399 400void ReliableQuicStream::CloseReadSide() { 401 if (read_side_closed_) { 402 return; 403 } 404 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); 405 406 read_side_closed_ = true; 407 if (write_side_closed_) { 408 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 409 session_->CloseStream(id()); 410 } 411} 412 413void ReliableQuicStream::CloseWriteSide() { 414 if (write_side_closed_) { 415 return; 416 } 417 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); 418 419 write_side_closed_ = true; 420 if (read_side_closed_) { 421 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 422 session_->CloseStream(id()); 423 } 424} 425 426bool ReliableQuicStream::HasBufferedData() { 427 return !queued_data_.empty(); 428} 429 430void ReliableQuicStream::OnClose() { 431 CloseReadSide(); 432 CloseWriteSide(); 433 434 if (version() > QUIC_VERSION_13 && 435 !fin_sent_ && !rst_sent_) { 436 // For flow control accounting, we must tell the peer how many bytes we have 437 // written on this stream before termination. Done here if needed, using a 438 // RST frame. 439 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); 440 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); 441 rst_sent_ = true; 442 } 443} 444 445void ReliableQuicStream::OnWindowUpdateFrame( 446 const QuicWindowUpdateFrame& frame) { 447 if (!IsFlowControlEnabled()) { 448 DLOG(DFATAL) << "Flow control not enabled! " << version(); 449 return; 450 } 451 452 DVLOG(1) << ENDPOINT 453 << "OnWindowUpdateFrame for stream " << id() 454 << " with byte offset " << frame.byte_offset 455 << " , current offset: " << flow_control_send_limit_ << ")."; 456 457 UpdateFlowControlSendLimit(frame.byte_offset); 458} 459 460void ReliableQuicStream::UpdateFlowControlSendLimit(QuicStreamOffset offset) { 461 if (offset <= flow_control_send_limit_) { 462 DVLOG(1) << ENDPOINT << "Stream " << id() 463 << ", not changing window, current: " << flow_control_send_limit_ 464 << " new: " << offset; 465 // No change to our send window. 466 return; 467 } 468 469 DVLOG(1) << ENDPOINT << "Stream " << id() 470 << ", changing window, current: " << flow_control_send_limit_ 471 << " new: " << offset; 472 // Send window has increased. 473 flow_control_send_limit_ = offset; 474 475 // We can write again! 476 // TODO(rjshade): This does not respect priorities (e.g. multiple outstanding 477 // POSTs are unblocked on arrival of SHLO with initial window). 478 OnCanWrite(); 479} 480 481bool ReliableQuicStream::IsFlowControlBlocked() const { 482 return IsFlowControlEnabled() && SendWindowSize() == 0; 483} 484 485uint64 ReliableQuicStream::SendWindowSize() const { 486 return flow_control_send_limit_ - stream_bytes_written(); 487} 488 489uint64 ReliableQuicStream::TotalReceivedBytes() const { 490 return sequencer_.num_bytes_consumed() + sequencer_.num_bytes_buffered(); 491} 492 493} // namespace net 494