1#include "consumer_queue_channel.h" 2 3#include <pdx/channel_handle.h> 4 5#include "producer_channel.h" 6 7using android::pdx::ErrorStatus; 8using android::pdx::RemoteChannelHandle; 9using android::pdx::Status; 10using android::pdx::rpc::DispatchRemoteMethod; 11using android::pdx::rpc::RemoteMethodError; 12 13namespace android { 14namespace dvr { 15 16ConsumerQueueChannel::ConsumerQueueChannel( 17 BufferHubService* service, int buffer_id, int channel_id, 18 const std::shared_ptr<Channel>& producer) 19 : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType), 20 producer_(producer), 21 capacity_(0) { 22 GetProducer()->AddConsumer(this); 23} 24 25ConsumerQueueChannel::~ConsumerQueueChannel() { 26 ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d", 27 channel_id()); 28 29 if (auto producer = GetProducer()) { 30 producer->RemoveConsumer(this); 31 } 32} 33 34bool ConsumerQueueChannel::HandleMessage(Message& message) { 35 ATRACE_NAME("ConsumerQueueChannel::HandleMessage"); 36 auto producer = GetProducer(); 37 if (!producer) { 38 RemoteMethodError(message, EPIPE); 39 return true; 40 } 41 42 switch (message.GetOp()) { 43 case BufferHubRPC::CreateConsumerQueue::Opcode: 44 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>( 45 *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message); 46 return true; 47 48 case BufferHubRPC::GetQueueInfo::Opcode: 49 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>( 50 *producer, &ProducerQueueChannel::OnGetQueueInfo, message); 51 return true; 52 53 case BufferHubRPC::ConsumerQueueImportBuffers::Opcode: 54 DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>( 55 *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message); 56 return true; 57 58 default: 59 return false; 60 } 61} 62 63std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer() 64 const { 65 return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock()); 66} 67 68void ConsumerQueueChannel::HandleImpulse(Message& /* message */) { 69 ATRACE_NAME("ConsumerQueueChannel::HandleImpulse"); 70} 71 72BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const { 73 BufferHubChannel::BufferInfo info; 74 if (auto producer = GetProducer()) { 75 // If producer has not hung up, copy most buffer info from the producer. 76 info = producer->GetBufferInfo(); 77 } 78 info.id = buffer_id(); 79 info.capacity = capacity_; 80 return info; 81} 82 83void ConsumerQueueChannel::RegisterNewBuffer( 84 const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) { 85 ALOGD_IF(TRACE, 86 "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu", 87 producer_channel->buffer_id(), slot); 88 pending_buffer_slots_.emplace(producer_channel, slot); 89 90 // Signal the client that there is new buffer available throught POLLIN. 91 SignalAvailable(); 92} 93 94Status<std::vector<std::pair<RemoteChannelHandle, size_t>>> 95ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) { 96 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles; 97 ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers"); 98 ALOGD_IF( 99 TRACE, 100 "ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to " 101 "import: %zu", 102 pending_buffer_slots_.size()); 103 104 while (!pending_buffer_slots_.empty()) { 105 auto producer_channel = pending_buffer_slots_.front().first.lock(); 106 size_t producer_slot = pending_buffer_slots_.front().second; 107 pending_buffer_slots_.pop(); 108 109 // It's possible that the producer channel has expired. When this occurs, 110 // ignore the producer channel. 111 if (producer_channel == nullptr) { 112 ALOGW( 113 "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer " 114 "channel has already been expired."); 115 continue; 116 } 117 118 auto status = producer_channel->CreateConsumer(message); 119 120 // If no buffers are imported successfully, clear available and return an 121 // error. Otherwise, return all consumer handles already imported 122 // successfully, but keep available bits on, so that the client can retry 123 // importing remaining consumer buffers. 124 if (!status) { 125 ALOGE( 126 "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create " 127 "consumer: %s", 128 status.GetErrorMessage().c_str()); 129 if (buffer_handles.empty()) { 130 ClearAvailable(); 131 return status.error_status(); 132 } else { 133 return {std::move(buffer_handles)}; 134 } 135 } 136 137 buffer_handles.emplace_back(status.take(), producer_slot); 138 } 139 140 ClearAvailable(); 141 return {std::move(buffer_handles)}; 142} 143 144void ConsumerQueueChannel::OnProducerClosed() { 145 ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d", 146 buffer_id()); 147 producer_.reset(); 148 Hangup(); 149} 150 151} // namespace dvr 152} // namespace android 153