reliable_quic_stream.cc revision a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "net/quic/quic_session.h" 8#include "net/quic/quic_spdy_decompressor.h" 9#include "net/spdy/write_blocked_list.h" 10 11using base::StringPiece; 12using std::min; 13 14namespace net { 15 16namespace { 17 18// This is somewhat arbitrary. It's possible, but unlikely, we will either fail 19// to set a priority client-side, or cancel a stream before stripping the 20// priority from the wire server-side. In either case, start out with a 21// priority in the middle. 22QuicPriority kDefaultPriority = 3; 23 24// Appends bytes from data into partial_data_buffer. Once partial_data_buffer 25// reaches 4 bytes, copies the data into 'result' and clears 26// partial_data_buffer. 27// Returns the number of bytes consumed. 28uint32 StripUint32(const char* data, uint32 data_len, 29 string* partial_data_buffer, 30 uint32* result) { 31 DCHECK_GT(4u, partial_data_buffer->length()); 32 size_t missing_size = 4 - partial_data_buffer->length(); 33 if (data_len < missing_size) { 34 StringPiece(data, data_len).AppendToString(partial_data_buffer); 35 return data_len; 36 } 37 StringPiece(data, missing_size).AppendToString(partial_data_buffer); 38 DCHECK_EQ(4u, partial_data_buffer->length()); 39 memcpy(result, partial_data_buffer->data(), 4); 40 partial_data_buffer->clear(); 41 return missing_size; 42} 43 44} // namespace 45 46ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 47 QuicSession* session) 48 : sequencer_(this), 49 id_(id), 50 session_(session), 51 visitor_(NULL), 52 stream_bytes_read_(0), 53 stream_bytes_written_(0), 54 headers_decompressed_(false), 55 priority_(kDefaultPriority), 56 headers_id_(0), 57 decompression_failed_(false), 58 stream_error_(QUIC_STREAM_NO_ERROR), 59 connection_error_(QUIC_NO_ERROR), 60 read_side_closed_(false), 61 write_side_closed_(false), 62 priority_parsed_(false), 63 fin_buffered_(false), 64 fin_sent_(false), 65 is_server_(session_->is_server()) { 66} 67 68ReliableQuicStream::~ReliableQuicStream() { 69} 70 71bool ReliableQuicStream::WillAcceptStreamFrame( 72 const QuicStreamFrame& frame) const { 73 if (read_side_closed_) { 74 return true; 75 } 76 if (frame.stream_id != id_) { 77 LOG(ERROR) << "Error!"; 78 return false; 79 } 80 return sequencer_.WillAcceptStreamFrame(frame); 81} 82 83bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 84 DCHECK_EQ(frame.stream_id, id_); 85 if (read_side_closed_) { 86 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; 87 // We don't want to be reading: blackhole the data. 88 return true; 89 } 90 // Note: This count include duplicate data received. 91 stream_bytes_read_ += frame.data.TotalBufferSize(); 92 93 bool accepted = sequencer_.OnStreamFrame(frame); 94 95 return accepted; 96} 97 98void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 99 stream_error_ = error; 100 CloseWriteSide(); 101 CloseReadSide(); 102} 103 104void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, 105 bool from_peer) { 106 if (read_side_closed_ && write_side_closed_) { 107 return; 108 } 109 if (error != QUIC_NO_ERROR) { 110 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 111 connection_error_ = error; 112 } 113 114 CloseWriteSide(); 115 CloseReadSide(); 116} 117 118void ReliableQuicStream::OnFinRead() { 119 DCHECK(sequencer_.IsClosed()); 120 CloseReadSide(); 121} 122 123void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { 124 DCHECK_NE(QUIC_STREAM_NO_ERROR, error); 125 stream_error_ = error; 126 // Sending a RstStream results in calling CloseStream. 127 session()->SendRstStream(id(), error); 128} 129 130void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 131 session()->connection()->SendConnectionClose(error); 132} 133 134void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 135 const string& details) { 136 session()->connection()->SendConnectionCloseWithDetails(error, details); 137} 138 139size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { 140 if (headers_decompressed_ && decompressed_headers_.empty()) { 141 return sequencer_.Readv(iov, iov_len); 142 } 143 size_t bytes_consumed = 0; 144 size_t iov_index = 0; 145 while (iov_index < iov_len && 146 decompressed_headers_.length() > bytes_consumed) { 147 size_t bytes_to_read = min(iov[iov_index].iov_len, 148 decompressed_headers_.length() - bytes_consumed); 149 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 150 memcpy(iov_ptr, 151 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 152 bytes_consumed += bytes_to_read; 153 ++iov_index; 154 } 155 decompressed_headers_.erase(0, bytes_consumed); 156 return bytes_consumed; 157} 158 159int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { 160 if (headers_decompressed_ && decompressed_headers_.empty()) { 161 return sequencer_.GetReadableRegions(iov, iov_len); 162 } 163 if (iov_len == 0) { 164 return 0; 165 } 166 iov[0].iov_base = static_cast<void*>( 167 const_cast<char*>(decompressed_headers_.data())); 168 iov[0].iov_len = decompressed_headers_.length(); 169 return 1; 170} 171 172bool ReliableQuicStream::IsDoneReading() const { 173 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 174 return false; 175 } 176 return sequencer_.IsClosed(); 177} 178 179bool ReliableQuicStream::HasBytesToRead() const { 180 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); 181} 182 183const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 184 return session_->peer_address(); 185} 186 187QuicSpdyCompressor* ReliableQuicStream::compressor() { 188 return session_->compressor(); 189} 190 191bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) { 192 return session_->GetSSLInfo(ssl_info); 193} 194 195QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { 196 DCHECK(data.size() > 0 || fin); 197 return WriteOrBuffer(data, fin); 198} 199 200 201void ReliableQuicStream::set_priority(QuicPriority priority) { 202 DCHECK_EQ(0u, stream_bytes_written_); 203 priority_ = priority; 204} 205 206QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { 207 DCHECK(!fin_buffered_); 208 209 QuicConsumedData consumed_data(0, false); 210 fin_buffered_ = fin; 211 212 if (queued_data_.empty()) { 213 consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); 214 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 215 } 216 217 // If there's unconsumed data or an unconsumed fin, queue it. 218 if (consumed_data.bytes_consumed < data.length() || 219 (fin && !consumed_data.fin_consumed)) { 220 queued_data_.push_back( 221 string(data.data() + consumed_data.bytes_consumed, 222 data.length() - consumed_data.bytes_consumed)); 223 } 224 225 return QuicConsumedData(data.size(), true); 226} 227 228void ReliableQuicStream::OnCanWrite() { 229 bool fin = false; 230 while (!queued_data_.empty()) { 231 const string& data = queued_data_.front(); 232 if (queued_data_.size() == 1 && fin_buffered_) { 233 fin = true; 234 } 235 QuicConsumedData consumed_data = WriteDataInternal(data, fin); 236 if (consumed_data.bytes_consumed == data.size() && 237 fin == consumed_data.fin_consumed) { 238 queued_data_.pop_front(); 239 } else { 240 queued_data_.front().erase(0, consumed_data.bytes_consumed); 241 break; 242 } 243 } 244} 245 246QuicConsumedData ReliableQuicStream::WriteDataInternal( 247 StringPiece data, bool fin) { 248 struct iovec iov = {const_cast<char*>(data.data()), 249 static_cast<size_t>(data.size())}; 250 return WritevDataInternal(&iov, 1, fin, NULL); 251} 252 253QuicConsumedData ReliableQuicStream::WritevDataInternal( 254 const struct iovec* iov, 255 int iov_count, 256 bool fin, 257 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 258 if (write_side_closed_) { 259 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 260 return QuicConsumedData(0, false); 261 } 262 263 size_t write_length = 0u; 264 for (int i = 0; i < iov_count; ++i) { 265 write_length += iov[i].iov_len; 266 } 267 QuicConsumedData consumed_data = session()->WritevData( 268 id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate); 269 stream_bytes_written_ += consumed_data.bytes_consumed; 270 if (consumed_data.bytes_consumed == write_length) { 271 if (fin && consumed_data.fin_consumed) { 272 fin_sent_ = true; 273 CloseWriteSide(); 274 } else if (fin && !consumed_data.fin_consumed) { 275 session_->MarkWriteBlocked(id(), EffectivePriority()); 276 } 277 } else { 278 session_->MarkWriteBlocked(id(), EffectivePriority()); 279 } 280 return consumed_data; 281} 282 283QuicPriority ReliableQuicStream::EffectivePriority() const { 284 return priority(); 285} 286 287void ReliableQuicStream::CloseReadSide() { 288 if (read_side_closed_) { 289 return; 290 } 291 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); 292 293 read_side_closed_ = true; 294 if (write_side_closed_) { 295 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 296 session_->CloseStream(id()); 297 } 298} 299 300uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { 301 DCHECK_NE(0u, data_len); 302 if (id() == kCryptoStreamId) { 303 // The crypto stream does not use compression. 304 return ProcessData(data, data_len); 305 } 306 307 uint32 total_bytes_consumed = 0; 308 if (headers_id_ == 0u) { 309 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); 310 data += total_bytes_consumed; 311 data_len -= total_bytes_consumed; 312 if (data_len == 0 || total_bytes_consumed == 0) { 313 return total_bytes_consumed; 314 } 315 } 316 DCHECK_NE(0u, headers_id_); 317 318 // Once the headers are finished, we simply pass the data through. 319 if (headers_decompressed_) { 320 // Some buffered header data remains. 321 if (!decompressed_headers_.empty()) { 322 ProcessHeaderData(); 323 } 324 if (decompressed_headers_.empty()) { 325 DVLOG(1) << "Delegating procesing to ProcessData"; 326 total_bytes_consumed += ProcessData(data, data_len); 327 } 328 return total_bytes_consumed; 329 } 330 331 QuicHeaderId current_header_id = 332 session_->decompressor()->current_header_id(); 333 // Ensure that this header id looks sane. 334 if (headers_id_ < current_header_id || 335 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 336 DVLOG(1) << ENDPOINT 337 << "Invalid headers for stream: " << id() 338 << " header_id: " << headers_id_ 339 << " current_header_id: " << current_header_id; 340 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 341 return total_bytes_consumed; 342 } 343 344 // If we are head-of-line blocked on decompression, then back up. 345 if (current_header_id != headers_id_) { 346 session_->MarkDecompressionBlocked(headers_id_, id()); 347 DVLOG(1) << ENDPOINT 348 << "Unable to decompress header data for stream: " << id() 349 << " header_id: " << headers_id_; 350 return total_bytes_consumed; 351 } 352 353 // Decompressed data will be delivered to decompressed_headers_. 354 size_t bytes_consumed = session_->decompressor()->DecompressData( 355 StringPiece(data, data_len), this); 356 DCHECK_NE(0u, bytes_consumed); 357 if (bytes_consumed > data_len) { 358 DCHECK(false) << "DecompressData returned illegal value"; 359 OnDecompressionError(); 360 return total_bytes_consumed; 361 } 362 total_bytes_consumed += bytes_consumed; 363 data += bytes_consumed; 364 data_len -= bytes_consumed; 365 366 if (decompression_failed_) { 367 // The session will have been closed in OnDecompressionError. 368 return total_bytes_consumed; 369 } 370 371 // Headers are complete if the decompressor has moved on to the 372 // next stream. 373 headers_decompressed_ = 374 session_->decompressor()->current_header_id() != headers_id_; 375 if (!headers_decompressed_) { 376 DCHECK_EQ(0u, data_len); 377 } 378 379 ProcessHeaderData(); 380 381 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 382 return total_bytes_consumed; 383 } 384 385 // We have processed all of the decompressed data but we might 386 // have some more raw data to process. 387 if (data_len > 0) { 388 total_bytes_consumed += ProcessData(data, data_len); 389 } 390 391 // The sequencer will push any additional buffered frames if this data 392 // has been completely consumed. 393 return total_bytes_consumed; 394} 395 396uint32 ReliableQuicStream::ProcessHeaderData() { 397 if (decompressed_headers_.empty()) { 398 return 0; 399 } 400 401 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 402 decompressed_headers_.length()); 403 if (bytes_processed == decompressed_headers_.length()) { 404 decompressed_headers_.clear(); 405 } else { 406 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 407 } 408 return bytes_processed; 409} 410 411void ReliableQuicStream::OnDecompressorAvailable() { 412 DCHECK_EQ(headers_id_, 413 session_->decompressor()->current_header_id()); 414 DCHECK(!headers_decompressed_); 415 DCHECK(!decompression_failed_); 416 DCHECK_EQ(0u, decompressed_headers_.length()); 417 418 while (!headers_decompressed_) { 419 struct iovec iovec; 420 if (sequencer_.GetReadableRegions(&iovec, 1) == 0) { 421 return; 422 } 423 424 size_t bytes_consumed = session_->decompressor()->DecompressData( 425 StringPiece(static_cast<char*>(iovec.iov_base), 426 iovec.iov_len), 427 this); 428 DCHECK_LE(bytes_consumed, iovec.iov_len); 429 if (decompression_failed_) { 430 return; 431 } 432 sequencer_.MarkConsumed(bytes_consumed); 433 434 headers_decompressed_ = 435 session_->decompressor()->current_header_id() != headers_id_; 436 } 437 438 // Either the headers are complete, or the all data as been consumed. 439 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 440 if (IsDoneReading()) { 441 OnFinRead(); 442 } else if (headers_decompressed_ && decompressed_headers_.empty()) { 443 sequencer_.FlushBufferedFrames(); 444 } 445} 446 447bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 448 data.AppendToString(&decompressed_headers_); 449 return true; 450} 451 452void ReliableQuicStream::OnDecompressionError() { 453 DCHECK(!decompression_failed_); 454 decompression_failed_ = true; 455 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); 456} 457 458 459void ReliableQuicStream::CloseWriteSide() { 460 if (write_side_closed_) { 461 return; 462 } 463 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); 464 465 write_side_closed_ = true; 466 if (read_side_closed_) { 467 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 468 session_->CloseStream(id()); 469 } 470} 471 472bool ReliableQuicStream::HasBufferedData() { 473 return !queued_data_.empty(); 474} 475 476void ReliableQuicStream::OnClose() { 477 CloseReadSide(); 478 CloseWriteSide(); 479 480 if (visitor_) { 481 Visitor* visitor = visitor_; 482 // Calling Visitor::OnClose() may result the destruction of the visitor, 483 // so we need to ensure we don't call it again. 484 visitor_ = NULL; 485 visitor->OnClose(this); 486 } 487} 488 489uint32 ReliableQuicStream::StripPriorityAndHeaderId( 490 const char* data, uint32 data_len) { 491 uint32 total_bytes_parsed = 0; 492 493 if (!priority_parsed_ && session_->connection()->is_server()) { 494 QuicPriority temporary_priority = priority_; 495 total_bytes_parsed = StripUint32( 496 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); 497 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.empty()) { 498 priority_parsed_ = true; 499 500 // Spdy priorities are inverted, so the highest numerical value is the 501 // lowest legal priority. 502 if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) { 503 session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); 504 return 0; 505 } 506 priority_ = temporary_priority; 507 } 508 data += total_bytes_parsed; 509 data_len -= total_bytes_parsed; 510 } 511 if (data_len > 0 && headers_id_ == 0u) { 512 // The headers ID has not yet been read. Strip it from the beginning of 513 // the data stream. 514 total_bytes_parsed += StripUint32( 515 data, data_len, &headers_id_and_priority_buffer_, &headers_id_); 516 } 517 return total_bytes_parsed; 518} 519 520} // namespace net 521