// consumer_channel.cpp revision d53870c58cbb3671c2efdae3cc53850b962aa9dc
1#include "consumer_channel.h" 2 3#include <log/log.h> 4#include <utils/Trace.h> 5 6#include <thread> 7 8#include <private/dvr/bufferhub_rpc.h> 9#include "producer_channel.h" 10 11using android::pdx::BorrowedHandle; 12using android::pdx::Channel; 13using android::pdx::ErrorStatus; 14using android::pdx::Message; 15using android::pdx::Status; 16using android::pdx::rpc::DispatchRemoteMethod; 17 18namespace android { 19namespace dvr { 20 21ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id, 22 int channel_id, 23 const std::shared_ptr<Channel> producer) 24 : BufferHubChannel(service, buffer_id, channel_id, kConsumerType), 25 producer_(producer) { 26 GetProducer()->AddConsumer(this); 27} 28 29ConsumerChannel::~ConsumerChannel() { 30 ALOGD_IF(TRACE, 31 "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d", 32 channel_id(), buffer_id()); 33 34 if (auto producer = GetProducer()) { 35 if (!released_) // Producer is waiting for our Release. 36 producer->OnConsumerIgnored(); 37 producer->RemoveConsumer(this); 38 } 39} 40 41BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const { 42 BufferHubChannel::BufferInfo info; 43 if (auto producer = GetProducer()) { 44 // If producer has not hung up, copy most buffer info from the producer. 
45 info = producer->GetBufferInfo(); 46 } 47 info.id = buffer_id(); 48 return info; 49} 50 51std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const { 52 return std::static_pointer_cast<ProducerChannel>(producer_.lock()); 53} 54 55void ConsumerChannel::HandleImpulse(Message& message) { 56 ATRACE_NAME("ConsumerChannel::HandleImpulse"); 57 switch (message.GetOp()) { 58 case BufferHubRPC::ConsumerRelease::Opcode: 59 OnConsumerRelease(message, {}); 60 break; 61 } 62} 63 64bool ConsumerChannel::HandleMessage(Message& message) { 65 ATRACE_NAME("ConsumerChannel::HandleMessage"); 66 auto producer = GetProducer(); 67 if (!producer) 68 REPLY_ERROR_RETURN(message, EPIPE, true); 69 70 switch (message.GetOp()) { 71 case BufferHubRPC::GetBuffer::Opcode: 72 DispatchRemoteMethod<BufferHubRPC::GetBuffer>( 73 *producer, &ProducerChannel::OnGetBuffer, message); 74 return true; 75 76 case BufferHubRPC::NewConsumer::Opcode: 77 DispatchRemoteMethod<BufferHubRPC::NewConsumer>( 78 *producer, &ProducerChannel::OnNewConsumer, message); 79 return true; 80 81 case BufferHubRPC::ConsumerAcquire::Opcode: 82 DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>( 83 *this, &ConsumerChannel::OnConsumerAcquire, message); 84 return true; 85 86 case BufferHubRPC::ConsumerRelease::Opcode: 87 DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>( 88 *this, &ConsumerChannel::OnConsumerRelease, message); 89 return true; 90 91 case BufferHubRPC::ConsumerSetIgnore::Opcode: 92 DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>( 93 *this, &ConsumerChannel::OnConsumerSetIgnore, message); 94 return true; 95 96 default: 97 return false; 98 } 99} 100 101Status<std::pair<BorrowedFence, ConsumerChannel::MetaData>> 102ConsumerChannel::OnConsumerAcquire(Message& message, 103 std::size_t metadata_size) { 104 ATRACE_NAME("ConsumerChannel::OnConsumerAcquire"); 105 auto producer = GetProducer(); 106 if (!producer) 107 return ErrorStatus(EPIPE); 108 109 if (acquired_ || released_) { 110 ALOGE( 111 
"ConsumerChannel::OnConsumerAcquire: Acquire when not posted: " 112 "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d", 113 ignored_, acquired_, released_, message.GetChannelId(), 114 producer->buffer_id()); 115 return ErrorStatus(EBUSY); 116 } else { 117 auto status = producer->OnConsumerAcquire(message, metadata_size); 118 if (status) { 119 ClearAvailable(); 120 acquired_ = true; 121 } 122 return status; 123 } 124} 125 126Status<void> ConsumerChannel::OnConsumerRelease(Message& message, 127 LocalFence release_fence) { 128 ATRACE_NAME("ConsumerChannel::OnConsumerRelease"); 129 auto producer = GetProducer(); 130 if (!producer) 131 return ErrorStatus(EPIPE); 132 133 if (!acquired_ || released_) { 134 ALOGE( 135 "ConsumerChannel::OnConsumerRelease: Release when not acquired: " 136 "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d", 137 ignored_, acquired_, released_, message.GetChannelId(), 138 producer->buffer_id()); 139 return ErrorStatus(EBUSY); 140 } else { 141 auto status = 142 producer->OnConsumerRelease(message, std::move(release_fence)); 143 if (status) { 144 ClearAvailable(); 145 acquired_ = false; 146 released_ = true; 147 } 148 return status; 149 } 150} 151 152Status<void> ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) { 153 ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore"); 154 auto producer = GetProducer(); 155 if (!producer) 156 return ErrorStatus(EPIPE); 157 158 ignored_ = ignored; 159 if (ignored_ && acquired_) { 160 // Update the producer if ignore is set after the consumer acquires the 161 // buffer. 
162 ClearAvailable(); 163 producer->OnConsumerIgnored(); 164 acquired_ = false; 165 released_ = true; 166 } 167 168 return {}; 169} 170 171bool ConsumerChannel::OnProducerPosted() { 172 if (ignored_) { 173 acquired_ = false; 174 released_ = true; 175 return false; 176 } else { 177 acquired_ = false; 178 released_ = false; 179 SignalAvailable(); 180 return true; 181 } 182} 183 184void ConsumerChannel::OnProducerClosed() { 185 producer_.reset(); 186 Hangup(); 187} 188 189} // namespace dvr 190} // namespace android 191