reliable_quic_stream.cc revision 5d1f7b1de12d16ceb2c938c56701a3e8bfa558f7
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "base/logging.h" 8#include "net/quic/quic_session.h" 9#include "net/quic/quic_spdy_decompressor.h" 10#include "net/quic/quic_write_blocked_list.h" 11 12using base::StringPiece; 13using std::min; 14 15namespace net { 16 17#define ENDPOINT (is_server_ ? "Server: " : " Client: ") 18 19namespace { 20 21struct iovec MakeIovec(StringPiece data) { 22 struct iovec iov = {const_cast<char*>(data.data()), 23 static_cast<size_t>(data.size())}; 24 return iov; 25} 26 27} // namespace 28 29ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 30 QuicSession* session) 31 : sequencer_(this), 32 id_(id), 33 session_(session), 34 stream_bytes_read_(0), 35 stream_bytes_written_(0), 36 stream_error_(QUIC_STREAM_NO_ERROR), 37 connection_error_(QUIC_NO_ERROR), 38 read_side_closed_(false), 39 write_side_closed_(false), 40 fin_buffered_(false), 41 fin_sent_(false), 42 rst_sent_(false), 43 is_server_(session_->is_server()) { 44} 45 46ReliableQuicStream::~ReliableQuicStream() { 47} 48 49bool ReliableQuicStream::WillAcceptStreamFrame( 50 const QuicStreamFrame& frame) const { 51 if (read_side_closed_) { 52 return true; 53 } 54 if (frame.stream_id != id_) { 55 LOG(ERROR) << "Error!"; 56 return false; 57 } 58 return sequencer_.WillAcceptStreamFrame(frame); 59} 60 61bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 62 DCHECK_EQ(frame.stream_id, id_); 63 if (read_side_closed_) { 64 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; 65 // We don't want to be reading: blackhole the data. 66 return true; 67 } 68 // Note: This count include duplicate data received. 69 stream_bytes_read_ += frame.data.TotalBufferSize(); 70 71 bool accepted = sequencer_.OnStreamFrame(frame); 72 73 return accepted; 74} 75 76void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { 77 stream_error_ = frame.error_code; 78 CloseWriteSide(); 79 CloseReadSide(); 80} 81 82void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, 83 bool from_peer) { 84 if (read_side_closed_ && write_side_closed_) { 85 return; 86 } 87 if (error != QUIC_NO_ERROR) { 88 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 89 connection_error_ = error; 90 } 91 92 CloseWriteSide(); 93 CloseReadSide(); 94} 95 96void ReliableQuicStream::OnFinRead() { 97 DCHECK(sequencer_.IsClosed()); 98 CloseReadSide(); 99} 100 101void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { 102 DCHECK_NE(QUIC_STREAM_NO_ERROR, error); 103 stream_error_ = error; 104 // Sending a RstStream results in calling CloseStream. 105 session()->SendRstStream(id(), error, stream_bytes_written_); 106 rst_sent_ = true; 107} 108 109void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 110 session()->connection()->SendConnectionClose(error); 111} 112 113void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 114 const string& details) { 115 session()->connection()->SendConnectionCloseWithDetails(error, details); 116} 117 118QuicVersion ReliableQuicStream::version() { 119 return session()->connection()->version(); 120} 121 122void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { 123 if (data.empty() && !fin) { 124 LOG(DFATAL) << "data.empty() && !fin"; 125 return; 126 } 127 128 if (fin_buffered_) { 129 LOG(DFATAL) << "Fin already buffered"; 130 return; 131 } 132 133 QuicConsumedData consumed_data(0, false); 134 fin_buffered_ = fin; 135 136 if (queued_data_.empty()) { 137 struct iovec iov(MakeIovec(data)); 138 consumed_data = WritevData(&iov, 1, fin, NULL); 139 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 140 } 141 142 // If there's unconsumed data or an unconsumed fin, queue it. 143 if (consumed_data.bytes_consumed < data.length() || 144 (fin && !consumed_data.fin_consumed)) { 145 queued_data_.push_back( 146 string(data.data() + consumed_data.bytes_consumed, 147 data.length() - consumed_data.bytes_consumed)); 148 } 149} 150 151void ReliableQuicStream::OnCanWrite() { 152 bool fin = false; 153 while (!queued_data_.empty()) { 154 const string& data = queued_data_.front(); 155 if (queued_data_.size() == 1 && fin_buffered_) { 156 fin = true; 157 } 158 struct iovec iov(MakeIovec(data)); 159 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL); 160 if (consumed_data.bytes_consumed == data.size() && 161 fin == consumed_data.fin_consumed) { 162 queued_data_.pop_front(); 163 } else { 164 queued_data_.front().erase(0, consumed_data.bytes_consumed); 165 break; 166 } 167 } 168} 169 170QuicConsumedData ReliableQuicStream::WritevData( 171 const struct iovec* iov, 172 int iov_count, 173 bool fin, 174 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 175 if (write_side_closed_) { 176 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 177 return QuicConsumedData(0, false); 178 } 179 180 size_t write_length = 0u; 181 for (int i = 0; i < iov_count; ++i) { 182 write_length += iov[i].iov_len; 183 } 184 QuicConsumedData consumed_data = session()->WritevData( 185 id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate); 186 stream_bytes_written_ += consumed_data.bytes_consumed; 187 if (consumed_data.bytes_consumed == write_length) { 188 if (fin && consumed_data.fin_consumed) { 189 fin_sent_ = true; 190 CloseWriteSide(); 191 } else if (fin && !consumed_data.fin_consumed) { 192 session_->MarkWriteBlocked(id(), EffectivePriority()); 193 } 194 } else { 195 session_->MarkWriteBlocked(id(), EffectivePriority()); 196 } 197 return consumed_data; 198} 199 200void ReliableQuicStream::CloseReadSide() { 201 if (read_side_closed_) { 202 return; 203 } 204 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); 205 206 read_side_closed_ = true; 207 if (write_side_closed_) { 208 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 209 session_->CloseStream(id()); 210 } 211} 212 213void ReliableQuicStream::CloseWriteSide() { 214 if (write_side_closed_) { 215 return; 216 } 217 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); 218 219 write_side_closed_ = true; 220 if (read_side_closed_) { 221 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 222 session_->CloseStream(id()); 223 } 224} 225 226bool ReliableQuicStream::HasBufferedData() { 227 return !queued_data_.empty(); 228} 229 230void ReliableQuicStream::OnClose() { 231 CloseReadSide(); 232 CloseWriteSide(); 233 234 if (version() > QUIC_VERSION_13 && 235 !fin_sent_ && !rst_sent_) { 236 // For flow control accounting, we must tell the peer how many bytes we have 237 // written on this stream before termination. Done here if needed, using a 238 // RST frame. 239 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); 240 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); 241 rst_sent_ = true; 242 } 243} 244 245} // namespace net 246