// reliable_quic_stream.cc revision 68043e1e95eeb07d5cae7aca370b26518b0867d6
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "net/quic/quic_session.h" 8#include "net/quic/quic_spdy_decompressor.h" 9#include "net/spdy/write_blocked_list.h" 10 11using base::StringPiece; 12using std::min; 13 14namespace net { 15 16namespace { 17 18// This is somewhat arbitrary. It's possible, but unlikely, we will either fail 19// to set a priority client-side, or cancel a stream before stripping the 20// priority from the wire server-side. In either case, start out with a 21// priority in the middle. 22QuicPriority kDefaultPriority = 3; 23 24// Appends bytes from data into partial_data_buffer. Once partial_data_buffer 25// reaches 4 bytes, copies the data into 'result' and clears 26// partial_data_buffer. 27// Returns the number of bytes consumed. 28uint32 StripUint32(const char* data, uint32 data_len, 29 string* partial_data_buffer, 30 uint32* result) { 31 DCHECK_GT(4u, partial_data_buffer->length()); 32 size_t missing_size = 4 - partial_data_buffer->length(); 33 if (data_len < missing_size) { 34 StringPiece(data, data_len).AppendToString(partial_data_buffer); 35 return data_len; 36 } 37 StringPiece(data, missing_size).AppendToString(partial_data_buffer); 38 DCHECK_EQ(4u, partial_data_buffer->length()); 39 memcpy(result, partial_data_buffer->data(), 4); 40 partial_data_buffer->clear(); 41 return missing_size; 42} 43 44} // namespace 45 46ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 47 QuicSession* session) 48 : sequencer_(this), 49 id_(id), 50 session_(session), 51 visitor_(NULL), 52 stream_bytes_read_(0), 53 stream_bytes_written_(0), 54 headers_decompressed_(false), 55 priority_(kDefaultPriority), 56 headers_id_(0), 57 decompression_failed_(false), 58 stream_error_(QUIC_STREAM_NO_ERROR), 59 connection_error_(QUIC_NO_ERROR), 60 read_side_closed_(false), 61 
write_side_closed_(false), 62 priority_parsed_(false), 63 fin_buffered_(false), 64 fin_sent_(false) { 65} 66 67ReliableQuicStream::~ReliableQuicStream() { 68} 69 70bool ReliableQuicStream::WillAcceptStreamFrame( 71 const QuicStreamFrame& frame) const { 72 if (read_side_closed_) { 73 return true; 74 } 75 if (frame.stream_id != id_) { 76 LOG(ERROR) << "Error!"; 77 return false; 78 } 79 return sequencer_.WillAcceptStreamFrame(frame); 80} 81 82bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 83 DCHECK_EQ(frame.stream_id, id_); 84 if (read_side_closed_) { 85 DLOG(INFO) << "Ignoring frame " << frame.stream_id; 86 // We don't want to be reading: blackhole the data. 87 return true; 88 } 89 // Note: This count include duplicate data received. 90 stream_bytes_read_ += frame.data.length(); 91 92 bool accepted = sequencer_.OnStreamFrame(frame); 93 94 return accepted; 95} 96 97void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 98 stream_error_ = error; 99 TerminateFromPeer(false); // Full close. 100} 101 102void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { 103 if (read_side_closed_ && write_side_closed_) { 104 return; 105 } 106 if (error != QUIC_NO_ERROR) { 107 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 108 connection_error_ = error; 109 } 110 111 if (from_peer) { 112 TerminateFromPeer(false); 113 } else { 114 CloseWriteSide(); 115 CloseReadSide(); 116 } 117} 118 119void ReliableQuicStream::TerminateFromPeer(bool half_close) { 120 if (!half_close) { 121 CloseWriteSide(); 122 } 123 CloseReadSide(); 124} 125 126void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 127 stream_error_ = error; 128 if (error != QUIC_STREAM_NO_ERROR) { 129 // Sending a RstStream results in calling CloseStream. 
130 session()->SendRstStream(id(), error); 131 } else { 132 session_->CloseStream(id()); 133 } 134} 135 136size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { 137 if (headers_decompressed_ && decompressed_headers_.empty()) { 138 return sequencer_.Readv(iov, iov_len); 139 } 140 size_t bytes_consumed = 0; 141 size_t iov_index = 0; 142 while (iov_index < iov_len && 143 decompressed_headers_.length() > bytes_consumed) { 144 size_t bytes_to_read = min(iov[iov_index].iov_len, 145 decompressed_headers_.length() - bytes_consumed); 146 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 147 memcpy(iov_ptr, 148 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 149 bytes_consumed += bytes_to_read; 150 ++iov_index; 151 } 152 decompressed_headers_.erase(0, bytes_consumed); 153 return bytes_consumed; 154} 155 156int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { 157 if (headers_decompressed_ && decompressed_headers_.empty()) { 158 return sequencer_.GetReadableRegions(iov, iov_len); 159 } 160 if (iov_len == 0) { 161 return 0; 162 } 163 iov[0].iov_base = static_cast<void*>( 164 const_cast<char*>(decompressed_headers_.data())); 165 iov[0].iov_len = decompressed_headers_.length(); 166 return 1; 167} 168 169bool ReliableQuicStream::IsHalfClosed() const { 170 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 171 return false; 172 } 173 return sequencer_.IsHalfClosed(); 174} 175 176bool ReliableQuicStream::HasBytesToRead() const { 177 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); 178} 179 180const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 181 return session_->peer_address(); 182} 183 184QuicSpdyCompressor* ReliableQuicStream::compressor() { 185 return session_->compressor(); 186} 187 188bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) { 189 return session_->GetSSLInfo(ssl_info); 190} 191 192QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool 
fin) { 193 DCHECK(data.size() > 0 || fin); 194 return WriteOrBuffer(data, fin); 195} 196 197 198void ReliableQuicStream::set_priority(QuicPriority priority) { 199 DCHECK_EQ(0u, stream_bytes_written_); 200 priority_ = priority; 201} 202 203QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { 204 DCHECK(!fin_buffered_); 205 206 QuicConsumedData consumed_data(0, false); 207 fin_buffered_ = fin; 208 209 if (queued_data_.empty()) { 210 consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); 211 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 212 } 213 214 // If there's unconsumed data or an unconsumed fin, queue it. 215 if (consumed_data.bytes_consumed < data.length() || 216 (fin && !consumed_data.fin_consumed)) { 217 queued_data_.push_back( 218 string(data.data() + consumed_data.bytes_consumed, 219 data.length() - consumed_data.bytes_consumed)); 220 } 221 222 return QuicConsumedData(data.size(), true); 223} 224 225void ReliableQuicStream::OnCanWrite() { 226 bool fin = false; 227 while (!queued_data_.empty()) { 228 const string& data = queued_data_.front(); 229 if (queued_data_.size() == 1 && fin_buffered_) { 230 fin = true; 231 } 232 QuicConsumedData consumed_data = WriteDataInternal(data, fin); 233 if (consumed_data.bytes_consumed == data.size() && 234 fin == consumed_data.fin_consumed) { 235 queued_data_.pop_front(); 236 } else { 237 queued_data_.front().erase(0, consumed_data.bytes_consumed); 238 break; 239 } 240 } 241} 242 243QuicConsumedData ReliableQuicStream::WriteDataInternal( 244 StringPiece data, bool fin) { 245 struct iovec iov = {const_cast<char*>(data.data()), 246 static_cast<size_t>(data.size())}; 247 return WritevDataInternal(&iov, 1, fin); 248} 249 250QuicConsumedData ReliableQuicStream::WritevDataInternal(const struct iovec* iov, 251 int iov_count, 252 bool fin) { 253 if (write_side_closed_) { 254 DLOG(ERROR) << "Attempt to write when the write side is closed"; 255 return QuicConsumedData(0, 
false); 256 } 257 258 size_t write_length = 0u; 259 for (int i = 0; i < iov_count; ++i) { 260 write_length += iov[i].iov_len; 261 } 262 QuicConsumedData consumed_data = 263 session()->WritevData(id(), iov, iov_count, stream_bytes_written_, fin); 264 stream_bytes_written_ += consumed_data.bytes_consumed; 265 if (consumed_data.bytes_consumed == write_length) { 266 if (fin && consumed_data.fin_consumed) { 267 fin_sent_ = true; 268 CloseWriteSide(); 269 } else if (fin && !consumed_data.fin_consumed) { 270 session_->MarkWriteBlocked(id(), EffectivePriority()); 271 } 272 } else { 273 session_->MarkWriteBlocked(id(), EffectivePriority()); 274 } 275 return consumed_data; 276} 277 278QuicPriority ReliableQuicStream::EffectivePriority() const { 279 return priority(); 280} 281 282void ReliableQuicStream::CloseReadSide() { 283 if (read_side_closed_) { 284 return; 285 } 286 DLOG(INFO) << "Done reading from stream " << id(); 287 288 read_side_closed_ = true; 289 if (write_side_closed_) { 290 DLOG(INFO) << "Closing stream: " << id(); 291 session_->CloseStream(id()); 292 } 293} 294 295uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { 296 DCHECK_NE(0u, data_len); 297 if (id() == kCryptoStreamId) { 298 // The crypto stream does not use compression. 299 return ProcessData(data, data_len); 300 } 301 302 uint32 total_bytes_consumed = 0; 303 if (headers_id_ == 0u) { 304 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); 305 data += total_bytes_consumed; 306 data_len -= total_bytes_consumed; 307 if (data_len == 0 || !session_->connection()->connected()) { 308 return total_bytes_consumed; 309 } 310 } 311 DCHECK_NE(0u, headers_id_); 312 313 // Once the headers are finished, we simply pass the data through. 314 if (headers_decompressed_) { 315 // Some buffered header data remains. 
316 if (!decompressed_headers_.empty()) { 317 ProcessHeaderData(); 318 } 319 if (decompressed_headers_.empty()) { 320 DVLOG(1) << "Delegating procesing to ProcessData"; 321 total_bytes_consumed += ProcessData(data, data_len); 322 } 323 return total_bytes_consumed; 324 } 325 326 QuicHeaderId current_header_id = 327 session_->decompressor()->current_header_id(); 328 // Ensure that this header id looks sane. 329 if (headers_id_ < current_header_id || 330 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 331 DVLOG(1) << "Invalid headers for stream: " << id() 332 << " header_id: " << headers_id_ 333 << " current_header_id: " << current_header_id; 334 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 335 return total_bytes_consumed; 336 } 337 338 // If we are head-of-line blocked on decompression, then back up. 339 if (current_header_id != headers_id_) { 340 session_->MarkDecompressionBlocked(headers_id_, id()); 341 DVLOG(1) << "Unable to decompress header data for stream: " << id() 342 << " header_id: " << headers_id_; 343 return total_bytes_consumed; 344 } 345 346 // Decompressed data will be delivered to decompressed_headers_. 347 size_t bytes_consumed = session_->decompressor()->DecompressData( 348 StringPiece(data, data_len), this); 349 DCHECK_NE(0u, bytes_consumed); 350 if (bytes_consumed > data_len) { 351 DCHECK(false) << "DecompressData returned illegal value"; 352 OnDecompressionError(); 353 return total_bytes_consumed; 354 } 355 total_bytes_consumed += bytes_consumed; 356 data += bytes_consumed; 357 data_len -= bytes_consumed; 358 359 if (decompression_failed_) { 360 // The session will have been closed in OnDecompressionError. 361 return total_bytes_consumed; 362 } 363 364 // Headers are complete if the decompressor has moved on to the 365 // next stream. 
366 headers_decompressed_ = 367 session_->decompressor()->current_header_id() != headers_id_; 368 if (!headers_decompressed_) { 369 DCHECK_EQ(0u, data_len); 370 } 371 372 ProcessHeaderData(); 373 374 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 375 return total_bytes_consumed; 376 } 377 378 // We have processed all of the decompressed data but we might 379 // have some more raw data to process. 380 if (data_len > 0) { 381 total_bytes_consumed += ProcessData(data, data_len); 382 } 383 384 // The sequencer will push any additional buffered frames if this data 385 // has been completely consumed. 386 return total_bytes_consumed; 387} 388 389uint32 ReliableQuicStream::ProcessHeaderData() { 390 if (decompressed_headers_.empty()) { 391 return 0; 392 } 393 394 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 395 decompressed_headers_.length()); 396 if (bytes_processed == decompressed_headers_.length()) { 397 decompressed_headers_.clear(); 398 } else { 399 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 400 } 401 return bytes_processed; 402} 403 404void ReliableQuicStream::OnDecompressorAvailable() { 405 DCHECK_EQ(headers_id_, 406 session_->decompressor()->current_header_id()); 407 DCHECK(!headers_decompressed_); 408 DCHECK(!decompression_failed_); 409 DCHECK_EQ(0u, decompressed_headers_.length()); 410 411 while (!headers_decompressed_) { 412 struct iovec iovec; 413 if (sequencer_.GetReadableRegions(&iovec, 1) == 0) { 414 return; 415 } 416 417 size_t bytes_consumed = session_->decompressor()->DecompressData( 418 StringPiece(static_cast<char*>(iovec.iov_base), 419 iovec.iov_len), 420 this); 421 DCHECK_LE(bytes_consumed, iovec.iov_len); 422 if (decompression_failed_) { 423 return; 424 } 425 sequencer_.MarkConsumed(bytes_consumed); 426 427 headers_decompressed_ = 428 session_->decompressor()->current_header_id() != headers_id_; 429 } 430 431 // Either the headers are complete, or the all data as been consumed. 
432 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 433 if (IsHalfClosed()) { 434 TerminateFromPeer(true); 435 } else if (headers_decompressed_ && decompressed_headers_.empty()) { 436 sequencer_.FlushBufferedFrames(); 437 } 438} 439 440bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 441 data.AppendToString(&decompressed_headers_); 442 return true; 443} 444 445void ReliableQuicStream::OnDecompressionError() { 446 DCHECK(!decompression_failed_); 447 decompression_failed_ = true; 448 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); 449} 450 451 452void ReliableQuicStream::CloseWriteSide() { 453 if (write_side_closed_) { 454 return; 455 } 456 DLOG(INFO) << "Done writing to stream " << id(); 457 458 write_side_closed_ = true; 459 if (read_side_closed_) { 460 DLOG(INFO) << "Closing stream: " << id(); 461 session_->CloseStream(id()); 462 } 463} 464 465bool ReliableQuicStream::HasBufferedData() { 466 return !queued_data_.empty(); 467} 468 469void ReliableQuicStream::OnClose() { 470 CloseReadSide(); 471 CloseWriteSide(); 472 473 if (visitor_) { 474 Visitor* visitor = visitor_; 475 // Calling Visitor::OnClose() may result the destruction of the visitor, 476 // so we need to ensure we don't call it again. 477 visitor_ = NULL; 478 visitor->OnClose(this); 479 } 480} 481 482uint32 ReliableQuicStream::StripPriorityAndHeaderId( 483 const char* data, uint32 data_len) { 484 uint32 total_bytes_parsed = 0; 485 486 if (!priority_parsed_ && session_->connection()->is_server()) { 487 QuicPriority temporary_priority = priority_; 488 total_bytes_parsed = StripUint32( 489 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); 490 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) { 491 priority_parsed_ = true; 492 // Spdy priorities are inverted, so the highest numerical value is the 493 // lowest legal priority. 
494 if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) { 495 session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); 496 return 0; 497 } 498 priority_ = temporary_priority; 499 } 500 data += total_bytes_parsed; 501 data_len -= total_bytes_parsed; 502 } 503 if (data_len > 0 && headers_id_ == 0u) { 504 // The headers ID has not yet been read. Strip it from the beginning of 505 // the data stream. 506 total_bytes_parsed += StripUint32( 507 data, data_len, &headers_id_and_priority_buffer_, &headers_id_); 508 } 509 return total_bytes_parsed; 510} 511 512} // namespace net 513