consumer_channel.cpp revision 4fe60582f314e381098f8f3bc2e39c5880e9243a
1#include "consumer_channel.h"
2
3#include <log/log.h>
4#include <utils/Trace.h>
5
6#include <thread>
7
8#include <private/dvr/bufferhub_rpc.h>
9#include "producer_channel.h"
10
11using android::pdx::BorrowedHandle;
12using android::pdx::Channel;
13using android::pdx::Message;
14using android::pdx::rpc::DispatchRemoteMethod;
15
16namespace android {
17namespace dvr {
18
19ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id,
20                                 int channel_id,
21                                 const std::shared_ptr<Channel> producer)
22    : BufferHubChannel(service, buffer_id, channel_id, kConsumerType),
23      handled_(true),
24      ignored_(false),
25      producer_(producer) {
26  GetProducer()->AddConsumer(this);
27}
28
29ConsumerChannel::~ConsumerChannel() {
30  ALOGD_IF(TRACE, "ConsumerChannel::~ConsumerChannel: channel_id=%d",
31           channel_id());
32
33  if (auto producer = GetProducer()) {
34    if (!handled_)  // Producer is waiting for our Release.
35      producer->OnConsumerIgnored();
36    producer->RemoveConsumer(this);
37  }
38}
39
40BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const {
41  BufferHubChannel::BufferInfo info;
42  if (auto producer = GetProducer()) {
43    // If producer has not hung up, copy most buffer info from the producer.
44    info = producer->GetBufferInfo();
45  }
46  info.id = buffer_id();
47  return info;
48}
49
50std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const {
51  return std::static_pointer_cast<ProducerChannel>(producer_.lock());
52}
53
54void ConsumerChannel::HandleImpulse(Message& message) {
55  ATRACE_NAME("ConsumerChannel::HandleImpulse");
56  switch (message.GetOp()) {
57    case BufferHubRPC::ConsumerRelease::Opcode:
58      OnConsumerRelease(message, {});
59      break;
60  }
61}
62
63bool ConsumerChannel::HandleMessage(Message& message) {
64  ATRACE_NAME("ConsumerChannel::HandleMessage");
65  auto producer = GetProducer();
66  if (!producer)
67    REPLY_ERROR_RETURN(message, EPIPE, true);
68
69  switch (message.GetOp()) {
70    case BufferHubRPC::GetBuffer::Opcode:
71      DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
72          *producer, &ProducerChannel::OnGetBuffer, message);
73      return true;
74
75    case BufferHubRPC::GetBuffers::Opcode:
76      DispatchRemoteMethod<BufferHubRPC::GetBuffers>(
77          *producer, &ProducerChannel::OnGetBuffers, message);
78      return true;
79
80    case BufferHubRPC::NewConsumer::Opcode:
81      DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
82          *producer, &ProducerChannel::OnNewConsumer, message);
83      return true;
84
85    case BufferHubRPC::ConsumerAcquire::Opcode:
86      DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>(
87          *this, &ConsumerChannel::OnConsumerAcquire, message);
88      return true;
89
90    case BufferHubRPC::ConsumerRelease::Opcode:
91      DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>(
92          *this, &ConsumerChannel::OnConsumerRelease, message);
93      return true;
94
95    case BufferHubRPC::ConsumerSetIgnore::Opcode:
96      DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>(
97          *this, &ConsumerChannel::OnConsumerSetIgnore, message);
98      return true;
99
100    default:
101      return false;
102  }
103}
104
105std::pair<BorrowedFence, ConsumerChannel::MetaData>
106ConsumerChannel::OnConsumerAcquire(Message& message,
107                                   std::size_t metadata_size) {
108  ATRACE_NAME("ConsumerChannel::OnConsumerAcquire");
109  auto producer = GetProducer();
110  if (!producer)
111    REPLY_ERROR_RETURN(message, EPIPE, {});
112
113  if (ignored_ || handled_) {
114    ALOGE(
115        "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: "
116        "ignored=%d handled=%d channel_id=%d buffer_id=%d",
117        ignored_, handled_, message.GetChannelId(), producer->buffer_id());
118    REPLY_ERROR_RETURN(message, EBUSY, {});
119  } else {
120    ClearAvailable();
121    return producer->OnConsumerAcquire(message, metadata_size);
122  }
123}
124
125int ConsumerChannel::OnConsumerRelease(Message& message,
126                                       LocalFence release_fence) {
127  ATRACE_NAME("ConsumerChannel::OnConsumerRelease");
128  auto producer = GetProducer();
129  if (!producer)
130    return -EPIPE;
131
132  if (ignored_ || handled_) {
133    ALOGE(
134        "ConsumerChannel::OnConsumerRelease: Release when not acquired: "
135        "ignored=%d handled=%d channel_id=%d buffer_id=%d",
136        ignored_, handled_, message.GetChannelId(), producer->buffer_id());
137    return -EBUSY;
138  } else {
139    ClearAvailable();
140    const int ret =
141        producer->OnConsumerRelease(message, std::move(release_fence));
142    handled_ = ret == 0;
143    return ret;
144  }
145}
146
147int ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) {
148  ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore");
149  auto producer = GetProducer();
150  if (!producer)
151    return -EPIPE;
152
153  ignored_ = ignored;
154  if (ignored_ && !handled_) {
155    // Update the producer if ignore is set after the consumer acquires the
156    // buffer.
157    ClearAvailable();
158    producer->OnConsumerIgnored();
159    handled_ = false;
160  }
161
162  return 0;
163}
164
165bool ConsumerChannel::OnProducerPosted() {
166  if (ignored_) {
167    handled_ = true;
168    return false;
169  } else {
170    handled_ = false;
171    SignalAvailable();
172    return true;
173  }
174}
175
176void ConsumerChannel::OnProducerClosed() {
177  producer_.reset();
178  Hangup();
179}
180
181}  // namespace dvr
182}  // namespace android
183