1#include "consumer_channel.h" 2 3#include <log/log.h> 4#include <utils/Trace.h> 5 6#include <thread> 7 8#include <private/dvr/bufferhub_rpc.h> 9#include "producer_channel.h" 10 11using android::pdx::BorrowedHandle; 12using android::pdx::Channel; 13using android::pdx::ErrorStatus; 14using android::pdx::Message; 15using android::pdx::Status; 16using android::pdx::rpc::DispatchRemoteMethod; 17 18namespace android { 19namespace dvr { 20 21ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id, 22 int channel_id, uint64_t consumer_state_bit, 23 const std::shared_ptr<Channel> producer) 24 : BufferHubChannel(service, buffer_id, channel_id, kConsumerType), 25 consumer_state_bit_(consumer_state_bit), 26 producer_(producer) { 27 GetProducer()->AddConsumer(this); 28} 29 30ConsumerChannel::~ConsumerChannel() { 31 ALOGD_IF(TRACE, 32 "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d", 33 channel_id(), buffer_id()); 34 35 if (auto producer = GetProducer()) { 36 producer->RemoveConsumer(this); 37 } 38} 39 40BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const { 41 BufferHubChannel::BufferInfo info; 42 if (auto producer = GetProducer()) { 43 // If producer has not hung up, copy most buffer info from the producer. 44 info = producer->GetBufferInfo(); 45 } else { 46 info.signaled_mask = consumer_state_bit(); 47 } 48 info.id = buffer_id(); 49 return info; 50} 51 52std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const { 53 return std::static_pointer_cast<ProducerChannel>(producer_.lock()); 54} 55 56void ConsumerChannel::HandleImpulse(Message& message) { 57 ATRACE_NAME("ConsumerChannel::HandleImpulse"); 58 switch (message.GetOp()) { 59 case BufferHubRPC::ConsumerAcquire::Opcode: 60 OnConsumerAcquire(message); 61 break; 62 case BufferHubRPC::ConsumerRelease::Opcode: 63 OnConsumerRelease(message, {}); 64 break; 65 } 66} 67 68bool ConsumerChannel::HandleMessage(Message& message) { 69 ATRACE_NAME("ConsumerChannel::HandleMessage"); 70 auto producer = GetProducer(); 71 if (!producer) 72 REPLY_ERROR_RETURN(message, EPIPE, true); 73 74 switch (message.GetOp()) { 75 case BufferHubRPC::GetBuffer::Opcode: 76 DispatchRemoteMethod<BufferHubRPC::GetBuffer>( 77 *this, &ConsumerChannel::OnGetBuffer, message); 78 return true; 79 80 case BufferHubRPC::NewConsumer::Opcode: 81 DispatchRemoteMethod<BufferHubRPC::NewConsumer>( 82 *producer, &ProducerChannel::OnNewConsumer, message); 83 return true; 84 85 case BufferHubRPC::ConsumerAcquire::Opcode: 86 DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>( 87 *this, &ConsumerChannel::OnConsumerAcquire, message); 88 return true; 89 90 case BufferHubRPC::ConsumerRelease::Opcode: 91 DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>( 92 *this, &ConsumerChannel::OnConsumerRelease, message); 93 return true; 94 95 case BufferHubRPC::ConsumerSetIgnore::Opcode: 96 DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>( 97 *this, &ConsumerChannel::OnConsumerSetIgnore, message); 98 return true; 99 100 default: 101 return false; 102 } 103} 104 105Status<BufferDescription<BorrowedHandle>> ConsumerChannel::OnGetBuffer( 106 Message& /*message*/) { 107 ATRACE_NAME("ConsumerChannel::OnGetBuffer"); 108 ALOGD_IF(TRACE, "ConsumerChannel::OnGetBuffer: buffer=%d", buffer_id()); 109 if (auto producer = GetProducer()) { 110 return {producer->GetBuffer(consumer_state_bit_)}; 111 } else { 112 return ErrorStatus(EPIPE); 113 } 114} 115 116Status<LocalFence> ConsumerChannel::OnConsumerAcquire(Message& message) { 117 ATRACE_NAME("ConsumerChannel::OnConsumerAcquire"); 118 auto producer = GetProducer(); 119 if (!producer) 120 return ErrorStatus(EPIPE); 121 122 if (acquired_ || released_) { 123 ALOGE( 124 "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: " 125 "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d", 126 ignored_, acquired_, released_, message.GetChannelId(), 127 producer->buffer_id()); 128 return ErrorStatus(EBUSY); 129 } else { 130 auto status = producer->OnConsumerAcquire(message); 131 if (status) { 132 ClearAvailable(); 133 acquired_ = true; 134 } 135 return status; 136 } 137} 138 139Status<void> ConsumerChannel::OnConsumerRelease(Message& message, 140 LocalFence release_fence) { 141 ATRACE_NAME("ConsumerChannel::OnConsumerRelease"); 142 auto producer = GetProducer(); 143 if (!producer) 144 return ErrorStatus(EPIPE); 145 146 if (!acquired_ || released_) { 147 ALOGE( 148 "ConsumerChannel::OnConsumerRelease: Release when not acquired: " 149 "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d", 150 ignored_, acquired_, released_, message.GetChannelId(), 151 producer->buffer_id()); 152 return ErrorStatus(EBUSY); 153 } else { 154 auto status = 155 producer->OnConsumerRelease(message, std::move(release_fence)); 156 if (status) { 157 ClearAvailable(); 158 acquired_ = false; 159 released_ = true; 160 } 161 return status; 162 } 163} 164 165Status<void> ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) { 166 ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore"); 167 auto producer = GetProducer(); 168 if (!producer) 169 return ErrorStatus(EPIPE); 170 171 ignored_ = ignored; 172 if (ignored_ && acquired_) { 173 // Update the producer if ignore is set after the consumer acquires the 174 // buffer. 175 ClearAvailable(); 176 producer->OnConsumerIgnored(); 177 acquired_ = false; 178 released_ = true; 179 } 180 181 return {}; 182} 183 184bool ConsumerChannel::OnProducerPosted() { 185 if (ignored_) { 186 acquired_ = false; 187 released_ = true; 188 return false; 189 } else { 190 acquired_ = false; 191 released_ = false; 192 SignalAvailable(); 193 return true; 194 } 195} 196 197void ConsumerChannel::OnProducerClosed() { 198 producer_.reset(); 199 Hangup(); 200} 201 202} // namespace dvr 203} // namespace android 204