1#include "consumer_queue_channel.h" 2 3#include <pdx/channel_handle.h> 4 5#include "producer_channel.h" 6 7using android::pdx::ErrorStatus; 8using android::pdx::RemoteChannelHandle; 9using android::pdx::Status; 10using android::pdx::rpc::DispatchRemoteMethod; 11using android::pdx::rpc::RemoteMethodError; 12 13namespace android { 14namespace dvr { 15 16ConsumerQueueChannel::ConsumerQueueChannel( 17 BufferHubService* service, int buffer_id, int channel_id, 18 const std::shared_ptr<Channel>& producer, bool silent) 19 : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType), 20 producer_(producer), 21 capacity_(0), 22 silent_(silent) { 23 GetProducer()->AddConsumer(this); 24} 25 26ConsumerQueueChannel::~ConsumerQueueChannel() { 27 ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d", 28 channel_id()); 29 30 if (auto producer = GetProducer()) { 31 producer->RemoveConsumer(this); 32 } 33} 34 35bool ConsumerQueueChannel::HandleMessage(Message& message) { 36 ATRACE_NAME("ConsumerQueueChannel::HandleMessage"); 37 auto producer = GetProducer(); 38 if (!producer) { 39 RemoteMethodError(message, EPIPE); 40 return true; 41 } 42 43 switch (message.GetOp()) { 44 case BufferHubRPC::CreateConsumerQueue::Opcode: 45 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>( 46 *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message); 47 return true; 48 49 case BufferHubRPC::GetQueueInfo::Opcode: 50 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>( 51 *producer, &ProducerQueueChannel::OnGetQueueInfo, message); 52 return true; 53 54 case BufferHubRPC::ConsumerQueueImportBuffers::Opcode: 55 DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>( 56 *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message); 57 return true; 58 59 default: 60 return false; 61 } 62} 63 64std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer() 65 const { 66 return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock()); 67} 68 69void ConsumerQueueChannel::HandleImpulse(Message& /* message */) { 70 ATRACE_NAME("ConsumerQueueChannel::HandleImpulse"); 71} 72 73BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const { 74 BufferHubChannel::BufferInfo info; 75 if (auto producer = GetProducer()) { 76 // If producer has not hung up, copy most buffer info from the producer. 77 info = producer->GetBufferInfo(); 78 } 79 info.id = buffer_id(); 80 info.capacity = capacity_; 81 return info; 82} 83 84void ConsumerQueueChannel::RegisterNewBuffer( 85 const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) { 86 ALOGD_IF(TRACE, 87 "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d " 88 "slot=%zu silent=%d", 89 buffer_id(), producer_channel->buffer_id(), slot, silent_); 90 // Only register buffers if the queue is not silent. 91 if (!silent_) { 92 pending_buffer_slots_.emplace(producer_channel, slot); 93 94 // Signal the client that there is new buffer available. 95 SignalAvailable(); 96 } 97} 98 99Status<std::vector<std::pair<RemoteChannelHandle, size_t>>> 100ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) { 101 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles; 102 ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers"); 103 ALOGD_IF(TRACE, 104 "ConsumerQueueChannel::OnConsumerQueueImportBuffers: " 105 "pending_buffer_slots=%zu", 106 pending_buffer_slots_.size()); 107 108 // Indicate this is a silent queue that will not import buffers. 109 if (silent_) 110 return ErrorStatus(EBADR); 111 112 while (!pending_buffer_slots_.empty()) { 113 auto producer_channel = pending_buffer_slots_.front().first.lock(); 114 size_t producer_slot = pending_buffer_slots_.front().second; 115 pending_buffer_slots_.pop(); 116 117 // It's possible that the producer channel has expired. When this occurs, 118 // ignore the producer channel. 119 if (producer_channel == nullptr) { 120 ALOGW( 121 "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer " 122 "channel has already been expired."); 123 continue; 124 } 125 126 auto status = producer_channel->CreateConsumer(message); 127 128 // If no buffers are imported successfully, clear available and return an 129 // error. Otherwise, return all consumer handles already imported 130 // successfully, but keep available bits on, so that the client can retry 131 // importing remaining consumer buffers. 132 if (!status) { 133 ALOGE( 134 "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create " 135 "consumer: %s", 136 status.GetErrorMessage().c_str()); 137 if (buffer_handles.empty()) { 138 ClearAvailable(); 139 return status.error_status(); 140 } else { 141 return {std::move(buffer_handles)}; 142 } 143 } 144 145 buffer_handles.emplace_back(status.take(), producer_slot); 146 } 147 148 ClearAvailable(); 149 return {std::move(buffer_handles)}; 150} 151 152void ConsumerQueueChannel::OnProducerClosed() { 153 ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d", 154 buffer_id()); 155 producer_.reset(); 156 Hangup(); 157} 158 159} // namespace dvr 160} // namespace android 161