1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chromecast/media/cma/ipc/media_message_fifo.h" 6 7#include "base/atomicops.h" 8#include "base/bind.h" 9#include "base/location.h" 10#include "base/logging.h" 11#include "base/message_loop/message_loop_proxy.h" 12#include "chromecast/media/cma/base/cma_logging.h" 13#include "chromecast/media/cma/ipc/media_memory_chunk.h" 14#include "chromecast/media/cma/ipc/media_message.h" 15#include "chromecast/media/cma/ipc/media_message_type.h" 16 17namespace chromecast { 18namespace media { 19 20class MediaMessageFlag 21 : public base::RefCountedThreadSafe<MediaMessageFlag> { 22 public: 23 // |offset| is the offset in the fifo of the media message. 24 explicit MediaMessageFlag(size_t offset); 25 26 bool IsValid() const; 27 28 void Invalidate(); 29 30 size_t offset() const { return offset_; } 31 32 private: 33 friend class base::RefCountedThreadSafe<MediaMessageFlag>; 34 virtual ~MediaMessageFlag(); 35 36 const size_t offset_; 37 bool flag_; 38 39 DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag); 40}; 41 42MediaMessageFlag::MediaMessageFlag(size_t offset) 43 : offset_(offset), 44 flag_(true) { 45} 46 47MediaMessageFlag::~MediaMessageFlag() { 48} 49 50bool MediaMessageFlag::IsValid() const { 51 return flag_; 52} 53 54void MediaMessageFlag::Invalidate() { 55 flag_ = false; 56} 57 58class FifoOwnedMemory : public MediaMemoryChunk { 59 public: 60 FifoOwnedMemory(void* data, size_t size, 61 const scoped_refptr<MediaMessageFlag>& flag, 62 const base::Closure& release_msg_cb); 63 virtual ~FifoOwnedMemory(); 64 65 // MediaMemoryChunk implementation. 66 virtual void* data() const OVERRIDE { return data_; } 67 virtual size_t size() const OVERRIDE { return size_; } 68 virtual bool valid() const OVERRIDE { return flag_->IsValid(); } 69 70 private: 71 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 72 base::Closure release_msg_cb_; 73 74 void* const data_; 75 const size_t size_; 76 scoped_refptr<MediaMessageFlag> flag_; 77 78 DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory); 79}; 80 81FifoOwnedMemory::FifoOwnedMemory( 82 void* data, size_t size, 83 const scoped_refptr<MediaMessageFlag>& flag, 84 const base::Closure& release_msg_cb) 85 : task_runner_(base::MessageLoopProxy::current()), 86 release_msg_cb_(release_msg_cb), 87 data_(data), 88 size_(size), 89 flag_(flag) { 90} 91 92FifoOwnedMemory::~FifoOwnedMemory() { 93 // Release the flag before notifying that the message has been released. 94 flag_ = scoped_refptr<MediaMessageFlag>(); 95 if (!release_msg_cb_.is_null()) { 96 if (task_runner_->BelongsToCurrentThread()) { 97 release_msg_cb_.Run(); 98 } else { 99 task_runner_->PostTask(FROM_HERE, release_msg_cb_); 100 } 101 } 102} 103 104MediaMessageFifo::MediaMessageFifo( 105 scoped_ptr<MediaMemoryChunk> mem, bool init) 106 : mem_(mem.Pass()), 107 weak_factory_(this) { 108 CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor), 109 0u); 110 CHECK_GE(mem_->size(), sizeof(Descriptor)); 111 Descriptor* desc = static_cast<Descriptor*>(mem_->data()); 112 base_ = static_cast<void*>(&desc->first_item); 113 114 // TODO(damienv): remove cast when atomic size_t is defined in Chrome. 115 // Currently, the sign differs. 116 rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset)); 117 wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset)); 118 119 size_t max_size = mem_->size() - 120 (static_cast<char*>(base_) - static_cast<char*>(mem_->data())); 121 if (init) { 122 size_ = max_size; 123 desc->size = size_; 124 internal_rd_offset_ = 0; 125 internal_wr_offset_ = 0; 126 base::subtle::Acquire_Store(rd_offset_, 0); 127 base::subtle::Acquire_Store(wr_offset_, 0); 128 } else { 129 size_ = desc->size; 130 CHECK_LE(size_, max_size); 131 internal_rd_offset_ = current_rd_offset(); 132 internal_wr_offset_ = current_wr_offset(); 133 } 134 CMALOG(kLogControl) 135 << "MediaMessageFifo:" << " init=" << init << " size=" << size_; 136 CHECK_GT(size_, 0) << size_; 137 138 weak_this_ = weak_factory_.GetWeakPtr(); 139 thread_checker_.DetachFromThread(); 140} 141 142MediaMessageFifo::~MediaMessageFifo() { 143 DCHECK(thread_checker_.CalledOnValidThread()); 144} 145 146void MediaMessageFifo::ObserveReadActivity( 147 const base::Closure& read_event_cb) { 148 read_event_cb_ = read_event_cb; 149} 150 151void MediaMessageFifo::ObserveWriteActivity( 152 const base::Closure& write_event_cb) { 153 write_event_cb_ = write_event_cb; 154} 155 156scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory( 157 size_t size_to_reserve) { 158 DCHECK(thread_checker_.CalledOnValidThread()); 159 160 // Capture first both the read and write offsets. 161 // and exit right away if not enough free space. 162 size_t wr_offset = internal_wr_offset(); 163 size_t rd_offset = current_rd_offset(); 164 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_; 165 size_t free_size = size_ - 1 - allocated_size; 166 if (free_size < size_to_reserve) 167 return scoped_ptr<MediaMemoryChunk>(); 168 CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve); 169 170 // Note: in the next 2 conditions, we have: 171 // trailing_byte_count < size_to_reserve 172 // and since at this stage: size_to_reserve <= free_size 173 // we also have trailing_byte_count <= free_size 174 // which means that all the trailing bytes are free space in the fifo. 175 size_t trailing_byte_count = size_ - wr_offset; 176 if (trailing_byte_count < MediaMessage::minimum_msg_size()) { 177 // If there is no space to even write the smallest message, 178 // skip the trailing bytes and come back to the beginning of the fifo. 179 // (no way to insert a padding message). 180 if (free_size < trailing_byte_count) 181 return scoped_ptr<MediaMemoryChunk>(); 182 wr_offset = 0; 183 CommitInternalWrite(wr_offset); 184 185 } else if (trailing_byte_count < size_to_reserve) { 186 // At this point, we know we have at least the space to write a message. 187 // However, to avoid splitting a message, a padding message is needed. 188 scoped_ptr<MediaMemoryChunk> mem( 189 ReserveMemoryNoCheck(trailing_byte_count)); 190 scoped_ptr<MediaMessage> padding_message( 191 MediaMessage::CreateMessage(PaddingMediaMsg, mem.Pass())); 192 } 193 194 // Recalculate the free size and exit if not enough free space. 195 wr_offset = internal_wr_offset(); 196 allocated_size = (size_ + wr_offset - rd_offset) % size_; 197 free_size = size_ - 1 - allocated_size; 198 if (free_size < size_to_reserve) 199 return scoped_ptr<MediaMemoryChunk>(); 200 201 return ReserveMemoryNoCheck(size_to_reserve); 202} 203 204scoped_ptr<MediaMessage> MediaMessageFifo::Pop() { 205 DCHECK(thread_checker_.CalledOnValidThread()); 206 207 // Capture the read and write offsets. 208 size_t rd_offset = internal_rd_offset(); 209 size_t wr_offset = current_wr_offset(); 210 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_; 211 212 if (allocated_size < MediaMessage::minimum_msg_size()) 213 return scoped_ptr<MediaMessage>(); 214 215 size_t trailing_byte_count = size_ - rd_offset; 216 if (trailing_byte_count < MediaMessage::minimum_msg_size()) { 217 // If there is no space to even have the smallest message, 218 // skip the trailing bytes and come back to the beginning of the fifo. 219 // Note: all the trailing bytes correspond to allocated bytes since: 220 // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size 221 rd_offset = 0; 222 allocated_size -= trailing_byte_count; 223 trailing_byte_count = size_; 224 CommitInternalRead(rd_offset); 225 } 226 227 // The message should not be longer than the allocated size 228 // but since a message is a contiguous area of memory, it should also be 229 // smaller than |trailing_byte_count|. 230 size_t max_msg_size = std::min(allocated_size, trailing_byte_count); 231 if (max_msg_size < MediaMessage::minimum_msg_size()) 232 return scoped_ptr<MediaMessage>(); 233 void* msg_src = static_cast<uint8*>(base_) + rd_offset; 234 235 // Create a flag to protect the serialized structure of the message 236 // from being overwritten. 237 // The serialized structure starts at offset |rd_offset|. 238 scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset)); 239 rd_flags_.push_back(rd_flag); 240 scoped_ptr<MediaMemoryChunk> mem( 241 new FifoOwnedMemory( 242 msg_src, max_msg_size, rd_flag, 243 base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_))); 244 245 // Create the message which wraps its the serialized structure. 246 scoped_ptr<MediaMessage> message(MediaMessage::MapMessage(mem.Pass())); 247 CHECK(message); 248 249 // Update the internal read pointer. 250 rd_offset = (rd_offset + message->size()) % size_; 251 CommitInternalRead(rd_offset); 252 253 return message.Pass(); 254} 255 256void MediaMessageFifo::Flush() { 257 DCHECK(thread_checker_.CalledOnValidThread()); 258 259 size_t wr_offset = current_wr_offset(); 260 261 // Invalidate every memory region before flushing. 262 while (!rd_flags_.empty()) { 263 CMALOG(kLogControl) << "Invalidate flag"; 264 rd_flags_.front()->Invalidate(); 265 rd_flags_.pop_front(); 266 } 267 268 // Flush by setting the read pointer to the value of the write pointer. 269 // Update first the internal read pointer then the public one. 270 CommitInternalRead(wr_offset); 271 CommitRead(wr_offset); 272} 273 274scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck( 275 size_t size_to_reserve) { 276 size_t wr_offset = internal_wr_offset(); 277 278 // Memory block corresponding to the serialized structure of the message. 279 void* msg_start = static_cast<uint8*>(base_) + wr_offset; 280 scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset)); 281 wr_flags_.push_back(wr_flag); 282 scoped_ptr<MediaMemoryChunk> mem( 283 new FifoOwnedMemory( 284 msg_start, size_to_reserve, wr_flag, 285 base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_))); 286 287 // Update the internal write pointer. 288 wr_offset = (wr_offset + size_to_reserve) % size_; 289 CommitInternalWrite(wr_offset); 290 291 return mem.Pass(); 292} 293 294void MediaMessageFifo::OnWrMemoryReleased() { 295 DCHECK(thread_checker_.CalledOnValidThread()); 296 297 if (wr_flags_.empty()) { 298 // Sanity check: when there is no protected memory area, 299 // the external write offset has no reason to be different from 300 // the internal write offset. 301 DCHECK_EQ(current_wr_offset(), internal_wr_offset()); 302 return; 303 } 304 305 // Update the external write offset. 306 while (!wr_flags_.empty() && 307 (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) { 308 // TODO(damienv): Could add a sanity check to make sure the offset is 309 // between the external write offset and the read offset (not included). 310 wr_flags_.pop_front(); 311 } 312 313 // Update the read offset to the first locked memory area 314 // or to the internal read pointer if nothing prevents it. 315 size_t external_wr_offset = internal_wr_offset(); 316 if (!wr_flags_.empty()) 317 external_wr_offset = wr_flags_.front()->offset(); 318 CommitWrite(external_wr_offset); 319} 320 321void MediaMessageFifo::OnRdMemoryReleased() { 322 DCHECK(thread_checker_.CalledOnValidThread()); 323 324 if (rd_flags_.empty()) { 325 // Sanity check: when there is no protected memory area, 326 // the external read offset has no reason to be different from 327 // the internal read offset. 328 DCHECK_EQ(current_rd_offset(), internal_rd_offset()); 329 return; 330 } 331 332 // Update the external read offset. 333 while (!rd_flags_.empty() && 334 (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) { 335 // TODO(damienv): Could add a sanity check to make sure the offset is 336 // between the external read offset and the write offset. 337 rd_flags_.pop_front(); 338 } 339 340 // Update the read offset to the first locked memory area 341 // or to the internal read pointer if nothing prevents it. 342 size_t external_rd_offset = internal_rd_offset(); 343 if (!rd_flags_.empty()) 344 external_rd_offset = rd_flags_.front()->offset(); 345 CommitRead(external_rd_offset); 346} 347 348size_t MediaMessageFifo::current_rd_offset() const { 349 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize)); 350 size_t rd_offset = base::subtle::Acquire_Load(rd_offset_); 351 CHECK_LT(rd_offset, size_); 352 return rd_offset; 353} 354 355size_t MediaMessageFifo::current_wr_offset() const { 356 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize)); 357 358 // When the fifo consumer acquires the write offset, 359 // we have to make sure that any possible following reads are actually 360 // returning results at least inline with the memory snapshot taken 361 // when the write offset was sampled. 362 // That's why an Acquire_Load is used here. 363 size_t wr_offset = base::subtle::Acquire_Load(wr_offset_); 364 CHECK_LT(wr_offset, size_); 365 return wr_offset; 366} 367 368void MediaMessageFifo::CommitRead(size_t new_rd_offset) { 369 // Add a memory fence to ensure the message content is completely read 370 // before updating the read offset. 371 base::subtle::Release_Store(rd_offset_, new_rd_offset); 372 373 // Make sure the read pointer has been updated before sending a notification. 374 if (!read_event_cb_.is_null()) { 375 base::subtle::MemoryBarrier(); 376 read_event_cb_.Run(); 377 } 378} 379 380void MediaMessageFifo::CommitWrite(size_t new_wr_offset) { 381 // Add a memory fence to ensure the message content is written 382 // before updating the write offset. 383 base::subtle::Release_Store(wr_offset_, new_wr_offset); 384 385 // Make sure the write pointer has been updated before sending a notification. 386 if (!write_event_cb_.is_null()) { 387 base::subtle::MemoryBarrier(); 388 write_event_cb_.Run(); 389 } 390} 391 392void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) { 393 internal_rd_offset_ = new_rd_offset; 394} 395 396void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) { 397 internal_wr_offset_ = new_wr_offset; 398} 399 400} // namespace media 401} // namespace chromecast 402