1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/spdy/spdy_write_queue.h"
6
7#include <cstddef>
8#include <cstring>
9#include <string>
10
11#include "base/basictypes.h"
12#include "base/memory/ref_counted.h"
13#include "base/memory/scoped_ptr.h"
14#include "base/strings/string_number_conversions.h"
15#include "net/base/net_log.h"
16#include "net/base/request_priority.h"
17#include "net/spdy/spdy_buffer_producer.h"
18#include "net/spdy/spdy_stream.h"
19#include "testing/gtest/include/gtest/gtest.h"
20#include "url/gurl.h"
21
22namespace net {
23
24namespace {
25
// std::string is spelled as plain |string| in the re-entrancy tests below.
using std::string;

// Payloads used by RequeingBufferProducer: |kOriginal| fills the buffer the
// producer hands out, |kRequeued| fills the buffer that its consume callback
// enqueues.
const char kOriginal[] = "original";
const char kRequeued[] = "requeued";

// Fixture for the SpdyWriteQueue tests; it adds no state or set-up of its own.
class SpdyWriteQueueTest : public ::testing::Test {};
32
33// Makes a SpdyFrameProducer producing a frame with the data in the
34// given string.
35scoped_ptr<SpdyBufferProducer> StringToProducer(const std::string& s) {
36  scoped_ptr<char[]> data(new char[s.size()]);
37  std::memcpy(data.get(), s.data(), s.size());
38  return scoped_ptr<SpdyBufferProducer>(
39      new SimpleBufferProducer(
40          scoped_ptr<SpdyBuffer>(
41              new SpdyBuffer(
42                  scoped_ptr<SpdyFrame>(
43                      new SpdyFrame(data.release(), s.size(), true))))));
44}
45
46// Makes a SpdyBufferProducer producing a frame with the data in the
47// given int (converted to a string).
48scoped_ptr<SpdyBufferProducer> IntToProducer(int i) {
49  return StringToProducer(base::IntToString(i));
50}
51
52// Producer whose produced buffer will enqueue yet another buffer into the
53// SpdyWriteQueue upon destruction.
54class RequeingBufferProducer : public SpdyBufferProducer {
55 public:
56  RequeingBufferProducer(SpdyWriteQueue* queue) {
57    buffer_.reset(new SpdyBuffer(kOriginal, arraysize(kOriginal)));
58    buffer_->AddConsumeCallback(
59        base::Bind(RequeingBufferProducer::ConsumeCallback, queue));
60  }
61
62  virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
63    return buffer_.Pass();
64  }
65
66  static void ConsumeCallback(SpdyWriteQueue* queue,
67                              size_t size,
68                              SpdyBuffer::ConsumeSource source) {
69    scoped_ptr<SpdyBufferProducer> producer(
70        new SimpleBufferProducer(scoped_ptr<SpdyBuffer>(
71            new SpdyBuffer(kRequeued, arraysize(kRequeued)))));
72
73    queue->Enqueue(
74        MEDIUM, RST_STREAM, producer.Pass(), base::WeakPtr<SpdyStream>());
75  }
76
77 private:
78  scoped_ptr<SpdyBuffer> buffer_;
79};
80
81// Produces a frame with the given producer and returns a copy of its
82// data as a string.
83std::string ProducerToString(scoped_ptr<SpdyBufferProducer> producer) {
84  scoped_ptr<SpdyBuffer> buffer = producer->ProduceBuffer();
85  return std::string(buffer->GetRemainingData(), buffer->GetRemainingSize());
86}
87
88// Produces a frame with the given producer and returns a copy of its
89// data as an int (converted from a string).
90int ProducerToInt(scoped_ptr<SpdyBufferProducer> producer) {
91  int i = 0;
92  EXPECT_TRUE(base::StringToInt(ProducerToString(producer.Pass()), &i));
93  return i;
94}
95
96// Makes a SpdyStream with the given priority and a NULL SpdySession
97// -- be careful to not call any functions that expect the session to
98// be there.
99SpdyStream* MakeTestStream(RequestPriority priority) {
100  return new SpdyStream(
101      SPDY_BIDIRECTIONAL_STREAM, base::WeakPtr<SpdySession>(),
102      GURL(), priority, 0, 0, BoundNetLog());
103}
104
105// Add some frame producers of different priority. The producers
106// should be dequeued in priority order with their associated stream.
107TEST_F(SpdyWriteQueueTest, DequeuesByPriority) {
108  SpdyWriteQueue write_queue;
109
110  scoped_ptr<SpdyBufferProducer> producer_low = StringToProducer("LOW");
111  scoped_ptr<SpdyBufferProducer> producer_medium = StringToProducer("MEDIUM");
112  scoped_ptr<SpdyBufferProducer> producer_highest = StringToProducer("HIGHEST");
113
114  scoped_ptr<SpdyStream> stream_medium(MakeTestStream(MEDIUM));
115  scoped_ptr<SpdyStream> stream_highest(MakeTestStream(HIGHEST));
116
117  // A NULL stream should still work.
118  write_queue.Enqueue(
119      LOW, SYN_STREAM, producer_low.Pass(), base::WeakPtr<SpdyStream>());
120  write_queue.Enqueue(
121      MEDIUM, SYN_REPLY, producer_medium.Pass(), stream_medium->GetWeakPtr());
122  write_queue.Enqueue(
123      HIGHEST, RST_STREAM, producer_highest.Pass(),
124      stream_highest->GetWeakPtr());
125
126  SpdyFrameType frame_type = DATA;
127  scoped_ptr<SpdyBufferProducer> frame_producer;
128  base::WeakPtr<SpdyStream> stream;
129  ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
130  EXPECT_EQ(RST_STREAM, frame_type);
131  EXPECT_EQ("HIGHEST", ProducerToString(frame_producer.Pass()));
132  EXPECT_EQ(stream_highest, stream.get());
133
134  ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
135  EXPECT_EQ(SYN_REPLY, frame_type);
136  EXPECT_EQ("MEDIUM", ProducerToString(frame_producer.Pass()));
137  EXPECT_EQ(stream_medium, stream.get());
138
139  ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
140  EXPECT_EQ(SYN_STREAM, frame_type);
141  EXPECT_EQ("LOW", ProducerToString(frame_producer.Pass()));
142  EXPECT_EQ(NULL, stream.get());
143
144  EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
145}
146
147// Add some frame producers with the same priority. The producers
148// should be dequeued in FIFO order with their associated stream.
149TEST_F(SpdyWriteQueueTest, DequeuesFIFO) {
150  SpdyWriteQueue write_queue;
151
152  scoped_ptr<SpdyBufferProducer> producer1 = IntToProducer(1);
153  scoped_ptr<SpdyBufferProducer> producer2 = IntToProducer(2);
154  scoped_ptr<SpdyBufferProducer> producer3 = IntToProducer(3);
155
156  scoped_ptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY));
157  scoped_ptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY));
158  scoped_ptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY));
159
160  write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, producer1.Pass(),
161                      stream1->GetWeakPtr());
162  write_queue.Enqueue(DEFAULT_PRIORITY, SYN_REPLY, producer2.Pass(),
163                      stream2->GetWeakPtr());
164  write_queue.Enqueue(DEFAULT_PRIORITY, RST_STREAM, producer3.Pass(),
165                      stream3->GetWeakPtr());
166
167  SpdyFrameType frame_type = DATA;
168  scoped_ptr<SpdyBufferProducer> frame_producer;
169  base::WeakPtr<SpdyStream> stream;
170  ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
171  EXPECT_EQ(SYN_STREAM, frame_type);
172  EXPECT_EQ(1, ProducerToInt(frame_producer.Pass()));
173  EXPECT_EQ(stream1, stream.get());
174
175  ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
176  EXPECT_EQ(SYN_REPLY, frame_type);
177  EXPECT_EQ(2, ProducerToInt(frame_producer.Pass()));
178  EXPECT_EQ(stream2, stream.get());
179
180  ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
181  EXPECT_EQ(RST_STREAM, frame_type);
182  EXPECT_EQ(3, ProducerToInt(frame_producer.Pass()));
183  EXPECT_EQ(stream3, stream.get());
184
185  EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
186}
187
188// Enqueue a bunch of writes and then call
189// RemovePendingWritesForStream() on one of the streams. No dequeued
190// write should be for that stream.
191TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) {
192  SpdyWriteQueue write_queue;
193
194  scoped_ptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY));
195  scoped_ptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY));
196
197  for (int i = 0; i < 100; ++i) {
198    base::WeakPtr<SpdyStream> stream =
199        (((i % 3) == 0) ? stream1 : stream2)->GetWeakPtr();
200    write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), stream);
201  }
202
203  write_queue.RemovePendingWritesForStream(stream2->GetWeakPtr());
204
205  for (int i = 0; i < 100; i += 3) {
206    SpdyFrameType frame_type = DATA;
207    scoped_ptr<SpdyBufferProducer> frame_producer;
208    base::WeakPtr<SpdyStream> stream;
209    ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
210    EXPECT_EQ(SYN_STREAM, frame_type);
211    EXPECT_EQ(i, ProducerToInt(frame_producer.Pass()));
212    EXPECT_EQ(stream1, stream.get());
213  }
214
215  SpdyFrameType frame_type = DATA;
216  scoped_ptr<SpdyBufferProducer> frame_producer;
217  base::WeakPtr<SpdyStream> stream;
218  EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
219}
220
221// Enqueue a bunch of writes and then call
222// RemovePendingWritesForStreamsAfter(). No dequeued write should be for
223// those streams without a stream id, or with a stream_id after that
224// argument.
225TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStreamsAfter) {
226  SpdyWriteQueue write_queue;
227
228  scoped_ptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY));
229  stream1->set_stream_id(1);
230  scoped_ptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY));
231  stream2->set_stream_id(3);
232  scoped_ptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY));
233  stream3->set_stream_id(5);
234  // No stream id assigned.
235  scoped_ptr<SpdyStream> stream4(MakeTestStream(DEFAULT_PRIORITY));
236  base::WeakPtr<SpdyStream> streams[] = {
237    stream1->GetWeakPtr(), stream2->GetWeakPtr(),
238    stream3->GetWeakPtr(), stream4->GetWeakPtr()
239  };
240
241  for (int i = 0; i < 100; ++i) {
242    write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i),
243                        streams[i % arraysize(streams)]);
244  }
245
246  write_queue.RemovePendingWritesForStreamsAfter(stream1->stream_id());
247
248  for (int i = 0; i < 100; i += arraysize(streams)) {
249    SpdyFrameType frame_type = DATA;
250    scoped_ptr<SpdyBufferProducer> frame_producer;
251    base::WeakPtr<SpdyStream> stream;
252    ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream))
253        << "Unable to Dequeue i: " << i;
254    EXPECT_EQ(SYN_STREAM, frame_type);
255    EXPECT_EQ(i, ProducerToInt(frame_producer.Pass()));
256    EXPECT_EQ(stream1, stream.get());
257  }
258
259  SpdyFrameType frame_type = DATA;
260  scoped_ptr<SpdyBufferProducer> frame_producer;
261  base::WeakPtr<SpdyStream> stream;
262  EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
263}
264
265// Enqueue a bunch of writes and then call Clear(). The write queue
266// should clean up the memory properly, and Dequeue() should return
267// false.
268TEST_F(SpdyWriteQueueTest, Clear) {
269  SpdyWriteQueue write_queue;
270
271  for (int i = 0; i < 100; ++i) {
272    write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i),
273                        base::WeakPtr<SpdyStream>());
274  }
275
276  write_queue.Clear();
277
278  SpdyFrameType frame_type = DATA;
279  scoped_ptr<SpdyBufferProducer> frame_producer;
280  base::WeakPtr<SpdyStream> stream;
281  EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
282}
283
284TEST_F(SpdyWriteQueueTest, RequeingProducerWithoutReentrance) {
285  SpdyWriteQueue queue;
286  queue.Enqueue(
287      DEFAULT_PRIORITY,
288      SYN_STREAM,
289      scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)),
290      base::WeakPtr<SpdyStream>());
291  {
292    SpdyFrameType frame_type;
293    scoped_ptr<SpdyBufferProducer> producer;
294    base::WeakPtr<SpdyStream> stream;
295
296    EXPECT_TRUE(queue.Dequeue(&frame_type, &producer, &stream));
297    EXPECT_TRUE(queue.IsEmpty());
298    EXPECT_EQ(string(kOriginal), producer->ProduceBuffer()->GetRemainingData());
299  }
300  // |producer| was destroyed, and a buffer is re-queued.
301  EXPECT_FALSE(queue.IsEmpty());
302
303  SpdyFrameType frame_type;
304  scoped_ptr<SpdyBufferProducer> producer;
305  base::WeakPtr<SpdyStream> stream;
306
307  EXPECT_TRUE(queue.Dequeue(&frame_type, &producer, &stream));
308  EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData());
309}
310
311TEST_F(SpdyWriteQueueTest, ReentranceOnClear) {
312  SpdyWriteQueue queue;
313  queue.Enqueue(
314      DEFAULT_PRIORITY,
315      SYN_STREAM,
316      scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)),
317      base::WeakPtr<SpdyStream>());
318
319  queue.Clear();
320  EXPECT_FALSE(queue.IsEmpty());
321
322  SpdyFrameType frame_type;
323  scoped_ptr<SpdyBufferProducer> producer;
324  base::WeakPtr<SpdyStream> stream;
325
326  EXPECT_TRUE(queue.Dequeue(&frame_type, &producer, &stream));
327  EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData());
328}
329
330TEST_F(SpdyWriteQueueTest, ReentranceOnRemovePendingWritesAfter) {
331  scoped_ptr<SpdyStream> stream(MakeTestStream(DEFAULT_PRIORITY));
332  stream->set_stream_id(2);
333
334  SpdyWriteQueue queue;
335  queue.Enqueue(
336      DEFAULT_PRIORITY,
337      SYN_STREAM,
338      scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)),
339      stream->GetWeakPtr());
340
341  queue.RemovePendingWritesForStreamsAfter(1);
342  EXPECT_FALSE(queue.IsEmpty());
343
344  SpdyFrameType frame_type;
345  scoped_ptr<SpdyBufferProducer> producer;
346  base::WeakPtr<SpdyStream> weak_stream;
347
348  EXPECT_TRUE(queue.Dequeue(&frame_type, &producer, &weak_stream));
349  EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData());
350}
351
352TEST_F(SpdyWriteQueueTest, ReentranceOnRemovePendingWritesForStream) {
353  scoped_ptr<SpdyStream> stream(MakeTestStream(DEFAULT_PRIORITY));
354  stream->set_stream_id(2);
355
356  SpdyWriteQueue queue;
357  queue.Enqueue(
358      DEFAULT_PRIORITY,
359      SYN_STREAM,
360      scoped_ptr<SpdyBufferProducer>(new RequeingBufferProducer(&queue)),
361      stream->GetWeakPtr());
362
363  queue.RemovePendingWritesForStream(stream->GetWeakPtr());
364  EXPECT_FALSE(queue.IsEmpty());
365
366  SpdyFrameType frame_type;
367  scoped_ptr<SpdyBufferProducer> producer;
368  base::WeakPtr<SpdyStream> weak_stream;
369
370  EXPECT_TRUE(queue.Dequeue(&frame_type, &producer, &weak_stream));
371  EXPECT_EQ(string(kRequeued), producer->ProduceBuffer()->GetRemainingData());
372}
373
374}  // namespace
375
376}  // namespace net
377