reliable_quic_stream.cc revision b2df76ea8fec9e32f6f3718986dba0d95315b29c
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/reliable_quic_stream.h" 6 7#include "net/quic/quic_session.h" 8#include "net/quic/quic_spdy_decompressor.h" 9 10using base::StringPiece; 11using std::min; 12 13namespace net { 14 15ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 16 QuicSession* session) 17 : sequencer_(this), 18 id_(id), 19 session_(session), 20 visitor_(NULL), 21 stream_bytes_read_(0), 22 stream_bytes_written_(0), 23 headers_decompressed_(false), 24 headers_id_(0), 25 stream_error_(QUIC_STREAM_NO_ERROR), 26 connection_error_(QUIC_NO_ERROR), 27 read_side_closed_(false), 28 write_side_closed_(false), 29 fin_buffered_(false), 30 fin_sent_(false) { 31} 32 33ReliableQuicStream::~ReliableQuicStream() { 34} 35 36bool ReliableQuicStream::WillAcceptStreamFrame( 37 const QuicStreamFrame& frame) const { 38 if (read_side_closed_) { 39 return true; 40 } 41 if (frame.stream_id != id_) { 42 LOG(ERROR) << "Error!"; 43 return false; 44 } 45 return sequencer_.WillAcceptStreamFrame(frame); 46} 47 48bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 49 DCHECK_EQ(frame.stream_id, id_); 50 if (read_side_closed_) { 51 DLOG(INFO) << "Ignoring frame " << frame.stream_id; 52 // We don't want to be reading: blackhole the data. 53 return true; 54 } 55 // Note: This count include duplicate data received. 56 stream_bytes_read_ += frame.data.length(); 57 58 bool accepted = sequencer_.OnStreamFrame(frame); 59 60 if (frame.fin) { 61 sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size(), true); 62 } 63 64 return accepted; 65} 66 67void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 68 stream_error_ = error; 69 TerminateFromPeer(false); // Full close. 70} 71 72void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { 73 if (IsClosed()) { 74 return; 75 } 76 if (error != QUIC_NO_ERROR) { 77 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 78 connection_error_ = error; 79 } 80 81 if (from_peer) { 82 TerminateFromPeer(false); 83 } else { 84 CloseWriteSide(); 85 CloseReadSide(); 86 } 87} 88 89void ReliableQuicStream::TerminateFromPeer(bool half_close) { 90 if (!half_close) { 91 CloseWriteSide(); 92 } 93 CloseReadSide(); 94} 95 96void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 97 stream_error_ = error; 98 session()->SendRstStream(id(), error); 99} 100 101int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) { 102 if (headers_decompressed_ && decompressed_headers_.empty()) { 103 return sequencer_.Readv(iov, iov_len); 104 } 105 size_t bytes_consumed = 0; 106 int iov_index = 0; 107 while (iov_index < iov_len && 108 decompressed_headers_.length() > bytes_consumed) { 109 int bytes_to_read = min(iov[iov_index].iov_len, 110 decompressed_headers_.length() - bytes_consumed); 111 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 112 memcpy(iov_ptr, 113 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 114 bytes_consumed += bytes_to_read; 115 ++iov_index; 116 } 117 decompressed_headers_.erase(0, bytes_consumed); 118 return bytes_consumed; 119} 120 121int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) { 122 if (headers_decompressed_ && decompressed_headers_.empty()) { 123 return sequencer_.GetReadableRegions(iov, iov_len); 124 } 125 if (iov_len == 0) { 126 return 0; 127 } 128 iov[0].iov_base = static_cast<void*>( 129 const_cast<char*>(decompressed_headers_.data())); 130 iov[0].iov_len = decompressed_headers_.length(); 131 return 1; 132} 133 134bool ReliableQuicStream::IsHalfClosed() const { 135 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 136 return false; 137 } 138 return sequencer_.IsHalfClosed(); 139} 140 141bool ReliableQuicStream::IsClosed() const { 142 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); 143} 144 145bool ReliableQuicStream::HasBytesToRead() const { 146 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); 147} 148 149const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 150 return session_->peer_address(); 151} 152 153QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { 154 return WriteOrBuffer(data, fin); 155} 156 157QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { 158 DCHECK(!fin_buffered_); 159 160 QuicConsumedData consumed_data(0, false); 161 fin_buffered_ = fin; 162 163 if (queued_data_.empty()) { 164 consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); 165 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 166 } 167 168 // If there's unconsumed data or an unconsumed fin, queue it. 169 if (consumed_data.bytes_consumed < data.length() || 170 (fin && !consumed_data.fin_consumed)) { 171 queued_data_.push_back( 172 string(data.data() + consumed_data.bytes_consumed, 173 data.length() - consumed_data.bytes_consumed)); 174 } 175 176 return QuicConsumedData(data.size(), true); 177} 178 179void ReliableQuicStream::OnCanWrite() { 180 bool fin = false; 181 while (!queued_data_.empty()) { 182 const string& data = queued_data_.front(); 183 if (queued_data_.size() == 1 && fin_buffered_) { 184 fin = true; 185 } 186 QuicConsumedData consumed_data = WriteDataInternal(data, fin); 187 if (consumed_data.bytes_consumed == data.size() && 188 fin == consumed_data.fin_consumed) { 189 queued_data_.pop_front(); 190 } else { 191 queued_data_.front().erase(0, consumed_data.bytes_consumed); 192 break; 193 } 194 } 195} 196 197QuicConsumedData ReliableQuicStream::WriteDataInternal( 198 StringPiece data, bool fin) { 199 if (write_side_closed_) { 200 DLOG(ERROR) << "Attempt to write when the write side is closed"; 201 return QuicConsumedData(0, false); 202 } 203 204 QuicConsumedData consumed_data = 205 session()->WriteData(id(), data, stream_bytes_written_, fin); 206 stream_bytes_written_ += consumed_data.bytes_consumed; 207 if (consumed_data.bytes_consumed == data.length()) { 208 if (fin && consumed_data.fin_consumed) { 209 fin_sent_ = true; 210 CloseWriteSide(); 211 } 212 } else { 213 session_->MarkWriteBlocked(id()); 214 } 215 return consumed_data; 216} 217 218void ReliableQuicStream::CloseReadSide() { 219 if (read_side_closed_) { 220 return; 221 } 222 DLOG(INFO) << "Done reading from stream " << id(); 223 224 read_side_closed_ = true; 225 if (write_side_closed_) { 226 DLOG(INFO) << "Closing stream: " << id(); 227 session_->CloseStream(id()); 228 } 229} 230 231uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { 232 if (id() == kCryptoStreamId) { 233 // The crypto stream does not use compression. 234 return ProcessData(data, data_len); 235 } 236 uint32 total_bytes_consumed = 0; 237 if (headers_id_ == 0u) { 238 // The headers ID has not yet been read. Strip it from the beginning of 239 // the data stream. 240 DCHECK_GT(4u, headers_id_buffer_.length()); 241 size_t missing_size = 4 - headers_id_buffer_.length(); 242 if (data_len < missing_size) { 243 StringPiece(data, data_len).AppendToString(&headers_id_buffer_); 244 return data_len; 245 } 246 total_bytes_consumed += missing_size; 247 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); 248 DCHECK_EQ(4u, headers_id_buffer_.length()); 249 memcpy(&headers_id_, headers_id_buffer_.data(), 4); 250 headers_id_buffer_.clear(); 251 data += missing_size; 252 data_len -= missing_size; 253 } 254 DCHECK_NE(0u, headers_id_); 255 256 // Once the headers are finished, we simply pass the data through. 257 if (headers_decompressed_) { 258 // Some buffered header data remains. 259 if (!decompressed_headers_.empty()) { 260 ProcessHeaderData(); 261 } 262 if (decompressed_headers_.empty()) { 263 DVLOG(1) << "Delegating procesing to ProcessData"; 264 total_bytes_consumed += ProcessData(data, data_len); 265 } 266 return total_bytes_consumed; 267 } 268 269 QuicHeaderId current_header_id = 270 session_->decompressor()->current_header_id(); 271 // Ensure that this header id looks sane. 272 if (headers_id_ < current_header_id || 273 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 274 DVLOG(1) << "Invalid headers for stream: " << id() 275 << " header_id: " << headers_id_ 276 << " current_header_id: " << current_header_id; 277 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 278 return total_bytes_consumed; 279 } 280 281 // If we are head-of-line blocked on decompression, then back up. 282 if (current_header_id != headers_id_) { 283 session_->MarkDecompressionBlocked(headers_id_, id()); 284 DVLOG(1) << "Unable to decmpress header data for stream: " << id() 285 << " header_id: " << headers_id_; 286 return total_bytes_consumed; 287 } 288 289 // Decompressed data will be delivered to decompressed_headers_. 290 size_t bytes_consumed = session_->decompressor()->DecompressData( 291 StringPiece(data, data_len), this); 292 total_bytes_consumed += bytes_consumed; 293 294 // Headers are complete if the decompressor has moved on to the 295 // next stream. 296 headers_decompressed_ = 297 session_->decompressor()->current_header_id() != headers_id_; 298 299 ProcessHeaderData(); 300 301 // We have processed all of the decompressed data but we might 302 // have some more raw data to process. 303 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) { 304 total_bytes_consumed += ProcessData(data + bytes_consumed, 305 data_len - bytes_consumed); 306 } 307 308 // The sequencer will push any additional buffered frames if this data 309 // has been completely consumed. 310 return total_bytes_consumed; 311} 312 313uint32 ReliableQuicStream::ProcessHeaderData() { 314 if (decompressed_headers_.empty()) { 315 return 0; 316 } 317 318 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 319 decompressed_headers_.length()); 320 if (bytes_processed == decompressed_headers_.length()) { 321 decompressed_headers_.clear(); 322 } else { 323 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 324 } 325 return bytes_processed; 326} 327 328void ReliableQuicStream::OnDecompressorAvailable() { 329 DCHECK_EQ(headers_id_, 330 session_->decompressor()->current_header_id()); 331 DCHECK(!headers_decompressed_); 332 DCHECK_EQ(0u, decompressed_headers_.length()); 333 334 size_t total_bytes_consumed = 0; 335 struct iovec iovecs[5]; 336 while (!headers_decompressed_) { 337 size_t num_iovecs = 338 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); 339 340 if (num_iovecs == 0) { 341 return; 342 } 343 for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) { 344 total_bytes_consumed += session_->decompressor()->DecompressData( 345 StringPiece(static_cast<char*>(iovecs[i].iov_base), 346 iovecs[i].iov_len), this); 347 348 headers_decompressed_ = 349 session_->decompressor()->current_header_id() != headers_id_; 350 } 351 } 352 353 // Either the headers are complete, or the all data as been consumed. 354 sequencer_.MarkConsumed(total_bytes_consumed); 355 356 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 357 358 if (headers_decompressed_ && decompressed_headers_.empty()) { 359 sequencer_.FlushBufferedFrames(); 360 } 361} 362 363bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 364 data.AppendToString(&decompressed_headers_); 365 return true; 366} 367 368void ReliableQuicStream::CloseWriteSide() { 369 if (write_side_closed_) { 370 return; 371 } 372 DLOG(INFO) << "Done writing to stream " << id(); 373 374 write_side_closed_ = true; 375 if (read_side_closed_) { 376 DLOG(INFO) << "Closing stream: " << id(); 377 session_->CloseStream(id()); 378 } 379} 380 381void ReliableQuicStream::OnClose() { 382 CloseReadSide(); 383 CloseWriteSide(); 384 385 if (visitor_) { 386 Visitor* visitor = visitor_; 387 // Calling Visitor::OnClose() may result the destruction of the visitor, 388 // so we need to ensure we don't call it again. 389 visitor_ = NULL; 390 visitor->OnClose(this); 391 } 392} 393 394} // namespace net 395