1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/basictypes.h"
6#include "base/bind.h"
7#include "base/memory/scoped_ptr.h"
8#include "base/synchronization/waitable_event.h"
9#include "base/threading/thread.h"
10#include "chromecast/media/cma/ipc/media_memory_chunk.h"
11#include "chromecast/media/cma/ipc/media_message.h"
12#include "chromecast/media/cma/ipc/media_message_fifo.h"
13#include "chromecast/media/cma/ipc/media_message_type.h"
14#include "testing/gtest/include/gtest/gtest.h"
15
16namespace chromecast {
17namespace media {
18
19namespace {
20
21class FifoMemoryChunk : public MediaMemoryChunk {
22 public:
23  FifoMemoryChunk(void* mem, size_t size)
24      : mem_(mem), size_(size) {}
25  virtual ~FifoMemoryChunk() {}
26
27  virtual void* data() const OVERRIDE { return mem_; }
28  virtual size_t size() const OVERRIDE { return size_; }
29  virtual bool valid() const OVERRIDE { return true; }
30
31 private:
32  void* mem_;
33  size_t size_;
34
35  DISALLOW_COPY_AND_ASSIGN(FifoMemoryChunk);
36};
37
38void MsgProducer(scoped_ptr<MediaMessageFifo> fifo,
39                 int msg_count,
40                 base::WaitableEvent* event) {
41
42  for (int k = 0; k < msg_count; k++) {
43    uint32 msg_type = 0x2 + (k % 5);
44    uint32 max_msg_content_size = k % 64;
45    do {
46      scoped_ptr<MediaMessage> msg1(
47          MediaMessage::CreateMessage(
48              msg_type,
49              base::Bind(&MediaMessageFifo::ReserveMemory,
50                         base::Unretained(fifo.get())),
51              max_msg_content_size));
52      if (msg1)
53        break;
54      base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(10));
55    } while(true);
56  }
57
58  fifo.reset();
59
60  event->Signal();
61}
62
63void MsgConsumer(scoped_ptr<MediaMessageFifo> fifo,
64                 int msg_count,
65                 base::WaitableEvent* event) {
66
67  int k = 0;
68  while (k < msg_count) {
69    uint32 msg_type = 0x2 + (k % 5);
70    do {
71      scoped_ptr<MediaMessage> msg2(fifo->Pop());
72      if (msg2) {
73        if (msg2->type() != PaddingMediaMsg) {
74          EXPECT_EQ(msg2->type(), msg_type);
75          k++;
76        }
77        break;
78      }
79      base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(10));
80    } while(true);
81  }
82
83  fifo.reset();
84
85  event->Signal();
86}
87
88void MsgProducerConsumer(
89    scoped_ptr<MediaMessageFifo> producer_fifo,
90    scoped_ptr<MediaMessageFifo> consumer_fifo,
91    base::WaitableEvent* event) {
92  for (int k = 0; k < 2048; k++) {
93    // Should have enough space to create a message.
94    uint32 msg_type = 0x2 + (k % 5);
95    uint32 max_msg_content_size = k % 64;
96    scoped_ptr<MediaMessage> msg1(
97        MediaMessage::CreateMessage(
98            msg_type,
99            base::Bind(&MediaMessageFifo::ReserveMemory,
100                       base::Unretained(producer_fifo.get())),
101            max_msg_content_size));
102    EXPECT_TRUE(msg1);
103
104    // Make sure the message is commited.
105    msg1.reset();
106
107    // At this point, we should have a message to read.
108    scoped_ptr<MediaMessage> msg2(consumer_fifo->Pop());
109    EXPECT_TRUE(msg2);
110  }
111
112  producer_fifo.reset();
113  consumer_fifo.reset();
114
115  event->Signal();
116}
117
118}  // namespace
119
120TEST(MediaMessageFifoTest, AlternateWriteRead) {
121  size_t buffer_size = 64 * 1024;
122  scoped_ptr<uint64[]> buffer(new uint64[buffer_size / sizeof(uint64)]);
123
124  scoped_ptr<base::Thread> thread(
125      new base::Thread("FeederConsumerThread"));
126  thread->Start();
127
128  scoped_ptr<MediaMessageFifo> producer_fifo(new MediaMessageFifo(
129      scoped_ptr<MediaMemoryChunk>(
130          new FifoMemoryChunk(&buffer[0], buffer_size)),
131      true));
132  scoped_ptr<MediaMessageFifo> consumer_fifo(new MediaMessageFifo(
133      scoped_ptr<MediaMemoryChunk>(
134          new FifoMemoryChunk(&buffer[0], buffer_size)),
135      false));
136
137  base::WaitableEvent event(false, false);
138  thread->message_loop_proxy()->PostTask(
139      FROM_HERE,
140      base::Bind(&MsgProducerConsumer,
141                 base::Passed(&producer_fifo),
142                 base::Passed(&consumer_fifo),
143                 &event));
144  event.Wait();
145
146  thread.reset();
147}
148
149TEST(MediaMessageFifoTest, MultiThreaded) {
150  size_t buffer_size = 64 * 1024;
151  scoped_ptr<uint64[]> buffer(new uint64[buffer_size / sizeof(uint64)]);
152
153  scoped_ptr<base::Thread> producer_thread(
154      new base::Thread("FeederThread"));
155  scoped_ptr<base::Thread> consumer_thread(
156      new base::Thread("ConsumerThread"));
157  producer_thread->Start();
158  consumer_thread->Start();
159
160  scoped_ptr<MediaMessageFifo> producer_fifo(new MediaMessageFifo(
161      scoped_ptr<MediaMemoryChunk>(
162          new FifoMemoryChunk(&buffer[0], buffer_size)),
163      true));
164  scoped_ptr<MediaMessageFifo> consumer_fifo(new MediaMessageFifo(
165      scoped_ptr<MediaMemoryChunk>(
166          new FifoMemoryChunk(&buffer[0], buffer_size)),
167      false));
168
169  base::WaitableEvent producer_event_done(false, false);
170  base::WaitableEvent consumer_event_done(false, false);
171
172  const int msg_count = 2048;
173  producer_thread->message_loop_proxy()->PostTask(
174      FROM_HERE,
175      base::Bind(&MsgProducer,
176                 base::Passed(&producer_fifo),
177                 msg_count,
178                 &producer_event_done));
179  consumer_thread->message_loop_proxy()->PostTask(
180      FROM_HERE,
181      base::Bind(&MsgConsumer,
182                 base::Passed(&consumer_fifo),
183                 msg_count,
184                 &consumer_event_done));
185
186  producer_event_done.Wait();
187  consumer_event_done.Wait();
188
189  producer_thread.reset();
190  consumer_thread.reset();
191}
192
193}  // namespace media
194}  // namespace chromecast
195
196