reliable_quic_stream.cc revision 0f1bc08d4cfcc34181b0b5cbf065c40f687bf740
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "net/quic/quic_session.h" 8#include "net/quic/quic_spdy_decompressor.h" 9#include "net/spdy/write_blocked_list.h" 10 11using base::StringPiece; 12using std::min; 13 14namespace net { 15 16namespace { 17 18// This is somewhat arbitrary. It's possible, but unlikely, we will either fail 19// to set a priority client-side, or cancel a stream before stripping the 20// priority from the wire server-side. In either case, start out with a 21// priority in the middle. 22QuicPriority kDefaultPriority = 3; 23 24// Appends bytes from data into partial_data_buffer. Once partial_data_buffer 25// reaches 4 bytes, copies the data into 'result' and clears 26// partial_data_buffer. 27// Returns the number of bytes consumed. 28uint32 StripUint32(const char* data, uint32 data_len, 29 string* partial_data_buffer, 30 uint32* result) { 31 DCHECK_GT(4u, partial_data_buffer->length()); 32 size_t missing_size = 4 - partial_data_buffer->length(); 33 if (data_len < missing_size) { 34 StringPiece(data, data_len).AppendToString(partial_data_buffer); 35 return data_len; 36 } 37 StringPiece(data, missing_size).AppendToString(partial_data_buffer); 38 DCHECK_EQ(4u, partial_data_buffer->length()); 39 memcpy(result, partial_data_buffer->data(), 4); 40 partial_data_buffer->clear(); 41 return missing_size; 42} 43 44} // namespace 45 46ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 47 QuicSession* session) 48 : sequencer_(this), 49 id_(id), 50 session_(session), 51 visitor_(NULL), 52 stream_bytes_read_(0), 53 stream_bytes_written_(0), 54 headers_decompressed_(false), 55 priority_(kDefaultPriority), 56 headers_id_(0), 57 decompression_failed_(false), 58 stream_error_(QUIC_STREAM_NO_ERROR), 59 connection_error_(QUIC_NO_ERROR), 60 read_side_closed_(false), 61 write_side_closed_(false), 62 priority_parsed_(false), 63 fin_buffered_(false), 64 fin_sent_(false), 65 is_server_(session_->is_server()) { 66} 67 68ReliableQuicStream::~ReliableQuicStream() { 69} 70 71bool ReliableQuicStream::WillAcceptStreamFrame( 72 const QuicStreamFrame& frame) const { 73 if (read_side_closed_) { 74 return true; 75 } 76 if (frame.stream_id != id_) { 77 LOG(ERROR) << "Error!"; 78 return false; 79 } 80 return sequencer_.WillAcceptStreamFrame(frame); 81} 82 83bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 84 DCHECK_EQ(frame.stream_id, id_); 85 if (read_side_closed_) { 86 DLOG(INFO) << ENDPOINT << "Ignoring frame " << frame.stream_id; 87 // We don't want to be reading: blackhole the data. 88 return true; 89 } 90 // Note: This count include duplicate data received. 91 stream_bytes_read_ += frame.data.length(); 92 93 bool accepted = sequencer_.OnStreamFrame(frame); 94 95 return accepted; 96} 97 98void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 99 stream_error_ = error; 100 TerminateFromPeer(false); // Full close. 101} 102 103void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, 104 bool from_peer) { 105 if (read_side_closed_ && write_side_closed_) { 106 return; 107 } 108 if (error != QUIC_NO_ERROR) { 109 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 110 connection_error_ = error; 111 } 112 113 if (from_peer) { 114 TerminateFromPeer(false); 115 } else { 116 CloseWriteSide(); 117 CloseReadSide(); 118 } 119} 120 121void ReliableQuicStream::TerminateFromPeer(bool half_close) { 122 if (!half_close) { 123 CloseWriteSide(); 124 } 125 CloseReadSide(); 126} 127 128void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 129 stream_error_ = error; 130 if (error != QUIC_STREAM_NO_ERROR) { 131 // Sending a RstStream results in calling CloseStream. 132 session()->SendRstStream(id(), error); 133 } else { 134 session_->CloseStream(id()); 135 } 136} 137 138void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 139 session()->connection()->SendConnectionClose(error); 140} 141 142void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 143 const string& details) { 144 session()->connection()->SendConnectionCloseWithDetails(error, details); 145} 146 147size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { 148 if (headers_decompressed_ && decompressed_headers_.empty()) { 149 return sequencer_.Readv(iov, iov_len); 150 } 151 size_t bytes_consumed = 0; 152 size_t iov_index = 0; 153 while (iov_index < iov_len && 154 decompressed_headers_.length() > bytes_consumed) { 155 size_t bytes_to_read = min(iov[iov_index].iov_len, 156 decompressed_headers_.length() - bytes_consumed); 157 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 158 memcpy(iov_ptr, 159 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 160 bytes_consumed += bytes_to_read; 161 ++iov_index; 162 } 163 decompressed_headers_.erase(0, bytes_consumed); 164 return bytes_consumed; 165} 166 167int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { 168 if (headers_decompressed_ && decompressed_headers_.empty()) { 169 return sequencer_.GetReadableRegions(iov, iov_len); 170 } 171 if (iov_len == 0) { 172 return 0; 173 } 174 iov[0].iov_base = static_cast<void*>( 175 const_cast<char*>(decompressed_headers_.data())); 176 iov[0].iov_len = decompressed_headers_.length(); 177 return 1; 178} 179 180bool ReliableQuicStream::IsHalfClosed() const { 181 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 182 return false; 183 } 184 return sequencer_.IsHalfClosed(); 185} 186 187bool ReliableQuicStream::HasBytesToRead() const { 188 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); 189} 190 191const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 192 return session_->peer_address(); 193} 194 195QuicSpdyCompressor* ReliableQuicStream::compressor() { 196 return session_->compressor(); 197} 198 199bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) { 200 return session_->GetSSLInfo(ssl_info); 201} 202 203QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { 204 DCHECK(data.size() > 0 || fin); 205 return WriteOrBuffer(data, fin); 206} 207 208 209void ReliableQuicStream::set_priority(QuicPriority priority) { 210 DCHECK_EQ(0u, stream_bytes_written_); 211 priority_ = priority; 212} 213 214QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { 215 DCHECK(!fin_buffered_); 216 217 QuicConsumedData consumed_data(0, false); 218 fin_buffered_ = fin; 219 220 if (queued_data_.empty()) { 221 consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); 222 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 223 } 224 225 // If there's unconsumed data or an unconsumed fin, queue it. 226 if (consumed_data.bytes_consumed < data.length() || 227 (fin && !consumed_data.fin_consumed)) { 228 queued_data_.push_back( 229 string(data.data() + consumed_data.bytes_consumed, 230 data.length() - consumed_data.bytes_consumed)); 231 } 232 233 return QuicConsumedData(data.size(), true); 234} 235 236void ReliableQuicStream::OnCanWrite() { 237 bool fin = false; 238 while (!queued_data_.empty()) { 239 const string& data = queued_data_.front(); 240 if (queued_data_.size() == 1 && fin_buffered_) { 241 fin = true; 242 } 243 QuicConsumedData consumed_data = WriteDataInternal(data, fin); 244 if (consumed_data.bytes_consumed == data.size() && 245 fin == consumed_data.fin_consumed) { 246 queued_data_.pop_front(); 247 } else { 248 queued_data_.front().erase(0, consumed_data.bytes_consumed); 249 break; 250 } 251 } 252} 253 254QuicConsumedData ReliableQuicStream::WriteDataInternal( 255 StringPiece data, bool fin) { 256 struct iovec iov = {const_cast<char*>(data.data()), 257 static_cast<size_t>(data.size())}; 258 return WritevDataInternal(&iov, 1, fin); 259} 260 261QuicConsumedData ReliableQuicStream::WritevDataInternal(const struct iovec* iov, 262 int iov_count, 263 bool fin) { 264 if (write_side_closed_) { 265 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 266 return QuicConsumedData(0, false); 267 } 268 269 size_t write_length = 0u; 270 for (int i = 0; i < iov_count; ++i) { 271 write_length += iov[i].iov_len; 272 } 273 QuicConsumedData consumed_data = 274 session()->WritevData(id(), iov, iov_count, stream_bytes_written_, fin); 275 stream_bytes_written_ += consumed_data.bytes_consumed; 276 if (consumed_data.bytes_consumed == write_length) { 277 if (fin && consumed_data.fin_consumed) { 278 fin_sent_ = true; 279 CloseWriteSide(); 280 } else if (fin && !consumed_data.fin_consumed) { 281 session_->MarkWriteBlocked(id(), EffectivePriority()); 282 } 283 } else { 284 session_->MarkWriteBlocked(id(), EffectivePriority()); 285 } 286 return consumed_data; 287} 288 289QuicPriority ReliableQuicStream::EffectivePriority() const { 290 return priority(); 291} 292 293void ReliableQuicStream::CloseReadSide() { 294 if (read_side_closed_) { 295 return; 296 } 297 DLOG(INFO) << ENDPOINT << "Done reading from stream " << id(); 298 299 read_side_closed_ = true; 300 if (write_side_closed_) { 301 DLOG(INFO) << ENDPOINT << "Closing stream: " << id(); 302 session_->CloseStream(id()); 303 } 304} 305 306uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { 307 DCHECK_NE(0u, data_len); 308 if (id() == kCryptoStreamId) { 309 // The crypto stream does not use compression. 310 return ProcessData(data, data_len); 311 } 312 313 uint32 total_bytes_consumed = 0; 314 if (headers_id_ == 0u) { 315 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); 316 data += total_bytes_consumed; 317 data_len -= total_bytes_consumed; 318 if (data_len == 0 || !session_->connection()->connected()) { 319 return total_bytes_consumed; 320 } 321 } 322 DCHECK_NE(0u, headers_id_); 323 324 // Once the headers are finished, we simply pass the data through. 325 if (headers_decompressed_) { 326 // Some buffered header data remains. 327 if (!decompressed_headers_.empty()) { 328 ProcessHeaderData(); 329 } 330 if (decompressed_headers_.empty()) { 331 DVLOG(1) << "Delegating procesing to ProcessData"; 332 total_bytes_consumed += ProcessData(data, data_len); 333 } 334 return total_bytes_consumed; 335 } 336 337 QuicHeaderId current_header_id = 338 session_->decompressor()->current_header_id(); 339 // Ensure that this header id looks sane. 340 if (headers_id_ < current_header_id || 341 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 342 DVLOG(1) << ENDPOINT 343 << "Invalid headers for stream: " << id() 344 << " header_id: " << headers_id_ 345 << " current_header_id: " << current_header_id; 346 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 347 return total_bytes_consumed; 348 } 349 350 // If we are head-of-line blocked on decompression, then back up. 351 if (current_header_id != headers_id_) { 352 session_->MarkDecompressionBlocked(headers_id_, id()); 353 DVLOG(1) << ENDPOINT 354 << "Unable to decompress header data for stream: " << id() 355 << " header_id: " << headers_id_; 356 return total_bytes_consumed; 357 } 358 359 // Decompressed data will be delivered to decompressed_headers_. 360 size_t bytes_consumed = session_->decompressor()->DecompressData( 361 StringPiece(data, data_len), this); 362 DCHECK_NE(0u, bytes_consumed); 363 if (bytes_consumed > data_len) { 364 DCHECK(false) << "DecompressData returned illegal value"; 365 OnDecompressionError(); 366 return total_bytes_consumed; 367 } 368 total_bytes_consumed += bytes_consumed; 369 data += bytes_consumed; 370 data_len -= bytes_consumed; 371 372 if (decompression_failed_) { 373 // The session will have been closed in OnDecompressionError. 374 return total_bytes_consumed; 375 } 376 377 // Headers are complete if the decompressor has moved on to the 378 // next stream. 379 headers_decompressed_ = 380 session_->decompressor()->current_header_id() != headers_id_; 381 if (!headers_decompressed_) { 382 DCHECK_EQ(0u, data_len); 383 } 384 385 ProcessHeaderData(); 386 387 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 388 return total_bytes_consumed; 389 } 390 391 // We have processed all of the decompressed data but we might 392 // have some more raw data to process. 393 if (data_len > 0) { 394 total_bytes_consumed += ProcessData(data, data_len); 395 } 396 397 // The sequencer will push any additional buffered frames if this data 398 // has been completely consumed. 399 return total_bytes_consumed; 400} 401 402uint32 ReliableQuicStream::ProcessHeaderData() { 403 if (decompressed_headers_.empty()) { 404 return 0; 405 } 406 407 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 408 decompressed_headers_.length()); 409 if (bytes_processed == decompressed_headers_.length()) { 410 decompressed_headers_.clear(); 411 } else { 412 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 413 } 414 return bytes_processed; 415} 416 417void ReliableQuicStream::OnDecompressorAvailable() { 418 DCHECK_EQ(headers_id_, 419 session_->decompressor()->current_header_id()); 420 DCHECK(!headers_decompressed_); 421 DCHECK(!decompression_failed_); 422 DCHECK_EQ(0u, decompressed_headers_.length()); 423 424 while (!headers_decompressed_) { 425 struct iovec iovec; 426 if (sequencer_.GetReadableRegions(&iovec, 1) == 0) { 427 return; 428 } 429 430 size_t bytes_consumed = session_->decompressor()->DecompressData( 431 StringPiece(static_cast<char*>(iovec.iov_base), 432 iovec.iov_len), 433 this); 434 DCHECK_LE(bytes_consumed, iovec.iov_len); 435 if (decompression_failed_) { 436 return; 437 } 438 sequencer_.MarkConsumed(bytes_consumed); 439 440 headers_decompressed_ = 441 session_->decompressor()->current_header_id() != headers_id_; 442 } 443 444 // Either the headers are complete, or the all data as been consumed. 445 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 446 if (IsHalfClosed()) { 447 TerminateFromPeer(true); 448 } else if (headers_decompressed_ && decompressed_headers_.empty()) { 449 sequencer_.FlushBufferedFrames(); 450 } 451} 452 453bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 454 data.AppendToString(&decompressed_headers_); 455 return true; 456} 457 458void ReliableQuicStream::OnDecompressionError() { 459 DCHECK(!decompression_failed_); 460 decompression_failed_ = true; 461 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); 462} 463 464 465void ReliableQuicStream::CloseWriteSide() { 466 if (write_side_closed_) { 467 return; 468 } 469 DLOG(INFO) << ENDPOINT << "Done writing to stream " << id(); 470 471 write_side_closed_ = true; 472 if (read_side_closed_) { 473 DLOG(INFO) << ENDPOINT << "Closing stream: " << id(); 474 session_->CloseStream(id()); 475 } 476} 477 478bool ReliableQuicStream::HasBufferedData() { 479 return !queued_data_.empty(); 480} 481 482void ReliableQuicStream::OnClose() { 483 CloseReadSide(); 484 CloseWriteSide(); 485 486 if (visitor_) { 487 Visitor* visitor = visitor_; 488 // Calling Visitor::OnClose() may result the destruction of the visitor, 489 // so we need to ensure we don't call it again. 490 visitor_ = NULL; 491 visitor->OnClose(this); 492 } 493} 494 495uint32 ReliableQuicStream::StripPriorityAndHeaderId( 496 const char* data, uint32 data_len) { 497 uint32 total_bytes_parsed = 0; 498 499 if (!priority_parsed_ && session_->connection()->is_server()) { 500 QuicPriority temporary_priority = priority_; 501 total_bytes_parsed = StripUint32( 502 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); 503 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) { 504 priority_parsed_ = true; 505 506 // Spdy priorities are inverted, so the highest numerical value is the 507 // lowest legal priority. 508 if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) { 509 session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); 510 return 0; 511 } 512 priority_ = temporary_priority; 513 } 514 data += total_bytes_parsed; 515 data_len -= total_bytes_parsed; 516 } 517 if (data_len > 0 && headers_id_ == 0u) { 518 // The headers ID has not yet been read. Strip it from the beginning of 519 // the data stream. 520 total_bytes_parsed += StripUint32( 521 data, data_len, &headers_id_and_priority_buffer_, &headers_id_); 522 } 523 return total_bytes_parsed; 524} 525 526} // namespace net 527