1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/quic_data_stream.h" 6 7#include "base/logging.h" 8#include "net/quic/quic_session.h" 9#include "net/quic/quic_spdy_decompressor.h" 10#include "net/spdy/write_blocked_list.h" 11 12using base::StringPiece; 13using std::min; 14 15namespace net { 16 17#define ENDPOINT (session()->is_server() ? "Server: " : " Client: ") 18 19namespace { 20 21// This is somewhat arbitrary. It's possible, but unlikely, we will either fail 22// to set a priority client-side, or cancel a stream before stripping the 23// priority from the wire server-side. In either case, start out with a 24// priority in the middle. 25QuicPriority kDefaultPriority = 3; 26 27// Appends bytes from data into partial_data_buffer. Once partial_data_buffer 28// reaches 4 bytes, copies the data into 'result' and clears 29// partial_data_buffer. 30// Returns the number of bytes consumed. 31uint32 StripUint32(const char* data, uint32 data_len, 32 string* partial_data_buffer, 33 uint32* result) { 34 DCHECK_GT(4u, partial_data_buffer->length()); 35 size_t missing_size = 4 - partial_data_buffer->length(); 36 if (data_len < missing_size) { 37 StringPiece(data, data_len).AppendToString(partial_data_buffer); 38 return data_len; 39 } 40 StringPiece(data, missing_size).AppendToString(partial_data_buffer); 41 DCHECK_EQ(4u, partial_data_buffer->length()); 42 memcpy(result, partial_data_buffer->data(), 4); 43 partial_data_buffer->clear(); 44 return missing_size; 45} 46 47} // namespace 48 49QuicDataStream::QuicDataStream(QuicStreamId id, 50 QuicSession* session) 51 : ReliableQuicStream(id, session), 52 visitor_(NULL), 53 headers_decompressed_(false), 54 priority_(kDefaultPriority), 55 headers_id_(0), 56 decompression_failed_(false), 57 priority_parsed_(false) { 58 DCHECK_NE(kCryptoStreamId, id); 59} 60 61QuicDataStream::~QuicDataStream() { 62} 63 64size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) { 65 if (FinishedReadingHeaders()) { 66 // If the headers have been read, simply delegate to the sequencer's 67 // Readv method. 68 return sequencer()->Readv(iov, iov_len); 69 } 70 // Otherwise, copy decompressed header data into |iov|. 71 size_t bytes_consumed = 0; 72 size_t iov_index = 0; 73 while (iov_index < iov_len && 74 decompressed_headers_.length() > bytes_consumed) { 75 size_t bytes_to_read = min(iov[iov_index].iov_len, 76 decompressed_headers_.length() - bytes_consumed); 77 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 78 memcpy(iov_ptr, 79 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 80 bytes_consumed += bytes_to_read; 81 ++iov_index; 82 } 83 decompressed_headers_.erase(0, bytes_consumed); 84 return bytes_consumed; 85} 86 87int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) { 88 if (FinishedReadingHeaders()) { 89 return sequencer()->GetReadableRegions(iov, iov_len); 90 } 91 if (iov_len == 0) { 92 return 0; 93 } 94 iov[0].iov_base = static_cast<void*>( 95 const_cast<char*>(decompressed_headers_.data())); 96 iov[0].iov_len = decompressed_headers_.length(); 97 return 1; 98} 99 100bool QuicDataStream::IsDoneReading() const { 101 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 102 return false; 103 } 104 return sequencer()->IsClosed(); 105} 106 107bool QuicDataStream::HasBytesToRead() const { 108 return !decompressed_headers_.empty() || sequencer()->HasBytesToRead(); 109} 110 111void QuicDataStream::set_priority(QuicPriority priority) { 112 DCHECK_EQ(0u, stream_bytes_written()); 113 priority_ = priority; 114} 115 116QuicPriority QuicDataStream::EffectivePriority() const { 117 return priority(); 118} 119 120uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) { 121 DCHECK_NE(0u, data_len); 122 123 uint32 total_bytes_consumed = 0; 124 if (headers_id_ == 0u) { 125 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); 126 data += total_bytes_consumed; 127 data_len -= total_bytes_consumed; 128 if (data_len == 0 || total_bytes_consumed == 0) { 129 return total_bytes_consumed; 130 } 131 } 132 DCHECK_NE(0u, headers_id_); 133 134 // Once the headers are finished, we simply pass the data through. 135 if (headers_decompressed_) { 136 // Some buffered header data remains. 137 if (!decompressed_headers_.empty()) { 138 ProcessHeaderData(); 139 } 140 if (decompressed_headers_.empty()) { 141 DVLOG(1) << "Delegating procesing to ProcessData"; 142 total_bytes_consumed += ProcessData(data, data_len); 143 } 144 return total_bytes_consumed; 145 } 146 147 QuicHeaderId current_header_id = 148 session()->decompressor()->current_header_id(); 149 // Ensure that this header id looks sane. 150 if (headers_id_ < current_header_id || 151 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 152 DVLOG(1) << ENDPOINT 153 << "Invalid headers for stream: " << id() 154 << " header_id: " << headers_id_ 155 << " current_header_id: " << current_header_id; 156 session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 157 return total_bytes_consumed; 158 } 159 160 // If we are head-of-line blocked on decompression, then back up. 161 if (current_header_id != headers_id_) { 162 session()->MarkDecompressionBlocked(headers_id_, id()); 163 DVLOG(1) << ENDPOINT 164 << "Unable to decompress header data for stream: " << id() 165 << " header_id: " << headers_id_; 166 return total_bytes_consumed; 167 } 168 169 // Decompressed data will be delivered to decompressed_headers_. 170 size_t bytes_consumed = session()->decompressor()->DecompressData( 171 StringPiece(data, data_len), this); 172 DCHECK_NE(0u, bytes_consumed); 173 if (bytes_consumed > data_len) { 174 DCHECK(false) << "DecompressData returned illegal value"; 175 OnDecompressionError(); 176 return total_bytes_consumed; 177 } 178 total_bytes_consumed += bytes_consumed; 179 data += bytes_consumed; 180 data_len -= bytes_consumed; 181 182 if (decompression_failed_) { 183 // The session will have been closed in OnDecompressionError. 184 return total_bytes_consumed; 185 } 186 187 // Headers are complete if the decompressor has moved on to the 188 // next stream. 189 headers_decompressed_ = 190 session()->decompressor()->current_header_id() != headers_id_; 191 if (!headers_decompressed_) { 192 DCHECK_EQ(0u, data_len); 193 } 194 195 ProcessHeaderData(); 196 197 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 198 return total_bytes_consumed; 199 } 200 201 // We have processed all of the decompressed data but we might 202 // have some more raw data to process. 203 if (data_len > 0) { 204 total_bytes_consumed += ProcessData(data, data_len); 205 } 206 207 // The sequencer will push any additional buffered frames if this data 208 // has been completely consumed. 209 return total_bytes_consumed; 210} 211 212const IPEndPoint& QuicDataStream::GetPeerAddress() { 213 return session()->peer_address(); 214} 215 216QuicSpdyCompressor* QuicDataStream::compressor() { 217 return session()->compressor(); 218} 219 220bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) { 221 return session()->GetSSLInfo(ssl_info); 222} 223 224uint32 QuicDataStream::ProcessHeaderData() { 225 if (decompressed_headers_.empty()) { 226 return 0; 227 } 228 229 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 230 decompressed_headers_.length()); 231 if (bytes_processed == decompressed_headers_.length()) { 232 decompressed_headers_.clear(); 233 } else { 234 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 235 } 236 return bytes_processed; 237} 238 239void QuicDataStream::OnDecompressorAvailable() { 240 DCHECK_EQ(headers_id_, 241 session()->decompressor()->current_header_id()); 242 DCHECK(!headers_decompressed_); 243 DCHECK(!decompression_failed_); 244 DCHECK_EQ(0u, decompressed_headers_.length()); 245 246 while (!headers_decompressed_) { 247 struct iovec iovec; 248 if (sequencer()->GetReadableRegions(&iovec, 1) == 0) { 249 return; 250 } 251 252 size_t bytes_consumed = session()->decompressor()->DecompressData( 253 StringPiece(static_cast<char*>(iovec.iov_base), 254 iovec.iov_len), 255 this); 256 DCHECK_LE(bytes_consumed, iovec.iov_len); 257 if (decompression_failed_) { 258 return; 259 } 260 sequencer()->MarkConsumed(bytes_consumed); 261 262 headers_decompressed_ = 263 session()->decompressor()->current_header_id() != headers_id_; 264 } 265 266 // Either the headers are complete, or the all data as been consumed. 267 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 268 if (IsDoneReading()) { 269 OnFinRead(); 270 } else if (FinishedReadingHeaders()) { 271 sequencer()->FlushBufferedFrames(); 272 } 273} 274 275bool QuicDataStream::OnDecompressedData(StringPiece data) { 276 data.AppendToString(&decompressed_headers_); 277 return true; 278} 279 280void QuicDataStream::OnDecompressionError() { 281 DCHECK(!decompression_failed_); 282 decompression_failed_ = true; 283 session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); 284} 285 286void QuicDataStream::OnClose() { 287 ReliableQuicStream::OnClose(); 288 289 if (visitor_) { 290 Visitor* visitor = visitor_; 291 // Calling Visitor::OnClose() may result the destruction of the visitor, 292 // so we need to ensure we don't call it again. 293 visitor_ = NULL; 294 visitor->OnClose(this); 295 } 296} 297 298uint32 QuicDataStream::StripPriorityAndHeaderId( 299 const char* data, uint32 data_len) { 300 uint32 total_bytes_parsed = 0; 301 302 if (!priority_parsed_ && session()->connection()->is_server()) { 303 QuicPriority temporary_priority = priority_; 304 total_bytes_parsed = StripUint32( 305 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); 306 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) { 307 priority_parsed_ = true; 308 309 // Spdy priorities are inverted, so the highest numerical value is the 310 // lowest legal priority. 311 if (temporary_priority > QuicUtils::LowestPriority()) { 312 session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); 313 return 0; 314 } 315 priority_ = temporary_priority; 316 } 317 data += total_bytes_parsed; 318 data_len -= total_bytes_parsed; 319 } 320 if (data_len > 0 && headers_id_ == 0u) { 321 // The headers ID has not yet been read. Strip it from the beginning of 322 // the data stream. 323 total_bytes_parsed += StripUint32( 324 data, data_len, &headers_id_and_priority_buffer_, &headers_id_); 325 } 326 return total_bytes_parsed; 327} 328 329bool QuicDataStream::FinishedReadingHeaders() { 330 return headers_decompressed_ && decompressed_headers_.empty(); 331} 332 333} // namespace net 334