1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/quic/quic_stream_sequencer.h"
6
7#include <algorithm>
8#include <limits>
9
10#include "base/logging.h"
11#include "base/metrics/sparse_histogram.h"
12#include "net/quic/reliable_quic_stream.h"
13
14using std::make_pair;
15using std::min;
16using std::numeric_limits;
17
18namespace net {
19
20QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
21    : stream_(quic_stream),
22      num_bytes_consumed_(0),
23      close_offset_(numeric_limits<QuicStreamOffset>::max()),
24      blocked_(false),
25      num_bytes_buffered_(0),
26      num_frames_received_(0),
27      num_duplicate_frames_received_(0) {
28}
29
30QuicStreamSequencer::~QuicStreamSequencer() {
31}
32
33void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
34  ++num_frames_received_;
35  if (IsDuplicate(frame)) {
36    ++num_duplicate_frames_received_;
37    // Silently ignore duplicates.
38    return;
39  }
40
41  if (FrameOverlapsBufferedData(frame)) {
42    stream_->CloseConnectionWithDetails(
43        QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data.");
44    return;
45  }
46
47  QuicStreamOffset byte_offset = frame.offset;
48  size_t data_len = frame.data.TotalBufferSize();
49  if (data_len == 0 && !frame.fin) {
50    // Stream frames must have data or a fin flag.
51    stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
52                                        "Empty stream frame without FIN set.");
53    return;
54  }
55
56  if (frame.fin) {
57    CloseStreamAtOffset(frame.offset + data_len);
58    if (data_len == 0) {
59      return;
60    }
61  }
62
63  IOVector data;
64  data.AppendIovec(frame.data.iovec(), frame.data.Size());
65
66  // If the frame has arrived in-order then we can process it immediately, only
67  // buffering if the stream is unable to process it.
68  if (!blocked_ && byte_offset == num_bytes_consumed_) {
69    DVLOG(1) << "Processing byte offset " << byte_offset;
70    size_t bytes_consumed = 0;
71    for (size_t i = 0; i < data.Size(); ++i) {
72      bytes_consumed += stream_->ProcessRawData(
73          static_cast<char*>(data.iovec()[i].iov_base),
74          data.iovec()[i].iov_len);
75    }
76    num_bytes_consumed_ += bytes_consumed;
77    stream_->AddBytesConsumed(bytes_consumed);
78
79    if (MaybeCloseStream()) {
80      return;
81    }
82    if (bytes_consumed > data_len) {
83      stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
84      return;
85    } else if (bytes_consumed == data_len) {
86      FlushBufferedFrames();
87      return;  // it's safe to ack this frame.
88    } else {
89      // Set ourselves up to buffer what's left.
90      data_len -= bytes_consumed;
91      data.Consume(bytes_consumed);
92      byte_offset += bytes_consumed;
93    }
94  }
95
96  // Buffer any remaining data to be consumed by the stream when ready.
97  for (size_t i = 0; i < data.Size(); ++i) {
98    DVLOG(1) << "Buffering stream data at offset " << byte_offset;
99    const iovec& iov = data.iovec()[i];
100    buffered_frames_.insert(make_pair(
101        byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len)));
102    byte_offset += iov.iov_len;
103    num_bytes_buffered_ += iov.iov_len;
104  }
105  return;
106}
107
108void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
109  const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
110
111  // If we have a scheduled termination or close, any new offset should match
112  // it.
113  if (close_offset_ != kMaxOffset && offset != close_offset_) {
114    stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
115    return;
116  }
117
118  close_offset_ = offset;
119
120  MaybeCloseStream();
121}
122
123bool QuicStreamSequencer::MaybeCloseStream() {
124  if (!blocked_ && IsClosed()) {
125    DVLOG(1) << "Passing up termination, as we've processed "
126             << num_bytes_consumed_ << " of " << close_offset_
127             << " bytes.";
128    // Technically it's an error if num_bytes_consumed isn't exactly
129    // equal, but error handling seems silly at this point.
130    stream_->OnFinRead();
131    buffered_frames_.clear();
132    num_bytes_buffered_ = 0;
133    return true;
134  }
135  return false;
136}
137
138int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) {
139  DCHECK(!blocked_);
140  FrameMap::iterator it = buffered_frames_.begin();
141  size_t index = 0;
142  QuicStreamOffset offset = num_bytes_consumed_;
143  while (it != buffered_frames_.end() && index < iov_len) {
144    if (it->first != offset) return index;
145
146    iov[index].iov_base = static_cast<void*>(
147        const_cast<char*>(it->second.data()));
148    iov[index].iov_len = it->second.size();
149    offset += it->second.size();
150
151    ++index;
152    ++it;
153  }
154  return index;
155}
156
157int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
158  DCHECK(!blocked_);
159  FrameMap::iterator it = buffered_frames_.begin();
160  size_t iov_index = 0;
161  size_t iov_offset = 0;
162  size_t frame_offset = 0;
163  size_t initial_bytes_consumed = num_bytes_consumed_;
164
165  while (iov_index < iov_len &&
166         it != buffered_frames_.end() &&
167         it->first == num_bytes_consumed_) {
168    int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
169                            it->second.size() - frame_offset);
170
171    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
172    memcpy(iov_ptr,
173           it->second.data() + frame_offset, bytes_to_read);
174    frame_offset += bytes_to_read;
175    iov_offset += bytes_to_read;
176
177    if (iov[iov_index].iov_len == iov_offset) {
178      // We've filled this buffer.
179      iov_offset = 0;
180      ++iov_index;
181    }
182    if (it->second.size() == frame_offset) {
183      // We've copied this whole frame
184      RecordBytesConsumed(it->second.size());
185      buffered_frames_.erase(it);
186      it = buffered_frames_.begin();
187      frame_offset = 0;
188    }
189  }
190  // We've finished copying.  If we have a partial frame, update it.
191  if (frame_offset != 0) {
192    buffered_frames_.insert(
193        make_pair(it->first + frame_offset, it->second.substr(frame_offset)));
194    buffered_frames_.erase(buffered_frames_.begin());
195    RecordBytesConsumed(frame_offset);
196  }
197  return num_bytes_consumed_ - initial_bytes_consumed;
198}
199
200bool QuicStreamSequencer::HasBytesToRead() const {
201  FrameMap::const_iterator it = buffered_frames_.begin();
202
203  return it != buffered_frames_.end() && it->first == num_bytes_consumed_;
204}
205
206bool QuicStreamSequencer::IsClosed() const {
207  return num_bytes_consumed_ >= close_offset_;
208}
209
210bool QuicStreamSequencer::FrameOverlapsBufferedData(
211    const QuicStreamFrame& frame) const {
212  if (buffered_frames_.empty()) {
213    return false;
214  }
215
216  FrameMap::const_iterator next_frame =
217      buffered_frames_.lower_bound(frame.offset);
218  // Duplicate frames should have been dropped in IsDuplicate.
219  DCHECK(next_frame == buffered_frames_.end() ||
220         next_frame->first != frame.offset);
221
222  // If there is a buffered frame with a higher starting offset, then we check
223  // to see if the new frame runs into the higher frame.
224  if (next_frame != buffered_frames_.end() &&
225      (frame.offset + frame.data.TotalBufferSize()) > next_frame->first) {
226    DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + "
227             << frame.data.TotalBufferSize() << " > " << next_frame->first;
228    return true;
229  }
230
231  // If there is a buffered frame with a lower starting offset, then we check
232  // to see if the buffered frame runs into the new frame.
233  if (next_frame != buffered_frames_.begin()) {
234    FrameMap::const_iterator preceeding_frame = --next_frame;
235    QuicStreamOffset offset = preceeding_frame->first;
236    uint64 data_length = preceeding_frame->second.length();
237    if ((offset + data_length) > frame.offset) {
238      DVLOG(1) << "Preceeding frame overlaps new frame: " << offset << " + "
239               << data_length << " > " << frame.offset;
240      return true;
241    }
242  }
243  return false;
244}
245
246bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const {
247  // A frame is duplicate if the frame offset is smaller than our bytes consumed
248  // or we have stored the frame in our map.
249  // TODO(pwestin): Is it possible that a new frame contain more data even if
250  // the offset is the same?
251  return frame.offset < num_bytes_consumed_ ||
252      buffered_frames_.find(frame.offset) != buffered_frames_.end();
253}
254
255void QuicStreamSequencer::SetBlockedUntilFlush() {
256  blocked_ = true;
257}
258
259void QuicStreamSequencer::FlushBufferedFrames() {
260  blocked_ = false;
261  FrameMap::iterator it = buffered_frames_.find(num_bytes_consumed_);
262  while (it != buffered_frames_.end()) {
263    DVLOG(1) << "Flushing buffered packet at offset " << it->first;
264    string* data = &it->second;
265    size_t bytes_consumed = stream_->ProcessRawData(data->c_str(),
266                                                    data->size());
267    RecordBytesConsumed(bytes_consumed);
268    if (MaybeCloseStream()) {
269      return;
270    }
271    if (bytes_consumed > data->size()) {
272      stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);  // Programming error
273      return;
274    } else if (bytes_consumed == data->size()) {
275      buffered_frames_.erase(it);
276      it = buffered_frames_.find(num_bytes_consumed_);
277    } else {
278      string new_data = it->second.substr(bytes_consumed);
279      buffered_frames_.erase(it);
280      buffered_frames_.insert(make_pair(num_bytes_consumed_, new_data));
281      return;
282    }
283  }
284  MaybeCloseStream();
285}
286
287void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) {
288  num_bytes_consumed_ += bytes_consumed;
289  num_bytes_buffered_ -= bytes_consumed;
290
291  stream_->AddBytesConsumed(bytes_consumed);
292}
293
294}  // namespace net
295