reliable_quic_stream.cc revision 58537e28ecd584eab876aee8be7156509866d23a
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "net/quic/quic_session.h" 8#include "net/quic/quic_spdy_decompressor.h" 9 10using base::StringPiece; 11using std::min; 12 13namespace net { 14 15namespace { 16 17// This is somewhat arbitrary. It's possible, but unlikely, we will either fail 18// to set a priority client-side, or cancel a stream before stripping the 19// priority from the wire server-side. In either case, start out with a 20// priority in the middle. 21QuicPriority kDefaultPriority = 3; 22 23// Appends bytes from data into partial_data_buffer. Once partial_data_buffer 24// reaches 4 bytes, copies the data into 'result' and clears 25// partial_data_buffer. 26// Returns the number of bytes consumed. 27uint32 StripUint32(const char* data, uint32 data_len, 28 string* partial_data_buffer, 29 uint32* result) { 30 DCHECK_GT(4u, partial_data_buffer->length()); 31 size_t missing_size = 4 - partial_data_buffer->length(); 32 if (data_len < missing_size) { 33 StringPiece(data, data_len).AppendToString(partial_data_buffer); 34 return data_len; 35 } 36 StringPiece(data, missing_size).AppendToString(partial_data_buffer); 37 DCHECK_EQ(4u, partial_data_buffer->length()); 38 memcpy(result, partial_data_buffer->data(), 4); 39 partial_data_buffer->clear(); 40 return missing_size; 41} 42 43} // namespace 44 45ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 46 QuicSession* session) 47 : sequencer_(this), 48 id_(id), 49 session_(session), 50 visitor_(NULL), 51 stream_bytes_read_(0), 52 stream_bytes_written_(0), 53 headers_decompressed_(false), 54 priority_(kDefaultPriority), 55 headers_id_(0), 56 decompression_failed_(false), 57 stream_error_(QUIC_STREAM_NO_ERROR), 58 connection_error_(QUIC_NO_ERROR), 59 read_side_closed_(false), 60 write_side_closed_(false), 61 priority_parsed_(false), 62 fin_buffered_(false), 63 fin_sent_(false) { 64} 65 66ReliableQuicStream::~ReliableQuicStream() { 67} 68 69bool ReliableQuicStream::WillAcceptStreamFrame( 70 const QuicStreamFrame& frame) const { 71 if (read_side_closed_) { 72 return true; 73 } 74 if (frame.stream_id != id_) { 75 LOG(ERROR) << "Error!"; 76 return false; 77 } 78 return sequencer_.WillAcceptStreamFrame(frame); 79} 80 81bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 82 DCHECK_EQ(frame.stream_id, id_); 83 if (read_side_closed_) { 84 DLOG(INFO) << "Ignoring frame " << frame.stream_id; 85 // We don't want to be reading: blackhole the data. 86 return true; 87 } 88 // Note: This count include duplicate data received. 89 stream_bytes_read_ += frame.data.length(); 90 91 bool accepted = sequencer_.OnStreamFrame(frame); 92 93 return accepted; 94} 95 96void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 97 stream_error_ = error; 98 TerminateFromPeer(false); // Full close. 99} 100 101void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { 102 if (read_side_closed_ && write_side_closed_) { 103 return; 104 } 105 if (error != QUIC_NO_ERROR) { 106 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 107 connection_error_ = error; 108 } 109 110 if (from_peer) { 111 TerminateFromPeer(false); 112 } else { 113 CloseWriteSide(); 114 CloseReadSide(); 115 } 116} 117 118void ReliableQuicStream::TerminateFromPeer(bool half_close) { 119 if (!half_close) { 120 CloseWriteSide(); 121 } 122 CloseReadSide(); 123} 124 125void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 126 stream_error_ = error; 127 if (error != QUIC_STREAM_NO_ERROR) { 128 // Sending a RstStream results in calling CloseStream. 129 session()->SendRstStream(id(), error); 130 } else { 131 session_->CloseStream(id()); 132 } 133} 134 135size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { 136 if (headers_decompressed_ && decompressed_headers_.empty()) { 137 return sequencer_.Readv(iov, iov_len); 138 } 139 size_t bytes_consumed = 0; 140 size_t iov_index = 0; 141 while (iov_index < iov_len && 142 decompressed_headers_.length() > bytes_consumed) { 143 size_t bytes_to_read = min(iov[iov_index].iov_len, 144 decompressed_headers_.length() - bytes_consumed); 145 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 146 memcpy(iov_ptr, 147 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 148 bytes_consumed += bytes_to_read; 149 ++iov_index; 150 } 151 decompressed_headers_.erase(0, bytes_consumed); 152 return bytes_consumed; 153} 154 155int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { 156 if (headers_decompressed_ && decompressed_headers_.empty()) { 157 return sequencer_.GetReadableRegions(iov, iov_len); 158 } 159 if (iov_len == 0) { 160 return 0; 161 } 162 iov[0].iov_base = static_cast<void*>( 163 const_cast<char*>(decompressed_headers_.data())); 164 iov[0].iov_len = decompressed_headers_.length(); 165 return 1; 166} 167 168bool ReliableQuicStream::IsHalfClosed() const { 169 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 170 return false; 171 } 172 return sequencer_.IsHalfClosed(); 173} 174 175bool ReliableQuicStream::HasBytesToRead() const { 176 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); 177} 178 179const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 180 return session_->peer_address(); 181} 182 183QuicSpdyCompressor* ReliableQuicStream::compressor() { 184 return session_->compressor(); 185} 186 187bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) { 188 return session_->GetSSLInfo(ssl_info); 189} 190 191QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { 192 DCHECK(data.size() > 0 || fin); 193 return WriteOrBuffer(data, fin); 194} 195 196QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { 197 DCHECK(!fin_buffered_); 198 199 QuicConsumedData consumed_data(0, false); 200 fin_buffered_ = fin; 201 202 if (queued_data_.empty()) { 203 consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); 204 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 205 } 206 207 // If there's unconsumed data or an unconsumed fin, queue it. 208 if (consumed_data.bytes_consumed < data.length() || 209 (fin && !consumed_data.fin_consumed)) { 210 queued_data_.push_back( 211 string(data.data() + consumed_data.bytes_consumed, 212 data.length() - consumed_data.bytes_consumed)); 213 } 214 215 return QuicConsumedData(data.size(), true); 216} 217 218void ReliableQuicStream::OnCanWrite() { 219 bool fin = false; 220 while (!queued_data_.empty()) { 221 const string& data = queued_data_.front(); 222 if (queued_data_.size() == 1 && fin_buffered_) { 223 fin = true; 224 } 225 QuicConsumedData consumed_data = WriteDataInternal(data, fin); 226 if (consumed_data.bytes_consumed == data.size() && 227 fin == consumed_data.fin_consumed) { 228 queued_data_.pop_front(); 229 } else { 230 queued_data_.front().erase(0, consumed_data.bytes_consumed); 231 break; 232 } 233 } 234} 235 236QuicConsumedData ReliableQuicStream::WriteDataInternal( 237 StringPiece data, bool fin) { 238 if (write_side_closed_) { 239 DLOG(ERROR) << "Attempt to write when the write side is closed"; 240 return QuicConsumedData(0, false); 241 } 242 243 QuicConsumedData consumed_data = 244 session()->WriteData(id(), data, stream_bytes_written_, fin); 245 stream_bytes_written_ += consumed_data.bytes_consumed; 246 if (consumed_data.bytes_consumed == data.length()) { 247 if (fin && consumed_data.fin_consumed) { 248 fin_sent_ = true; 249 CloseWriteSide(); 250 } else if (fin && !consumed_data.fin_consumed) { 251 session_->MarkWriteBlocked(id()); 252 } 253 } else { 254 session_->MarkWriteBlocked(id()); 255 } 256 return consumed_data; 257} 258 259void ReliableQuicStream::CloseReadSide() { 260 if (read_side_closed_) { 261 return; 262 } 263 DLOG(INFO) << "Done reading from stream " << id(); 264 265 read_side_closed_ = true; 266 if (write_side_closed_) { 267 DLOG(INFO) << "Closing stream: " << id(); 268 session_->CloseStream(id()); 269 } 270} 271 272uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { 273 if (id() == kCryptoStreamId) { 274 if (data_len == 0) { 275 return 0; 276 } 277 // The crypto stream does not use compression. 278 return ProcessData(data, data_len); 279 } 280 281 uint32 total_bytes_consumed = 0; 282 if (headers_id_ == 0u) { 283 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); 284 data += total_bytes_consumed; 285 data_len -= total_bytes_consumed; 286 if (data_len == 0) { 287 return total_bytes_consumed; 288 } 289 } 290 DCHECK_NE(0u, headers_id_); 291 292 // Once the headers are finished, we simply pass the data through. 293 if (headers_decompressed_) { 294 // Some buffered header data remains. 295 if (!decompressed_headers_.empty()) { 296 ProcessHeaderData(); 297 } 298 if (decompressed_headers_.empty()) { 299 DVLOG(1) << "Delegating procesing to ProcessData"; 300 total_bytes_consumed += ProcessData(data, data_len); 301 } 302 return total_bytes_consumed; 303 } 304 305 QuicHeaderId current_header_id = 306 session_->decompressor()->current_header_id(); 307 // Ensure that this header id looks sane. 308 if (headers_id_ < current_header_id || 309 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 310 DVLOG(1) << "Invalid headers for stream: " << id() 311 << " header_id: " << headers_id_ 312 << " current_header_id: " << current_header_id; 313 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 314 return total_bytes_consumed; 315 } 316 317 // If we are head-of-line blocked on decompression, then back up. 318 if (current_header_id != headers_id_) { 319 session_->MarkDecompressionBlocked(headers_id_, id()); 320 DVLOG(1) << "Unable to decompress header data for stream: " << id() 321 << " header_id: " << headers_id_; 322 return total_bytes_consumed; 323 } 324 325 // Decompressed data will be delivered to decompressed_headers_. 326 size_t bytes_consumed = session_->decompressor()->DecompressData( 327 StringPiece(data, data_len), this); 328 DCHECK_NE(0u, bytes_consumed); 329 if (bytes_consumed > data_len) { 330 DCHECK(false) << "DecompressData returned illegal value"; 331 OnDecompressionError(); 332 return total_bytes_consumed; 333 } 334 total_bytes_consumed += bytes_consumed; 335 data += bytes_consumed; 336 data_len -= bytes_consumed; 337 338 if (decompression_failed_) { 339 // The session will have been closed in OnDecompressionError. 340 return total_bytes_consumed; 341 } 342 343 // Headers are complete if the decompressor has moved on to the 344 // next stream. 345 headers_decompressed_ = 346 session_->decompressor()->current_header_id() != headers_id_; 347 if (!headers_decompressed_) { 348 DCHECK_EQ(0u, data_len); 349 } 350 351 ProcessHeaderData(); 352 353 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 354 return total_bytes_consumed; 355 } 356 357 // We have processed all of the decompressed data but we might 358 // have some more raw data to process. 359 if (data_len > 0) { 360 total_bytes_consumed += ProcessData(data, data_len); 361 } 362 363 // The sequencer will push any additional buffered frames if this data 364 // has been completely consumed. 365 return total_bytes_consumed; 366} 367 368uint32 ReliableQuicStream::ProcessHeaderData() { 369 if (decompressed_headers_.empty()) { 370 return 0; 371 } 372 373 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 374 decompressed_headers_.length()); 375 if (bytes_processed == decompressed_headers_.length()) { 376 decompressed_headers_.clear(); 377 } else { 378 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 379 } 380 return bytes_processed; 381} 382 383void ReliableQuicStream::OnDecompressorAvailable() { 384 DCHECK_EQ(headers_id_, 385 session_->decompressor()->current_header_id()); 386 DCHECK(!headers_decompressed_); 387 DCHECK(!decompression_failed_); 388 DCHECK_EQ(0u, decompressed_headers_.length()); 389 390 while (!headers_decompressed_) { 391 struct iovec iovec; 392 if (sequencer_.GetReadableRegions(&iovec, 1) == 0) { 393 return; 394 } 395 396 size_t bytes_consumed = session_->decompressor()->DecompressData( 397 StringPiece(static_cast<char*>(iovec.iov_base), 398 iovec.iov_len), 399 this); 400 DCHECK_LE(bytes_consumed, iovec.iov_len); 401 if (decompression_failed_) { 402 return; 403 } 404 sequencer_.MarkConsumed(bytes_consumed); 405 406 headers_decompressed_ = 407 session_->decompressor()->current_header_id() != headers_id_; 408 } 409 410 // Either the headers are complete, or the all data as been consumed. 411 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 412 if (IsHalfClosed()) { 413 TerminateFromPeer(true); 414 } else if (headers_decompressed_ && decompressed_headers_.empty()) { 415 sequencer_.FlushBufferedFrames(); 416 } 417} 418 419bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 420 data.AppendToString(&decompressed_headers_); 421 return true; 422} 423 424void ReliableQuicStream::OnDecompressionError() { 425 DCHECK(!decompression_failed_); 426 decompression_failed_ = true; 427 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); 428} 429 430 431void ReliableQuicStream::CloseWriteSide() { 432 if (write_side_closed_) { 433 return; 434 } 435 DLOG(INFO) << "Done writing to stream " << id(); 436 437 write_side_closed_ = true; 438 if (read_side_closed_) { 439 DLOG(INFO) << "Closing stream: " << id(); 440 session_->CloseStream(id()); 441 } 442} 443 444bool ReliableQuicStream::HasBufferedData() { 445 return !queued_data_.empty(); 446} 447 448void ReliableQuicStream::OnClose() { 449 CloseReadSide(); 450 CloseWriteSide(); 451 452 if (visitor_) { 453 Visitor* visitor = visitor_; 454 // Calling Visitor::OnClose() may result the destruction of the visitor, 455 // so we need to ensure we don't call it again. 456 visitor_ = NULL; 457 visitor->OnClose(this); 458 } 459} 460 461uint32 ReliableQuicStream::StripPriorityAndHeaderId( 462 const char* data, uint32 data_len) { 463 uint32 total_bytes_parsed = 0; 464 465 if (!priority_parsed_ && 466 session_->connection()->version() >= QUIC_VERSION_9 && 467 session_->connection()->is_server()) { 468 total_bytes_parsed = StripUint32( 469 data, data_len, &headers_id_and_priority_buffer_, &priority_); 470 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) { 471 // TODO(alyssar) check for priority out of bounds. 472 priority_parsed_ = true; 473 } 474 data += total_bytes_parsed; 475 data_len -= total_bytes_parsed; 476 } 477 if (data_len > 0 && headers_id_ == 0u) { 478 // The headers ID has not yet been read. Strip it from the beginning of 479 // the data stream. 480 total_bytes_parsed += StripUint32( 481 data, data_len, &headers_id_and_priority_buffer_, &headers_id_); 482 } 483 return total_bytes_parsed; 484} 485 486} // namespace net 487