// reliable_quic_stream.cc revision 90dce4d38c5ff5333bea97d859d4e484e27edf0c
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "net/quic/quic_session.h" 8#include "net/quic/quic_spdy_decompressor.h" 9 10using base::StringPiece; 11using std::min; 12 13namespace net { 14 15ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 16 QuicSession* session) 17 : sequencer_(this), 18 id_(id), 19 session_(session), 20 visitor_(NULL), 21 stream_bytes_read_(0), 22 stream_bytes_written_(0), 23 headers_decompressed_(false), 24 headers_id_(0), 25 stream_error_(QUIC_STREAM_NO_ERROR), 26 connection_error_(QUIC_NO_ERROR), 27 read_side_closed_(false), 28 write_side_closed_(false), 29 fin_buffered_(false), 30 fin_sent_(false) { 31} 32 33ReliableQuicStream::~ReliableQuicStream() { 34} 35 36bool ReliableQuicStream::WillAcceptStreamFrame( 37 const QuicStreamFrame& frame) const { 38 if (read_side_closed_) { 39 return true; 40 } 41 if (frame.stream_id != id_) { 42 LOG(ERROR) << "Error!"; 43 return false; 44 } 45 return sequencer_.WillAcceptStreamFrame(frame); 46} 47 48bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 49 DCHECK_EQ(frame.stream_id, id_); 50 if (read_side_closed_) { 51 DLOG(INFO) << "Ignoring frame " << frame.stream_id; 52 // We don't want to be reading: blackhole the data. 53 return true; 54 } 55 // Note: This count include duplicate data received. 56 stream_bytes_read_ += frame.data.length(); 57 58 bool accepted = sequencer_.OnStreamFrame(frame); 59 60 if (frame.fin) { 61 sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size(), true); 62 } 63 64 return accepted; 65} 66 67void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 68 stream_error_ = error; 69 TerminateFromPeer(false); // Full close. 
70} 71 72void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { 73 if (IsClosed()) { 74 return; 75 } 76 if (error != QUIC_NO_ERROR) { 77 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 78 connection_error_ = error; 79 } 80 81 if (from_peer) { 82 TerminateFromPeer(false); 83 } else { 84 CloseWriteSide(); 85 CloseReadSide(); 86 } 87} 88 89void ReliableQuicStream::TerminateFromPeer(bool half_close) { 90 if (!half_close) { 91 CloseWriteSide(); 92 } 93 CloseReadSide(); 94} 95 96void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 97 stream_error_ = error; 98 session()->SendRstStream(id(), error); 99} 100 101int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) { 102 if (headers_decompressed_ && decompressed_headers_.empty()) { 103 return sequencer_.Readv(iov, iov_len); 104 } 105 size_t bytes_consumed = 0; 106 int iov_index = 0; 107 while (iov_index < iov_len && 108 decompressed_headers_.length() > bytes_consumed) { 109 int bytes_to_read = min(iov[iov_index].iov_len, 110 decompressed_headers_.length() - bytes_consumed); 111 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 112 memcpy(iov_ptr, 113 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 114 bytes_consumed += bytes_to_read; 115 ++iov_index; 116 } 117 decompressed_headers_.erase(0, bytes_consumed); 118 return bytes_consumed; 119} 120 121int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) { 122 if (headers_decompressed_ && decompressed_headers_.empty()) { 123 return sequencer_.GetReadableRegions(iov, iov_len); 124 } 125 if (iov_len == 0) { 126 return 0; 127 } 128 iov[0].iov_base = static_cast<void*>( 129 const_cast<char*>(decompressed_headers_.data())); 130 iov[0].iov_len = decompressed_headers_.length(); 131 return 1; 132} 133 134bool ReliableQuicStream::IsHalfClosed() const { 135 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 136 return false; 137 } 138 return sequencer_.IsHalfClosed(); 139} 140 141bool 
ReliableQuicStream::IsClosed() const { 142 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); 143} 144 145bool ReliableQuicStream::HasBytesToRead() const { 146 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); 147} 148 149const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 150 return session_->peer_address(); 151} 152 153QuicSpdyCompressor* ReliableQuicStream::compressor() { 154 return session_->compressor(); 155} 156 157QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { 158 return WriteOrBuffer(data, fin); 159} 160 161QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { 162 DCHECK(!fin_buffered_); 163 164 QuicConsumedData consumed_data(0, false); 165 fin_buffered_ = fin; 166 167 if (queued_data_.empty()) { 168 consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); 169 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 170 } 171 172 // If there's unconsumed data or an unconsumed fin, queue it. 
173 if (consumed_data.bytes_consumed < data.length() || 174 (fin && !consumed_data.fin_consumed)) { 175 queued_data_.push_back( 176 string(data.data() + consumed_data.bytes_consumed, 177 data.length() - consumed_data.bytes_consumed)); 178 } 179 180 return QuicConsumedData(data.size(), true); 181} 182 183void ReliableQuicStream::OnCanWrite() { 184 bool fin = false; 185 while (!queued_data_.empty()) { 186 const string& data = queued_data_.front(); 187 if (queued_data_.size() == 1 && fin_buffered_) { 188 fin = true; 189 } 190 QuicConsumedData consumed_data = WriteDataInternal(data, fin); 191 if (consumed_data.bytes_consumed == data.size() && 192 fin == consumed_data.fin_consumed) { 193 queued_data_.pop_front(); 194 } else { 195 queued_data_.front().erase(0, consumed_data.bytes_consumed); 196 break; 197 } 198 } 199} 200 201QuicConsumedData ReliableQuicStream::WriteDataInternal( 202 StringPiece data, bool fin) { 203 if (write_side_closed_) { 204 DLOG(ERROR) << "Attempt to write when the write side is closed"; 205 return QuicConsumedData(0, false); 206 } 207 208 QuicConsumedData consumed_data = 209 session()->WriteData(id(), data, stream_bytes_written_, fin); 210 stream_bytes_written_ += consumed_data.bytes_consumed; 211 if (consumed_data.bytes_consumed == data.length()) { 212 if (fin && consumed_data.fin_consumed) { 213 fin_sent_ = true; 214 CloseWriteSide(); 215 } 216 } else { 217 session_->MarkWriteBlocked(id()); 218 } 219 return consumed_data; 220} 221 222void ReliableQuicStream::CloseReadSide() { 223 if (read_side_closed_) { 224 return; 225 } 226 DLOG(INFO) << "Done reading from stream " << id(); 227 228 read_side_closed_ = true; 229 if (write_side_closed_) { 230 DLOG(INFO) << "Closing stream: " << id(); 231 session_->CloseStream(id()); 232 } 233} 234 235uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { 236 if (id() == kCryptoStreamId) { 237 // The crypto stream does not use compression. 
238 return ProcessData(data, data_len); 239 } 240 uint32 total_bytes_consumed = 0; 241 if (headers_id_ == 0u) { 242 // The headers ID has not yet been read. Strip it from the beginning of 243 // the data stream. 244 DCHECK_GT(4u, headers_id_buffer_.length()); 245 size_t missing_size = 4 - headers_id_buffer_.length(); 246 if (data_len < missing_size) { 247 StringPiece(data, data_len).AppendToString(&headers_id_buffer_); 248 return data_len; 249 } 250 total_bytes_consumed += missing_size; 251 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); 252 DCHECK_EQ(4u, headers_id_buffer_.length()); 253 memcpy(&headers_id_, headers_id_buffer_.data(), 4); 254 headers_id_buffer_.clear(); 255 data += missing_size; 256 data_len -= missing_size; 257 } 258 DCHECK_NE(0u, headers_id_); 259 260 // Once the headers are finished, we simply pass the data through. 261 if (headers_decompressed_) { 262 // Some buffered header data remains. 263 if (!decompressed_headers_.empty()) { 264 ProcessHeaderData(); 265 } 266 if (decompressed_headers_.empty()) { 267 DVLOG(1) << "Delegating procesing to ProcessData"; 268 total_bytes_consumed += ProcessData(data, data_len); 269 } 270 return total_bytes_consumed; 271 } 272 273 QuicHeaderId current_header_id = 274 session_->decompressor()->current_header_id(); 275 // Ensure that this header id looks sane. 276 if (headers_id_ < current_header_id || 277 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 278 DVLOG(1) << "Invalid headers for stream: " << id() 279 << " header_id: " << headers_id_ 280 << " current_header_id: " << current_header_id; 281 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 282 return total_bytes_consumed; 283 } 284 285 // If we are head-of-line blocked on decompression, then back up. 
286 if (current_header_id != headers_id_) { 287 session_->MarkDecompressionBlocked(headers_id_, id()); 288 DVLOG(1) << "Unable to decompress header data for stream: " << id() 289 << " header_id: " << headers_id_; 290 return total_bytes_consumed; 291 } 292 293 // Decompressed data will be delivered to decompressed_headers_. 294 size_t bytes_consumed = session_->decompressor()->DecompressData( 295 StringPiece(data, data_len), this); 296 total_bytes_consumed += bytes_consumed; 297 298 // Headers are complete if the decompressor has moved on to the 299 // next stream. 300 headers_decompressed_ = 301 session_->decompressor()->current_header_id() != headers_id_; 302 303 ProcessHeaderData(); 304 305 // We have processed all of the decompressed data but we might 306 // have some more raw data to process. 307 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) { 308 total_bytes_consumed += ProcessData(data + bytes_consumed, 309 data_len - bytes_consumed); 310 } 311 312 // The sequencer will push any additional buffered frames if this data 313 // has been completely consumed. 
314 return total_bytes_consumed; 315} 316 317uint32 ReliableQuicStream::ProcessHeaderData() { 318 if (decompressed_headers_.empty()) { 319 return 0; 320 } 321 322 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 323 decompressed_headers_.length()); 324 if (bytes_processed == decompressed_headers_.length()) { 325 decompressed_headers_.clear(); 326 } else { 327 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 328 } 329 return bytes_processed; 330} 331 332void ReliableQuicStream::OnDecompressorAvailable() { 333 DCHECK_EQ(headers_id_, 334 session_->decompressor()->current_header_id()); 335 DCHECK(!headers_decompressed_); 336 DCHECK_EQ(0u, decompressed_headers_.length()); 337 338 size_t total_bytes_consumed = 0; 339 struct iovec iovecs[5]; 340 while (!headers_decompressed_) { 341 size_t num_iovecs = 342 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); 343 344 if (num_iovecs == 0) { 345 return; 346 } 347 for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) { 348 total_bytes_consumed += session_->decompressor()->DecompressData( 349 StringPiece(static_cast<char*>(iovecs[i].iov_base), 350 iovecs[i].iov_len), this); 351 352 headers_decompressed_ = 353 session_->decompressor()->current_header_id() != headers_id_; 354 } 355 } 356 357 // Either the headers are complete, or the all data as been consumed. 358 sequencer_.MarkConsumed(total_bytes_consumed); 359 360 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 
361 362 if (headers_decompressed_ && decompressed_headers_.empty()) { 363 sequencer_.FlushBufferedFrames(); 364 } 365} 366 367bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 368 data.AppendToString(&decompressed_headers_); 369 return true; 370} 371 372void ReliableQuicStream::OnDecompressionError() { 373 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); 374} 375 376 377void ReliableQuicStream::CloseWriteSide() { 378 if (write_side_closed_) { 379 return; 380 } 381 DLOG(INFO) << "Done writing to stream " << id(); 382 383 write_side_closed_ = true; 384 if (read_side_closed_) { 385 DLOG(INFO) << "Closing stream: " << id(); 386 session_->CloseStream(id()); 387 } 388} 389 390void ReliableQuicStream::OnClose() { 391 CloseReadSide(); 392 CloseWriteSide(); 393 394 if (visitor_) { 395 Visitor* visitor = visitor_; 396 // Calling Visitor::OnClose() may result the destruction of the visitor, 397 // so we need to ensure we don't call it again. 398 visitor_ = NULL; 399 visitor->OnClose(this); 400 } 401} 402 403} // namespace net 404