reliable_quic_stream.cc revision 7d4cd473f85ac64c3747c96c277f9e506a0d2246
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "net/quic/quic_session.h" 8#include "net/quic/quic_spdy_decompressor.h" 9 10using base::StringPiece; 11using std::min; 12 13namespace net { 14 15ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 16 QuicSession* session) 17 : sequencer_(this), 18 id_(id), 19 session_(session), 20 visitor_(NULL), 21 stream_bytes_read_(0), 22 stream_bytes_written_(0), 23 headers_decompressed_(false), 24 headers_id_(0), 25 stream_error_(QUIC_STREAM_NO_ERROR), 26 connection_error_(QUIC_NO_ERROR), 27 read_side_closed_(false), 28 write_side_closed_(false), 29 fin_buffered_(false), 30 fin_sent_(false) { 31} 32 33ReliableQuicStream::~ReliableQuicStream() { 34} 35 36bool ReliableQuicStream::WillAcceptStreamFrame( 37 const QuicStreamFrame& frame) const { 38 if (read_side_closed_) { 39 return true; 40 } 41 if (frame.stream_id != id_) { 42 LOG(ERROR) << "Error!"; 43 return false; 44 } 45 return sequencer_.WillAcceptStreamFrame(frame); 46} 47 48bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 49 DCHECK_EQ(frame.stream_id, id_); 50 if (read_side_closed_) { 51 DLOG(INFO) << "Ignoring frame " << frame.stream_id; 52 // We don't want to be reading: blackhole the data. 53 return true; 54 } 55 // Note: This count include duplicate data received. 56 stream_bytes_read_ += frame.data.length(); 57 58 bool accepted = sequencer_.OnStreamFrame(frame); 59 60 if (frame.fin) { 61 sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size()); 62 } 63 64 return accepted; 65} 66 67void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 68 stream_error_ = error; 69 TerminateFromPeer(false); // Full close. 70} 71 72void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { 73 if (read_side_closed_ && write_side_closed_) { 74 return; 75 } 76 if (error != QUIC_NO_ERROR) { 77 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 78 connection_error_ = error; 79 } 80 81 if (from_peer) { 82 TerminateFromPeer(false); 83 } else { 84 CloseWriteSide(); 85 CloseReadSide(); 86 } 87} 88 89void ReliableQuicStream::TerminateFromPeer(bool half_close) { 90 if (!half_close) { 91 CloseWriteSide(); 92 } 93 CloseReadSide(); 94} 95 96void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 97 stream_error_ = error; 98 if (error != QUIC_STREAM_NO_ERROR) { 99 // Sending a RstStream results in calling CloseStream. 100 session()->SendRstStream(id(), error); 101 } else { 102 session_->CloseStream(id()); 103 } 104} 105 106size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { 107 if (headers_decompressed_ && decompressed_headers_.empty()) { 108 return sequencer_.Readv(iov, iov_len); 109 } 110 size_t bytes_consumed = 0; 111 size_t iov_index = 0; 112 while (iov_index < iov_len && 113 decompressed_headers_.length() > bytes_consumed) { 114 size_t bytes_to_read = min(iov[iov_index].iov_len, 115 decompressed_headers_.length() - bytes_consumed); 116 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 117 memcpy(iov_ptr, 118 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 119 bytes_consumed += bytes_to_read; 120 ++iov_index; 121 } 122 decompressed_headers_.erase(0, bytes_consumed); 123 return bytes_consumed; 124} 125 126int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { 127 if (headers_decompressed_ && decompressed_headers_.empty()) { 128 return sequencer_.GetReadableRegions(iov, iov_len); 129 } 130 if (iov_len == 0) { 131 return 0; 132 } 133 iov[0].iov_base = static_cast<void*>( 134 const_cast<char*>(decompressed_headers_.data())); 135 iov[0].iov_len = decompressed_headers_.length(); 136 return 1; 137} 138 139bool ReliableQuicStream::IsHalfClosed() const { 140 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 141 return false; 142 } 143 return sequencer_.IsHalfClosed(); 144} 145 146bool ReliableQuicStream::HasBytesToRead() const { 147 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); 148} 149 150const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 151 return session_->peer_address(); 152} 153 154QuicSpdyCompressor* ReliableQuicStream::compressor() { 155 return session_->compressor(); 156} 157 158QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { 159 DCHECK(data.size() > 0 || fin); 160 return WriteOrBuffer(data, fin); 161} 162 163QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { 164 DCHECK(!fin_buffered_); 165 166 QuicConsumedData consumed_data(0, false); 167 fin_buffered_ = fin; 168 169 if (queued_data_.empty()) { 170 consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); 171 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 172 } 173 174 // If there's unconsumed data or an unconsumed fin, queue it. 175 if (consumed_data.bytes_consumed < data.length() || 176 (fin && !consumed_data.fin_consumed)) { 177 queued_data_.push_back( 178 string(data.data() + consumed_data.bytes_consumed, 179 data.length() - consumed_data.bytes_consumed)); 180 } 181 182 return QuicConsumedData(data.size(), true); 183} 184 185void ReliableQuicStream::OnCanWrite() { 186 bool fin = false; 187 while (!queued_data_.empty()) { 188 const string& data = queued_data_.front(); 189 if (queued_data_.size() == 1 && fin_buffered_) { 190 fin = true; 191 } 192 QuicConsumedData consumed_data = WriteDataInternal(data, fin); 193 if (consumed_data.bytes_consumed == data.size() && 194 fin == consumed_data.fin_consumed) { 195 queued_data_.pop_front(); 196 } else { 197 queued_data_.front().erase(0, consumed_data.bytes_consumed); 198 break; 199 } 200 } 201} 202 203QuicConsumedData ReliableQuicStream::WriteDataInternal( 204 StringPiece data, bool fin) { 205 if (write_side_closed_) { 206 DLOG(ERROR) << "Attempt to write when the write side is closed"; 207 return QuicConsumedData(0, false); 208 } 209 210 QuicConsumedData consumed_data = 211 session()->WriteData(id(), data, stream_bytes_written_, fin); 212 stream_bytes_written_ += consumed_data.bytes_consumed; 213 if (consumed_data.bytes_consumed == data.length()) { 214 if (fin && consumed_data.fin_consumed) { 215 fin_sent_ = true; 216 CloseWriteSide(); 217 } else if (fin && !consumed_data.fin_consumed) { 218 session_->MarkWriteBlocked(id()); 219 } 220 } else { 221 session_->MarkWriteBlocked(id()); 222 } 223 return consumed_data; 224} 225 226void ReliableQuicStream::CloseReadSide() { 227 if (read_side_closed_) { 228 return; 229 } 230 DLOG(INFO) << "Done reading from stream " << id(); 231 232 read_side_closed_ = true; 233 if (write_side_closed_) { 234 DLOG(INFO) << "Closing stream: " << id(); 235 session_->CloseStream(id()); 236 } 237} 238 239uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { 240 if (id() == kCryptoStreamId) { 241 // The crypto stream does not use compression. 242 return ProcessData(data, data_len); 243 } 244 uint32 total_bytes_consumed = 0; 245 if (headers_id_ == 0u) { 246 // The headers ID has not yet been read. Strip it from the beginning of 247 // the data stream. 248 DCHECK_GT(4u, headers_id_buffer_.length()); 249 size_t missing_size = 4 - headers_id_buffer_.length(); 250 if (data_len < missing_size) { 251 StringPiece(data, data_len).AppendToString(&headers_id_buffer_); 252 return data_len; 253 } 254 total_bytes_consumed += missing_size; 255 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); 256 DCHECK_EQ(4u, headers_id_buffer_.length()); 257 memcpy(&headers_id_, headers_id_buffer_.data(), 4); 258 headers_id_buffer_.clear(); 259 data += missing_size; 260 data_len -= missing_size; 261 } 262 DCHECK_NE(0u, headers_id_); 263 264 // Once the headers are finished, we simply pass the data through. 265 if (headers_decompressed_) { 266 // Some buffered header data remains. 267 if (!decompressed_headers_.empty()) { 268 ProcessHeaderData(); 269 } 270 if (decompressed_headers_.empty()) { 271 DVLOG(1) << "Delegating procesing to ProcessData"; 272 total_bytes_consumed += ProcessData(data, data_len); 273 } 274 return total_bytes_consumed; 275 } 276 277 QuicHeaderId current_header_id = 278 session_->decompressor()->current_header_id(); 279 // Ensure that this header id looks sane. 280 if (headers_id_ < current_header_id || 281 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 282 DVLOG(1) << "Invalid headers for stream: " << id() 283 << " header_id: " << headers_id_ 284 << " current_header_id: " << current_header_id; 285 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 286 return total_bytes_consumed; 287 } 288 289 // If we are head-of-line blocked on decompression, then back up. 290 if (current_header_id != headers_id_) { 291 session_->MarkDecompressionBlocked(headers_id_, id()); 292 DVLOG(1) << "Unable to decompress header data for stream: " << id() 293 << " header_id: " << headers_id_; 294 return total_bytes_consumed; 295 } 296 297 // Decompressed data will be delivered to decompressed_headers_. 298 size_t bytes_consumed = session_->decompressor()->DecompressData( 299 StringPiece(data, data_len), this); 300 total_bytes_consumed += bytes_consumed; 301 302 // Headers are complete if the decompressor has moved on to the 303 // next stream. 304 headers_decompressed_ = 305 session_->decompressor()->current_header_id() != headers_id_; 306 307 ProcessHeaderData(); 308 309 // We have processed all of the decompressed data but we might 310 // have some more raw data to process. 311 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) { 312 total_bytes_consumed += ProcessData(data + bytes_consumed, 313 data_len - bytes_consumed); 314 } 315 316 // The sequencer will push any additional buffered frames if this data 317 // has been completely consumed. 318 return total_bytes_consumed; 319} 320 321uint32 ReliableQuicStream::ProcessHeaderData() { 322 if (decompressed_headers_.empty()) { 323 return 0; 324 } 325 326 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 327 decompressed_headers_.length()); 328 if (bytes_processed == decompressed_headers_.length()) { 329 decompressed_headers_.clear(); 330 } else { 331 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 332 } 333 return bytes_processed; 334} 335 336void ReliableQuicStream::OnDecompressorAvailable() { 337 DCHECK_EQ(headers_id_, 338 session_->decompressor()->current_header_id()); 339 DCHECK(!headers_decompressed_); 340 DCHECK_EQ(0u, decompressed_headers_.length()); 341 342 size_t total_bytes_consumed = 0; 343 struct iovec iovecs[5]; 344 while (!headers_decompressed_) { 345 size_t num_iovecs = 346 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); 347 348 if (num_iovecs == 0) { 349 return; 350 } 351 for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) { 352 total_bytes_consumed += session_->decompressor()->DecompressData( 353 StringPiece(static_cast<char*>(iovecs[i].iov_base), 354 iovecs[i].iov_len), this); 355 356 headers_decompressed_ = 357 session_->decompressor()->current_header_id() != headers_id_; 358 } 359 } 360 361 // Either the headers are complete, or the all data as been consumed. 362 sequencer_.MarkConsumed(total_bytes_consumed); 363 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 364 if (IsHalfClosed()) { 365 TerminateFromPeer(true); 366 } else if (headers_decompressed_ && decompressed_headers_.empty()) { 367 sequencer_.FlushBufferedFrames(); 368 } 369} 370 371bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 372 data.AppendToString(&decompressed_headers_); 373 return true; 374} 375 376void ReliableQuicStream::OnDecompressionError() { 377 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); 378} 379 380 381void ReliableQuicStream::CloseWriteSide() { 382 if (write_side_closed_) { 383 return; 384 } 385 DLOG(INFO) << "Done writing to stream " << id(); 386 387 write_side_closed_ = true; 388 if (read_side_closed_) { 389 DLOG(INFO) << "Closing stream: " << id(); 390 session_->CloseStream(id()); 391 } 392} 393 394void ReliableQuicStream::OnClose() { 395 CloseReadSide(); 396 CloseWriteSide(); 397 398 if (visitor_) { 399 Visitor* visitor = visitor_; 400 // Calling Visitor::OnClose() may result the destruction of the visitor, 401 // so we need to ensure we don't call it again. 402 visitor_ = NULL; 403 visitor->OnClose(this); 404 } 405} 406 407} // namespace net 408