// reliable_quic_stream.cc revision 5e3f23d412006dc4db4e659864679f29341e113f
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "net/quic/quic_session.h" 8#include "net/quic/quic_spdy_decompressor.h" 9 10using base::StringPiece; 11using std::min; 12 13namespace net { 14 15ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 16 QuicSession* session) 17 : sequencer_(this), 18 id_(id), 19 session_(session), 20 visitor_(NULL), 21 stream_bytes_read_(0), 22 stream_bytes_written_(0), 23 headers_decompressed_(false), 24 headers_id_(0), 25 stream_error_(QUIC_STREAM_NO_ERROR), 26 connection_error_(QUIC_NO_ERROR), 27 read_side_closed_(false), 28 write_side_closed_(false), 29 fin_buffered_(false), 30 fin_sent_(false) { 31} 32 33ReliableQuicStream::~ReliableQuicStream() { 34} 35 36bool ReliableQuicStream::WillAcceptStreamFrame( 37 const QuicStreamFrame& frame) const { 38 if (read_side_closed_) { 39 return true; 40 } 41 if (frame.stream_id != id_) { 42 LOG(ERROR) << "Error!"; 43 return false; 44 } 45 return sequencer_.WillAcceptStreamFrame(frame); 46} 47 48bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 49 DCHECK_EQ(frame.stream_id, id_); 50 if (read_side_closed_) { 51 DLOG(INFO) << "Ignoring frame " << frame.stream_id; 52 // We don't want to be reading: blackhole the data. 53 return true; 54 } 55 // Note: This count include duplicate data received. 56 stream_bytes_read_ += frame.data.length(); 57 58 bool accepted = sequencer_.OnStreamFrame(frame); 59 60 if (frame.fin) { 61 sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size(), true); 62 } 63 64 return accepted; 65} 66 67void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 68 stream_error_ = error; 69 TerminateFromPeer(false); // Full close. 
70} 71 72void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { 73 if (IsClosed()) { 74 return; 75 } 76 if (error != QUIC_NO_ERROR) { 77 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 78 connection_error_ = error; 79 } 80 81 if (from_peer) { 82 TerminateFromPeer(false); 83 } else { 84 CloseWriteSide(); 85 CloseReadSide(); 86 } 87} 88 89void ReliableQuicStream::TerminateFromPeer(bool half_close) { 90 if (!half_close) { 91 CloseWriteSide(); 92 } 93 CloseReadSide(); 94} 95 96void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 97 stream_error_ = error; 98 if (error != QUIC_STREAM_NO_ERROR) { 99 // Sending a RstStream results in calling CloseStream. 100 session()->SendRstStream(id(), error); 101 } else { 102 session_->CloseStream(id()); 103 } 104} 105 106size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { 107 if (headers_decompressed_ && decompressed_headers_.empty()) { 108 return sequencer_.Readv(iov, iov_len); 109 } 110 size_t bytes_consumed = 0; 111 size_t iov_index = 0; 112 while (iov_index < iov_len && 113 decompressed_headers_.length() > bytes_consumed) { 114 size_t bytes_to_read = min(iov[iov_index].iov_len, 115 decompressed_headers_.length() - bytes_consumed); 116 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 117 memcpy(iov_ptr, 118 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 119 bytes_consumed += bytes_to_read; 120 ++iov_index; 121 } 122 decompressed_headers_.erase(0, bytes_consumed); 123 return bytes_consumed; 124} 125 126int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { 127 if (headers_decompressed_ && decompressed_headers_.empty()) { 128 return sequencer_.GetReadableRegions(iov, iov_len); 129 } 130 if (iov_len == 0) { 131 return 0; 132 } 133 iov[0].iov_base = static_cast<void*>( 134 const_cast<char*>(decompressed_headers_.data())); 135 iov[0].iov_len = decompressed_headers_.length(); 136 return 1; 137} 138 139bool 
ReliableQuicStream::IsHalfClosed() const { 140 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 141 return false; 142 } 143 return sequencer_.IsHalfClosed(); 144} 145 146bool ReliableQuicStream::IsClosed() const { 147 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); 148} 149 150bool ReliableQuicStream::HasBytesToRead() const { 151 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); 152} 153 154const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 155 return session_->peer_address(); 156} 157 158QuicSpdyCompressor* ReliableQuicStream::compressor() { 159 return session_->compressor(); 160} 161 162QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { 163 DCHECK(data.size() > 0 || fin); 164 return WriteOrBuffer(data, fin); 165} 166 167QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { 168 DCHECK(!fin_buffered_); 169 170 QuicConsumedData consumed_data(0, false); 171 fin_buffered_ = fin; 172 173 if (queued_data_.empty()) { 174 consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); 175 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 176 } 177 178 // If there's unconsumed data or an unconsumed fin, queue it. 
179 if (consumed_data.bytes_consumed < data.length() || 180 (fin && !consumed_data.fin_consumed)) { 181 queued_data_.push_back( 182 string(data.data() + consumed_data.bytes_consumed, 183 data.length() - consumed_data.bytes_consumed)); 184 } 185 186 return QuicConsumedData(data.size(), true); 187} 188 189void ReliableQuicStream::OnCanWrite() { 190 bool fin = false; 191 while (!queued_data_.empty()) { 192 const string& data = queued_data_.front(); 193 if (queued_data_.size() == 1 && fin_buffered_) { 194 fin = true; 195 } 196 QuicConsumedData consumed_data = WriteDataInternal(data, fin); 197 if (consumed_data.bytes_consumed == data.size() && 198 fin == consumed_data.fin_consumed) { 199 queued_data_.pop_front(); 200 } else { 201 queued_data_.front().erase(0, consumed_data.bytes_consumed); 202 break; 203 } 204 } 205} 206 207QuicConsumedData ReliableQuicStream::WriteDataInternal( 208 StringPiece data, bool fin) { 209 if (write_side_closed_) { 210 DLOG(ERROR) << "Attempt to write when the write side is closed"; 211 return QuicConsumedData(0, false); 212 } 213 214 QuicConsumedData consumed_data = 215 session()->WriteData(id(), data, stream_bytes_written_, fin); 216 stream_bytes_written_ += consumed_data.bytes_consumed; 217 if (consumed_data.bytes_consumed == data.length()) { 218 if (fin && consumed_data.fin_consumed) { 219 fin_sent_ = true; 220 CloseWriteSide(); 221 } else if (fin && !consumed_data.fin_consumed) { 222 session_->MarkWriteBlocked(id()); 223 } 224 } else { 225 session_->MarkWriteBlocked(id()); 226 } 227 return consumed_data; 228} 229 230void ReliableQuicStream::CloseReadSide() { 231 if (read_side_closed_) { 232 return; 233 } 234 DLOG(INFO) << "Done reading from stream " << id(); 235 236 read_side_closed_ = true; 237 if (write_side_closed_) { 238 DLOG(INFO) << "Closing stream: " << id(); 239 session_->CloseStream(id()); 240 } 241} 242 243uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { 244 if (id() == kCryptoStreamId) { 245 // The 
crypto stream does not use compression. 246 return ProcessData(data, data_len); 247 } 248 uint32 total_bytes_consumed = 0; 249 if (headers_id_ == 0u) { 250 // The headers ID has not yet been read. Strip it from the beginning of 251 // the data stream. 252 DCHECK_GT(4u, headers_id_buffer_.length()); 253 size_t missing_size = 4 - headers_id_buffer_.length(); 254 if (data_len < missing_size) { 255 StringPiece(data, data_len).AppendToString(&headers_id_buffer_); 256 return data_len; 257 } 258 total_bytes_consumed += missing_size; 259 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); 260 DCHECK_EQ(4u, headers_id_buffer_.length()); 261 memcpy(&headers_id_, headers_id_buffer_.data(), 4); 262 headers_id_buffer_.clear(); 263 data += missing_size; 264 data_len -= missing_size; 265 } 266 DCHECK_NE(0u, headers_id_); 267 268 // Once the headers are finished, we simply pass the data through. 269 if (headers_decompressed_) { 270 // Some buffered header data remains. 271 if (!decompressed_headers_.empty()) { 272 ProcessHeaderData(); 273 } 274 if (decompressed_headers_.empty()) { 275 DVLOG(1) << "Delegating procesing to ProcessData"; 276 total_bytes_consumed += ProcessData(data, data_len); 277 } 278 return total_bytes_consumed; 279 } 280 281 QuicHeaderId current_header_id = 282 session_->decompressor()->current_header_id(); 283 // Ensure that this header id looks sane. 284 if (headers_id_ < current_header_id || 285 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 286 DVLOG(1) << "Invalid headers for stream: " << id() 287 << " header_id: " << headers_id_ 288 << " current_header_id: " << current_header_id; 289 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 290 return total_bytes_consumed; 291 } 292 293 // If we are head-of-line blocked on decompression, then back up. 
294 if (current_header_id != headers_id_) { 295 session_->MarkDecompressionBlocked(headers_id_, id()); 296 DVLOG(1) << "Unable to decompress header data for stream: " << id() 297 << " header_id: " << headers_id_; 298 return total_bytes_consumed; 299 } 300 301 // Decompressed data will be delivered to decompressed_headers_. 302 size_t bytes_consumed = session_->decompressor()->DecompressData( 303 StringPiece(data, data_len), this); 304 total_bytes_consumed += bytes_consumed; 305 306 // Headers are complete if the decompressor has moved on to the 307 // next stream. 308 headers_decompressed_ = 309 session_->decompressor()->current_header_id() != headers_id_; 310 311 ProcessHeaderData(); 312 313 // We have processed all of the decompressed data but we might 314 // have some more raw data to process. 315 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) { 316 total_bytes_consumed += ProcessData(data + bytes_consumed, 317 data_len - bytes_consumed); 318 } 319 320 // The sequencer will push any additional buffered frames if this data 321 // has been completely consumed. 
322 return total_bytes_consumed; 323} 324 325uint32 ReliableQuicStream::ProcessHeaderData() { 326 if (decompressed_headers_.empty()) { 327 return 0; 328 } 329 330 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 331 decompressed_headers_.length()); 332 if (bytes_processed == decompressed_headers_.length()) { 333 decompressed_headers_.clear(); 334 } else { 335 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 336 } 337 return bytes_processed; 338} 339 340void ReliableQuicStream::OnDecompressorAvailable() { 341 DCHECK_EQ(headers_id_, 342 session_->decompressor()->current_header_id()); 343 DCHECK(!headers_decompressed_); 344 DCHECK_EQ(0u, decompressed_headers_.length()); 345 346 size_t total_bytes_consumed = 0; 347 struct iovec iovecs[5]; 348 while (!headers_decompressed_) { 349 size_t num_iovecs = 350 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); 351 352 if (num_iovecs == 0) { 353 return; 354 } 355 for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) { 356 total_bytes_consumed += session_->decompressor()->DecompressData( 357 StringPiece(static_cast<char*>(iovecs[i].iov_base), 358 iovecs[i].iov_len), this); 359 360 headers_decompressed_ = 361 session_->decompressor()->current_header_id() != headers_id_; 362 } 363 } 364 365 // Either the headers are complete, or the all data as been consumed. 366 sequencer_.MarkConsumed(total_bytes_consumed); 367 368 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 
369 370 if (headers_decompressed_ && decompressed_headers_.empty()) { 371 sequencer_.FlushBufferedFrames(); 372 } 373} 374 375bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 376 data.AppendToString(&decompressed_headers_); 377 return true; 378} 379 380void ReliableQuicStream::OnDecompressionError() { 381 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); 382} 383 384 385void ReliableQuicStream::CloseWriteSide() { 386 if (write_side_closed_) { 387 return; 388 } 389 DLOG(INFO) << "Done writing to stream " << id(); 390 391 write_side_closed_ = true; 392 if (read_side_closed_) { 393 DLOG(INFO) << "Closing stream: " << id(); 394 session_->CloseStream(id()); 395 } 396} 397 398void ReliableQuicStream::OnClose() { 399 CloseReadSide(); 400 CloseWriteSide(); 401 402 if (visitor_) { 403 Visitor* visitor = visitor_; 404 // Calling Visitor::OnClose() may result the destruction of the visitor, 405 // so we need to ensure we don't call it again. 406 visitor_ = NULL; 407 visitor->OnClose(this); 408 } 409} 410 411} // namespace net 412