media_message_fifo.cc revision 1320f92c476a1ad9d19dba2a48c72b75566198e9
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
#include "chromecast/media/cma/ipc/media_message_fifo.h"

#include <algorithm>

#include "base/atomicops.h"
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop_proxy.h"
#include "chromecast/media/cma/base/cma_logging.h"
#include "chromecast/media/cma/ipc/media_memory_chunk.h"
#include "chromecast/media/cma/ipc/media_message.h"
#include "chromecast/media/cma/ipc/media_message_type.h"
16
17namespace chromecast {
18namespace media {
19
20class MediaMessageFlag
21    : public base::RefCountedThreadSafe<MediaMessageFlag> {
22 public:
23  // |offset| is the offset in the fifo of the media message.
24  explicit MediaMessageFlag(size_t offset);
25
26  bool IsValid() const;
27
28  void Invalidate();
29
30  size_t offset() const { return offset_; }
31
32 private:
33  friend class base::RefCountedThreadSafe<MediaMessageFlag>;
34  virtual ~MediaMessageFlag();
35
36  const size_t offset_;
37  bool flag_;
38
39  DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag);
40};
41
42MediaMessageFlag::MediaMessageFlag(size_t offset)
43  : offset_(offset),
44    flag_(true) {
45}
46
47MediaMessageFlag::~MediaMessageFlag() {
48}
49
50bool MediaMessageFlag::IsValid() const {
51  return flag_;
52}
53
54void MediaMessageFlag::Invalidate() {
55  flag_ = false;
56}
57
58class FifoOwnedMemory : public MediaMemoryChunk {
59 public:
60  FifoOwnedMemory(void* data, size_t size,
61                  const scoped_refptr<MediaMessageFlag>& flag,
62                  const base::Closure& release_msg_cb);
63  virtual ~FifoOwnedMemory();
64
65  // MediaMemoryChunk implementation.
66  virtual void* data() const OVERRIDE { return data_; }
67  virtual size_t size() const OVERRIDE { return size_; }
68  virtual bool valid() const OVERRIDE { return flag_->IsValid(); }
69
70 private:
71  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
72  base::Closure release_msg_cb_;
73
74  void* const data_;
75  const size_t size_;
76  scoped_refptr<MediaMessageFlag> flag_;
77
78  DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory);
79};
80
81FifoOwnedMemory::FifoOwnedMemory(
82    void* data, size_t size,
83    const scoped_refptr<MediaMessageFlag>& flag,
84    const base::Closure& release_msg_cb)
85  : task_runner_(base::MessageLoopProxy::current()),
86    release_msg_cb_(release_msg_cb),
87    data_(data),
88    size_(size),
89    flag_(flag) {
90}
91
92FifoOwnedMemory::~FifoOwnedMemory() {
93  // Release the flag before notifying that the message has been released.
94  flag_ = scoped_refptr<MediaMessageFlag>();
95  if (!release_msg_cb_.is_null()) {
96    if (task_runner_->BelongsToCurrentThread()) {
97      release_msg_cb_.Run();
98    } else {
99      task_runner_->PostTask(FROM_HERE, release_msg_cb_);
100    }
101  }
102}
103
104MediaMessageFifo::MediaMessageFifo(
105    scoped_ptr<MediaMemoryChunk> mem, bool init)
106  : mem_(mem.Pass()),
107    weak_factory_(this) {
108  CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor),
109           0u);
110  CHECK_GE(mem_->size(), sizeof(Descriptor));
111  Descriptor* desc = static_cast<Descriptor*>(mem_->data());
112  base_ = static_cast<void*>(&desc->first_item);
113
114  // TODO(damienv): remove cast when atomic size_t is defined in Chrome.
115  // Currently, the sign differs.
116  rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset));
117  wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset));
118
119  size_t max_size = mem_->size() -
120      (static_cast<char*>(base_) - static_cast<char*>(mem_->data()));
121  if (init) {
122    size_ = max_size;
123    desc->size = size_;
124    internal_rd_offset_ = 0;
125    internal_wr_offset_ = 0;
126    base::subtle::Acquire_Store(rd_offset_, 0);
127    base::subtle::Acquire_Store(wr_offset_, 0);
128  } else {
129    size_ = desc->size;
130    CHECK_LE(size_, max_size);
131    internal_rd_offset_ = current_rd_offset();
132    internal_wr_offset_ = current_wr_offset();
133  }
134  CMALOG(kLogControl)
135      << "MediaMessageFifo:" << " init=" << init << " size=" << size_;
136  CHECK_GT(size_, 0) << size_;
137
138  weak_this_ = weak_factory_.GetWeakPtr();
139  thread_checker_.DetachFromThread();
140}
141
// DCHECKs that the fifo is destroyed on the thread tracked by
// |thread_checker_|.
MediaMessageFifo::~MediaMessageFifo() {
  DCHECK(thread_checker_.CalledOnValidThread());
}
145
// Registers |read_event_cb| as the callback run by CommitRead() each time
// the shared read offset is updated. Any callback registered previously is
// replaced.
void MediaMessageFifo::ObserveReadActivity(
    const base::Closure& read_event_cb) {
  read_event_cb_ = read_event_cb;
}
150
// Registers |write_event_cb| as the callback run by CommitWrite() each time
// the shared write offset is updated. Any callback registered previously is
// replaced.
void MediaMessageFifo::ObserveWriteActivity(
    const base::Closure& write_event_cb) {
  write_event_cb_ = write_event_cb;
}
155
156scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory(
157    size_t size_to_reserve) {
158  DCHECK(thread_checker_.CalledOnValidThread());
159
160  // Capture first both the read and write offsets.
161  // and exit right away if not enough free space.
162  size_t wr_offset = internal_wr_offset();
163  size_t rd_offset = current_rd_offset();
164  size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
165  size_t free_size = size_ - 1 - allocated_size;
166  if (free_size < size_to_reserve)
167    return scoped_ptr<MediaMemoryChunk>();
168  CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve);
169
170  // Note: in the next 2 conditions, we have:
171  // trailing_byte_count < size_to_reserve
172  // and since at this stage: size_to_reserve <= free_size
173  // we also have trailing_byte_count <= free_size
174  // which means that all the trailing bytes are free space in the fifo.
175  size_t trailing_byte_count = size_ - wr_offset;
176  if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
177    // If there is no space to even write the smallest message,
178    // skip the trailing bytes and come back to the beginning of the fifo.
179    // (no way to insert a padding message).
180    if (free_size < trailing_byte_count)
181      return scoped_ptr<MediaMemoryChunk>();
182    wr_offset = 0;
183    CommitInternalWrite(wr_offset);
184
185  } else if (trailing_byte_count < size_to_reserve) {
186    // At this point, we know we have at least the space to write a message.
187    // However, to avoid splitting a message, a padding message is needed.
188    scoped_ptr<MediaMemoryChunk> mem(
189        ReserveMemoryNoCheck(trailing_byte_count));
190    scoped_ptr<MediaMessage> padding_message(
191        MediaMessage::CreateMessage(PaddingMediaMsg, mem.Pass()));
192  }
193
194  // Recalculate the free size and exit if not enough free space.
195  wr_offset = internal_wr_offset();
196  allocated_size = (size_ + wr_offset - rd_offset) % size_;
197  free_size = size_ - 1 - allocated_size;
198  if (free_size < size_to_reserve)
199    return scoped_ptr<MediaMemoryChunk>();
200
201  return ReserveMemoryNoCheck(size_to_reserve);
202}
203
204scoped_ptr<MediaMessage> MediaMessageFifo::Pop() {
205  DCHECK(thread_checker_.CalledOnValidThread());
206
207  // Capture the read and write offsets.
208  size_t rd_offset = internal_rd_offset();
209  size_t wr_offset = current_wr_offset();
210  size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
211
212  if (allocated_size < MediaMessage::minimum_msg_size())
213    return scoped_ptr<MediaMessage>();
214
215  size_t trailing_byte_count = size_ - rd_offset;
216  if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
217    // If there is no space to even have the smallest message,
218    // skip the trailing bytes and come back to the beginning of the fifo.
219    // Note: all the trailing bytes correspond to allocated bytes since:
220    // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size
221    rd_offset = 0;
222    allocated_size -= trailing_byte_count;
223    trailing_byte_count = size_;
224    CommitInternalRead(rd_offset);
225  }
226
227  // The message should not be longer than the allocated size
228  // but since a message is a contiguous area of memory, it should also be
229  // smaller than |trailing_byte_count|.
230  size_t max_msg_size = std::min(allocated_size, trailing_byte_count);
231  if (max_msg_size < MediaMessage::minimum_msg_size())
232    return scoped_ptr<MediaMessage>();
233  void* msg_src = static_cast<uint8*>(base_) + rd_offset;
234
235  // Create a flag to protect the serialized structure of the message
236  // from being overwritten.
237  // The serialized structure starts at offset |rd_offset|.
238  scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset));
239  rd_flags_.push_back(rd_flag);
240  scoped_ptr<MediaMemoryChunk> mem(
241      new FifoOwnedMemory(
242          msg_src, max_msg_size, rd_flag,
243          base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_)));
244
245  // Create the message which wraps its the serialized structure.
246  scoped_ptr<MediaMessage> message(MediaMessage::MapMessage(mem.Pass()));
247  CHECK(message);
248
249  // Update the internal read pointer.
250  rd_offset = (rd_offset + message->size()) % size_;
251  CommitInternalRead(rd_offset);
252
253  return message.Pass();
254}
255
256void MediaMessageFifo::Flush() {
257  DCHECK(thread_checker_.CalledOnValidThread());
258
259  size_t wr_offset = current_wr_offset();
260
261  // Invalidate every memory region before flushing.
262  while (!rd_flags_.empty()) {
263    CMALOG(kLogControl) << "Invalidate flag";
264    rd_flags_.front()->Invalidate();
265    rd_flags_.pop_front();
266  }
267
268  // Flush by setting the read pointer to the value of the write pointer.
269  // Update first the internal read pointer then the public one.
270  CommitInternalRead(wr_offset);
271  CommitRead(wr_offset);
272}
273
274scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck(
275    size_t size_to_reserve) {
276  size_t wr_offset = internal_wr_offset();
277
278  // Memory block corresponding to the serialized structure of the message.
279  void* msg_start = static_cast<uint8*>(base_) + wr_offset;
280  scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset));
281  wr_flags_.push_back(wr_flag);
282  scoped_ptr<MediaMemoryChunk> mem(
283      new FifoOwnedMemory(
284          msg_start, size_to_reserve, wr_flag,
285          base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_)));
286
287  // Update the internal write pointer.
288  wr_offset = (wr_offset + size_to_reserve) % size_;
289  CommitInternalWrite(wr_offset);
290
291  return mem.Pass();
292}
293
294void MediaMessageFifo::OnWrMemoryReleased() {
295  DCHECK(thread_checker_.CalledOnValidThread());
296
297  if (wr_flags_.empty()) {
298    // Sanity check: when there is no protected memory area,
299    // the external write offset has no reason to be different from
300    // the internal write offset.
301    DCHECK_EQ(current_wr_offset(), internal_wr_offset());
302    return;
303  }
304
305  // Update the external write offset.
306  while (!wr_flags_.empty() &&
307         (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) {
308    // TODO(damienv): Could add a sanity check to make sure the offset is
309    // between the external write offset and the read offset (not included).
310    wr_flags_.pop_front();
311  }
312
313  // Update the read offset to the first locked memory area
314  // or to the internal read pointer if nothing prevents it.
315  size_t external_wr_offset = internal_wr_offset();
316  if (!wr_flags_.empty())
317    external_wr_offset = wr_flags_.front()->offset();
318  CommitWrite(external_wr_offset);
319}
320
321void MediaMessageFifo::OnRdMemoryReleased() {
322  DCHECK(thread_checker_.CalledOnValidThread());
323
324  if (rd_flags_.empty()) {
325    // Sanity check: when there is no protected memory area,
326    // the external read offset has no reason to be different from
327    // the internal read offset.
328    DCHECK_EQ(current_rd_offset(), internal_rd_offset());
329    return;
330  }
331
332  // Update the external read offset.
333  while (!rd_flags_.empty() &&
334         (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) {
335    // TODO(damienv): Could add a sanity check to make sure the offset is
336    // between the external read offset and the write offset.
337    rd_flags_.pop_front();
338  }
339
340  // Update the read offset to the first locked memory area
341  // or to the internal read pointer if nothing prevents it.
342  size_t external_rd_offset = internal_rd_offset();
343  if (!rd_flags_.empty())
344    external_rd_offset = rd_flags_.front()->offset();
345  CommitRead(external_rd_offset);
346}
347
// Returns the read offset currently stored in the fifo descriptor.
// The value is sampled with an Acquire_Load and CHECKed to be strictly
// smaller than the fifo size.
size_t MediaMessageFifo::current_rd_offset() const {
  // The descriptor field is accessed through an AtomicSize pointer:
  // both types are expected to have the same size.
  DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
  size_t rd_offset = base::subtle::Acquire_Load(rd_offset_);
  CHECK_LT(rd_offset, size_);
  return rd_offset;
}
354
// Returns the write offset currently stored in the fifo descriptor.
// The value is CHECKed to be strictly smaller than the fifo size.
size_t MediaMessageFifo::current_wr_offset() const {
  // The descriptor field is accessed through an AtomicSize pointer:
  // both types are expected to have the same size.
  DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));

  // When the fifo consumer acquires the write offset,
  // we have to make sure that any subsequent reads return results
  // at least in line with the memory snapshot taken
  // when the write offset was sampled.
  // That's why an Acquire_Load is used here.
  size_t wr_offset = base::subtle::Acquire_Load(wr_offset_);
  CHECK_LT(wr_offset, size_);
  return wr_offset;
}
367
368void MediaMessageFifo::CommitRead(size_t new_rd_offset) {
369  // Add a memory fence to ensure the message content is completely read
370  // before updating the read offset.
371  base::subtle::Release_Store(rd_offset_, new_rd_offset);
372
373  // Make sure the read pointer has been updated before sending a notification.
374  if (!read_event_cb_.is_null()) {
375    base::subtle::MemoryBarrier();
376    read_event_cb_.Run();
377  }
378}
379
380void MediaMessageFifo::CommitWrite(size_t new_wr_offset) {
381  // Add a memory fence to ensure the message content is written
382  // before updating the write offset.
383  base::subtle::Release_Store(wr_offset_, new_wr_offset);
384
385  // Make sure the write pointer has been updated before sending a notification.
386  if (!write_event_cb_.is_null()) {
387    base::subtle::MemoryBarrier();
388    write_event_cb_.Run();
389  }
390}
391
// Updates the internal read offset only: the read offset stored in the fifo
// descriptor is left untouched (see CommitRead() for that one).
void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) {
  internal_rd_offset_ = new_rd_offset;
}
395
// Updates the internal write offset only: the write offset stored in the
// fifo descriptor is left untouched (see CommitWrite() for that one).
void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) {
  internal_wr_offset_ = new_wr_offset;
}
399
400}  // namespace media
401}  // namespace chromecast
402