1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/quic_data_stream.h"
6
7#include "base/logging.h"
8#include "net/quic/quic_session.h"
9#include "net/quic/quic_spdy_decompressor.h"
10#include "net/spdy/write_blocked_list.h"
11
12using base::StringPiece;
13using std::min;
14
15namespace net {
16
17#define ENDPOINT (session()->is_server() ? "Server: " : " Client: ")
18
19namespace {
20
21// This is somewhat arbitrary.  It's possible, but unlikely, we will either fail
22// to set a priority client-side, or cancel a stream before stripping the
23// priority from the wire server-side.  In either case, start out with a
24// priority in the middle.
25QuicPriority kDefaultPriority = 3;
26
27// Appends bytes from data into partial_data_buffer.  Once partial_data_buffer
28// reaches 4 bytes, copies the data into 'result' and clears
29// partial_data_buffer.
30// Returns the number of bytes consumed.
31uint32 StripUint32(const char* data, uint32 data_len,
32                   string* partial_data_buffer,
33                   uint32* result) {
34  DCHECK_GT(4u, partial_data_buffer->length());
35  size_t missing_size = 4 - partial_data_buffer->length();
36  if (data_len < missing_size) {
37    StringPiece(data, data_len).AppendToString(partial_data_buffer);
38    return data_len;
39  }
40  StringPiece(data, missing_size).AppendToString(partial_data_buffer);
41  DCHECK_EQ(4u, partial_data_buffer->length());
42  memcpy(result, partial_data_buffer->data(), 4);
43  partial_data_buffer->clear();
44  return missing_size;
45}
46
47}  // namespace
48
49QuicDataStream::QuicDataStream(QuicStreamId id,
50                               QuicSession* session)
51    : ReliableQuicStream(id, session),
52      visitor_(NULL),
53      headers_decompressed_(false),
54      priority_(kDefaultPriority),
55      headers_id_(0),
56      decompression_failed_(false),
57      priority_parsed_(false) {
58  DCHECK_NE(kCryptoStreamId, id);
59}
60
61QuicDataStream::~QuicDataStream() {
62}
63
64size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) {
65  if (FinishedReadingHeaders()) {
66    // If the headers have been read, simply delegate to the sequencer's
67    // Readv method.
68    return sequencer()->Readv(iov, iov_len);
69  }
70  // Otherwise, copy decompressed header data into |iov|.
71  size_t bytes_consumed = 0;
72  size_t iov_index = 0;
73  while (iov_index < iov_len &&
74         decompressed_headers_.length() > bytes_consumed) {
75    size_t bytes_to_read = min(iov[iov_index].iov_len,
76                               decompressed_headers_.length() - bytes_consumed);
77    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
78    memcpy(iov_ptr,
79           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
80    bytes_consumed += bytes_to_read;
81    ++iov_index;
82  }
83  decompressed_headers_.erase(0, bytes_consumed);
84  return bytes_consumed;
85}
86
87int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) {
88  if (FinishedReadingHeaders()) {
89    return sequencer()->GetReadableRegions(iov, iov_len);
90  }
91  if (iov_len == 0) {
92    return 0;
93  }
94  iov[0].iov_base = static_cast<void*>(
95      const_cast<char*>(decompressed_headers_.data()));
96  iov[0].iov_len = decompressed_headers_.length();
97  return 1;
98}
99
100bool QuicDataStream::IsDoneReading() const {
101  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
102    return false;
103  }
104  return sequencer()->IsClosed();
105}
106
107bool QuicDataStream::HasBytesToRead() const {
108  return !decompressed_headers_.empty() || sequencer()->HasBytesToRead();
109}
110
111void QuicDataStream::set_priority(QuicPriority priority) {
112  DCHECK_EQ(0u, stream_bytes_written());
113  priority_ = priority;
114}
115
116QuicPriority QuicDataStream::EffectivePriority() const {
117  return priority();
118}
119
120uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) {
121  DCHECK_NE(0u, data_len);
122
123  uint32 total_bytes_consumed = 0;
124  if (headers_id_ == 0u) {
125    total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
126    data += total_bytes_consumed;
127    data_len -= total_bytes_consumed;
128    if (data_len == 0 || total_bytes_consumed == 0) {
129      return total_bytes_consumed;
130    }
131  }
132  DCHECK_NE(0u, headers_id_);
133
134  // Once the headers are finished, we simply pass the data through.
135  if (headers_decompressed_) {
136    // Some buffered header data remains.
137    if (!decompressed_headers_.empty()) {
138      ProcessHeaderData();
139    }
140    if (decompressed_headers_.empty()) {
141      DVLOG(1) << "Delegating procesing to ProcessData";
142      total_bytes_consumed += ProcessData(data, data_len);
143    }
144    return total_bytes_consumed;
145  }
146
147  QuicHeaderId current_header_id =
148      session()->decompressor()->current_header_id();
149  // Ensure that this header id looks sane.
150  if (headers_id_ < current_header_id ||
151      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
152    DVLOG(1) << ENDPOINT
153             << "Invalid headers for stream: " << id()
154             << " header_id: " << headers_id_
155             << " current_header_id: " << current_header_id;
156    session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
157    return total_bytes_consumed;
158  }
159
160  // If we are head-of-line blocked on decompression, then back up.
161  if (current_header_id != headers_id_) {
162    session()->MarkDecompressionBlocked(headers_id_, id());
163    DVLOG(1) << ENDPOINT
164             << "Unable to decompress header data for stream: " << id()
165             << " header_id: " << headers_id_;
166    return total_bytes_consumed;
167  }
168
169  // Decompressed data will be delivered to decompressed_headers_.
170  size_t bytes_consumed = session()->decompressor()->DecompressData(
171      StringPiece(data, data_len), this);
172  DCHECK_NE(0u, bytes_consumed);
173  if (bytes_consumed > data_len) {
174    DCHECK(false) << "DecompressData returned illegal value";
175    OnDecompressionError();
176    return total_bytes_consumed;
177  }
178  total_bytes_consumed += bytes_consumed;
179  data += bytes_consumed;
180  data_len -= bytes_consumed;
181
182  if (decompression_failed_) {
183    // The session will have been closed in OnDecompressionError.
184    return total_bytes_consumed;
185  }
186
187  // Headers are complete if the decompressor has moved on to the
188  // next stream.
189  headers_decompressed_ =
190      session()->decompressor()->current_header_id() != headers_id_;
191  if (!headers_decompressed_) {
192    DCHECK_EQ(0u, data_len);
193  }
194
195  ProcessHeaderData();
196
197  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
198    return total_bytes_consumed;
199  }
200
201  // We have processed all of the decompressed data but we might
202  // have some more raw data to process.
203  if (data_len > 0) {
204    total_bytes_consumed += ProcessData(data, data_len);
205  }
206
207  // The sequencer will push any additional buffered frames if this data
208  // has been completely consumed.
209  return total_bytes_consumed;
210}
211
212const IPEndPoint& QuicDataStream::GetPeerAddress() {
213  return session()->peer_address();
214}
215
216QuicSpdyCompressor* QuicDataStream::compressor() {
217  return session()->compressor();
218}
219
220bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) {
221  return session()->GetSSLInfo(ssl_info);
222}
223
224uint32 QuicDataStream::ProcessHeaderData() {
225  if (decompressed_headers_.empty()) {
226    return 0;
227  }
228
229  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
230                                       decompressed_headers_.length());
231  if (bytes_processed == decompressed_headers_.length()) {
232    decompressed_headers_.clear();
233  } else {
234    decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
235  }
236  return bytes_processed;
237}
238
239void QuicDataStream::OnDecompressorAvailable() {
240  DCHECK_EQ(headers_id_,
241            session()->decompressor()->current_header_id());
242  DCHECK(!headers_decompressed_);
243  DCHECK(!decompression_failed_);
244  DCHECK_EQ(0u, decompressed_headers_.length());
245
246  while (!headers_decompressed_) {
247    struct iovec iovec;
248    if (sequencer()->GetReadableRegions(&iovec, 1) == 0) {
249      return;
250    }
251
252    size_t bytes_consumed = session()->decompressor()->DecompressData(
253        StringPiece(static_cast<char*>(iovec.iov_base),
254                    iovec.iov_len),
255        this);
256    DCHECK_LE(bytes_consumed, iovec.iov_len);
257    if (decompression_failed_) {
258      return;
259    }
260    sequencer()->MarkConsumed(bytes_consumed);
261
262    headers_decompressed_ =
263        session()->decompressor()->current_header_id() != headers_id_;
264  }
265
266  // Either the headers are complete, or the all data as been consumed.
267  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
268  if (IsDoneReading()) {
269    OnFinRead();
270  } else if (FinishedReadingHeaders()) {
271    sequencer()->FlushBufferedFrames();
272  }
273}
274
275bool QuicDataStream::OnDecompressedData(StringPiece data) {
276  data.AppendToString(&decompressed_headers_);
277  return true;
278}
279
280void QuicDataStream::OnDecompressionError() {
281  DCHECK(!decompression_failed_);
282  decompression_failed_ = true;
283  session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
284}
285
286void QuicDataStream::OnClose() {
287  ReliableQuicStream::OnClose();
288
289  if (visitor_) {
290    Visitor* visitor = visitor_;
291    // Calling Visitor::OnClose() may result the destruction of the visitor,
292    // so we need to ensure we don't call it again.
293    visitor_ = NULL;
294    visitor->OnClose(this);
295  }
296}
297
298uint32 QuicDataStream::StripPriorityAndHeaderId(
299    const char* data, uint32 data_len) {
300  uint32 total_bytes_parsed = 0;
301
302  if (!priority_parsed_ && session()->connection()->is_server()) {
303    QuicPriority temporary_priority = priority_;
304    total_bytes_parsed = StripUint32(
305        data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
306    if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
307      priority_parsed_ = true;
308
309      // Spdy priorities are inverted, so the highest numerical value is the
310      // lowest legal priority.
311      if (temporary_priority > QuicUtils::LowestPriority()) {
312        session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
313        return 0;
314      }
315      priority_ = temporary_priority;
316    }
317    data += total_bytes_parsed;
318    data_len -= total_bytes_parsed;
319  }
320  if (data_len > 0 && headers_id_ == 0u) {
321    // The headers ID has not yet been read.  Strip it from the beginning of
322    // the data stream.
323    total_bytes_parsed += StripUint32(
324        data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
325  }
326  return total_bytes_parsed;
327}
328
329bool QuicDataStream::FinishedReadingHeaders() {
330  return headers_decompressed_ && decompressed_headers_.empty();
331}
332
333}  // namespace net
334