1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_write_queue.h"
6
7#include <cstddef>
8#include <vector>
9
10#include "base/logging.h"
11#include "base/stl_util.h"
12#include "net/spdy/spdy_buffer.h"
13#include "net/spdy/spdy_buffer_producer.h"
14#include "net/spdy/spdy_stream.h"
15
16namespace net {
17
18SpdyWriteQueue::PendingWrite::PendingWrite() : frame_producer(NULL) {}
19
20SpdyWriteQueue::PendingWrite::PendingWrite(
21    SpdyFrameType frame_type,
22    SpdyBufferProducer* frame_producer,
23    const base::WeakPtr<SpdyStream>& stream)
24    : frame_type(frame_type),
25      frame_producer(frame_producer),
26      stream(stream),
27      has_stream(stream.get() != NULL) {}
28
29SpdyWriteQueue::PendingWrite::~PendingWrite() {}
30
31SpdyWriteQueue::SpdyWriteQueue() : removing_writes_(false) {}
32
33SpdyWriteQueue::~SpdyWriteQueue() {
34  Clear();
35}
36
37bool SpdyWriteQueue::IsEmpty() const {
38  for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; i++) {
39    if (!queue_[i].empty())
40      return false;
41  }
42  return true;
43}
44
45void SpdyWriteQueue::Enqueue(RequestPriority priority,
46                             SpdyFrameType frame_type,
47                             scoped_ptr<SpdyBufferProducer> frame_producer,
48                             const base::WeakPtr<SpdyStream>& stream) {
49  CHECK(!removing_writes_);
50  CHECK_GE(priority, MINIMUM_PRIORITY);
51  CHECK_LE(priority, MAXIMUM_PRIORITY);
52  if (stream.get())
53    DCHECK_EQ(stream->priority(), priority);
54  queue_[priority].push_back(
55      PendingWrite(frame_type, frame_producer.release(), stream));
56}
57
58bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type,
59                             scoped_ptr<SpdyBufferProducer>* frame_producer,
60                             base::WeakPtr<SpdyStream>* stream) {
61  CHECK(!removing_writes_);
62  for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
63    if (!queue_[i].empty()) {
64      PendingWrite pending_write = queue_[i].front();
65      queue_[i].pop_front();
66      *frame_type = pending_write.frame_type;
67      frame_producer->reset(pending_write.frame_producer);
68      *stream = pending_write.stream;
69      if (pending_write.has_stream)
70        DCHECK(stream->get());
71      return true;
72    }
73  }
74  return false;
75}
76
77void SpdyWriteQueue::RemovePendingWritesForStream(
78    const base::WeakPtr<SpdyStream>& stream) {
79  CHECK(!removing_writes_);
80  removing_writes_ = true;
81  RequestPriority priority = stream->priority();
82  CHECK_GE(priority, MINIMUM_PRIORITY);
83  CHECK_LE(priority, MAXIMUM_PRIORITY);
84
85  DCHECK(stream.get());
86#if DCHECK_IS_ON
87  // |stream| should not have pending writes in a queue not matching
88  // its priority.
89  for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
90    if (priority == i)
91      continue;
92    for (std::deque<PendingWrite>::const_iterator it = queue_[i].begin();
93         it != queue_[i].end(); ++it) {
94      DCHECK_NE(it->stream.get(), stream.get());
95    }
96  }
97#endif
98
99  // Defer deletion until queue iteration is complete, as
100  // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
101  std::vector<SpdyBufferProducer*> erased_buffer_producers;
102
103  // Do the actual deletion and removal, preserving FIFO-ness.
104  std::deque<PendingWrite>* queue = &queue_[priority];
105  std::deque<PendingWrite>::iterator out_it = queue->begin();
106  for (std::deque<PendingWrite>::const_iterator it = queue->begin();
107       it != queue->end(); ++it) {
108    if (it->stream.get() == stream.get()) {
109      erased_buffer_producers.push_back(it->frame_producer);
110    } else {
111      *out_it = *it;
112      ++out_it;
113    }
114  }
115  queue->erase(out_it, queue->end());
116  removing_writes_ = false;
117  STLDeleteElements(&erased_buffer_producers);  // Invokes callbacks.
118}
119
120void SpdyWriteQueue::RemovePendingWritesForStreamsAfter(
121    SpdyStreamId last_good_stream_id) {
122  CHECK(!removing_writes_);
123  removing_writes_ = true;
124  std::vector<SpdyBufferProducer*> erased_buffer_producers;
125
126  for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
127    // Do the actual deletion and removal, preserving FIFO-ness.
128    std::deque<PendingWrite>* queue = &queue_[i];
129    std::deque<PendingWrite>::iterator out_it = queue->begin();
130    for (std::deque<PendingWrite>::const_iterator it = queue->begin();
131         it != queue->end(); ++it) {
132      if (it->stream.get() && (it->stream->stream_id() > last_good_stream_id ||
133                               it->stream->stream_id() == 0)) {
134        erased_buffer_producers.push_back(it->frame_producer);
135      } else {
136        *out_it = *it;
137        ++out_it;
138      }
139    }
140    queue->erase(out_it, queue->end());
141  }
142  removing_writes_ = false;
143  STLDeleteElements(&erased_buffer_producers);  // Invokes callbacks.
144}
145
146void SpdyWriteQueue::Clear() {
147  CHECK(!removing_writes_);
148  removing_writes_ = true;
149  std::vector<SpdyBufferProducer*> erased_buffer_producers;
150
151  for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
152    for (std::deque<PendingWrite>::iterator it = queue_[i].begin();
153         it != queue_[i].end(); ++it) {
154      erased_buffer_producers.push_back(it->frame_producer);
155    }
156    queue_[i].clear();
157  }
158  removing_writes_ = false;
159  STLDeleteElements(&erased_buffer_producers);  // Invokes callbacks.
160}
161
162}  // namespace net
163