1#include "consumer_channel.h"
2
3#include <log/log.h>
4#include <utils/Trace.h>
5
6#include <thread>
7
8#include <private/dvr/bufferhub_rpc.h>
9#include "producer_channel.h"
10
11using android::pdx::BorrowedHandle;
12using android::pdx::Channel;
13using android::pdx::ErrorStatus;
14using android::pdx::Message;
15using android::pdx::Status;
16using android::pdx::rpc::DispatchRemoteMethod;
17
18namespace android {
19namespace dvr {
20
21ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id,
22                                 int channel_id, uint64_t consumer_state_bit,
23                                 const std::shared_ptr<Channel> producer)
24    : BufferHubChannel(service, buffer_id, channel_id, kConsumerType),
25      consumer_state_bit_(consumer_state_bit),
26      producer_(producer) {
27  GetProducer()->AddConsumer(this);
28}
29
30ConsumerChannel::~ConsumerChannel() {
31  ALOGD_IF(TRACE,
32           "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d",
33           channel_id(), buffer_id());
34
35  if (auto producer = GetProducer()) {
36    producer->RemoveConsumer(this);
37  }
38}
39
40BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const {
41  BufferHubChannel::BufferInfo info;
42  if (auto producer = GetProducer()) {
43    // If producer has not hung up, copy most buffer info from the producer.
44    info = producer->GetBufferInfo();
45  } else {
46    info.signaled_mask = consumer_state_bit();
47  }
48  info.id = buffer_id();
49  return info;
50}
51
52std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const {
53  return std::static_pointer_cast<ProducerChannel>(producer_.lock());
54}
55
56void ConsumerChannel::HandleImpulse(Message& message) {
57  ATRACE_NAME("ConsumerChannel::HandleImpulse");
58  switch (message.GetOp()) {
59    case BufferHubRPC::ConsumerAcquire::Opcode:
60      OnConsumerAcquire(message);
61      break;
62    case BufferHubRPC::ConsumerRelease::Opcode:
63      OnConsumerRelease(message, {});
64      break;
65  }
66}
67
68bool ConsumerChannel::HandleMessage(Message& message) {
69  ATRACE_NAME("ConsumerChannel::HandleMessage");
70  auto producer = GetProducer();
71  if (!producer)
72    REPLY_ERROR_RETURN(message, EPIPE, true);
73
74  switch (message.GetOp()) {
75    case BufferHubRPC::GetBuffer::Opcode:
76      DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
77          *this, &ConsumerChannel::OnGetBuffer, message);
78      return true;
79
80    case BufferHubRPC::NewConsumer::Opcode:
81      DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
82          *producer, &ProducerChannel::OnNewConsumer, message);
83      return true;
84
85    case BufferHubRPC::ConsumerAcquire::Opcode:
86      DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>(
87          *this, &ConsumerChannel::OnConsumerAcquire, message);
88      return true;
89
90    case BufferHubRPC::ConsumerRelease::Opcode:
91      DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>(
92          *this, &ConsumerChannel::OnConsumerRelease, message);
93      return true;
94
95    case BufferHubRPC::ConsumerSetIgnore::Opcode:
96      DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>(
97          *this, &ConsumerChannel::OnConsumerSetIgnore, message);
98      return true;
99
100    default:
101      return false;
102  }
103}
104
105Status<BufferDescription<BorrowedHandle>> ConsumerChannel::OnGetBuffer(
106    Message& /*message*/) {
107  ATRACE_NAME("ConsumerChannel::OnGetBuffer");
108  ALOGD_IF(TRACE, "ConsumerChannel::OnGetBuffer: buffer=%d", buffer_id());
109  if (auto producer = GetProducer()) {
110    return {producer->GetBuffer(consumer_state_bit_)};
111  } else {
112    return ErrorStatus(EPIPE);
113  }
114}
115
116Status<LocalFence> ConsumerChannel::OnConsumerAcquire(Message& message) {
117  ATRACE_NAME("ConsumerChannel::OnConsumerAcquire");
118  auto producer = GetProducer();
119  if (!producer)
120    return ErrorStatus(EPIPE);
121
122  if (acquired_ || released_) {
123    ALOGE(
124        "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: "
125        "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d",
126        ignored_, acquired_, released_, message.GetChannelId(),
127        producer->buffer_id());
128    return ErrorStatus(EBUSY);
129  } else {
130    auto status = producer->OnConsumerAcquire(message);
131    if (status) {
132      ClearAvailable();
133      acquired_ = true;
134    }
135    return status;
136  }
137}
138
139Status<void> ConsumerChannel::OnConsumerRelease(Message& message,
140                                                LocalFence release_fence) {
141  ATRACE_NAME("ConsumerChannel::OnConsumerRelease");
142  auto producer = GetProducer();
143  if (!producer)
144    return ErrorStatus(EPIPE);
145
146  if (!acquired_ || released_) {
147    ALOGE(
148        "ConsumerChannel::OnConsumerRelease: Release when not acquired: "
149        "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d",
150        ignored_, acquired_, released_, message.GetChannelId(),
151        producer->buffer_id());
152    return ErrorStatus(EBUSY);
153  } else {
154    auto status =
155        producer->OnConsumerRelease(message, std::move(release_fence));
156    if (status) {
157      ClearAvailable();
158      acquired_ = false;
159      released_ = true;
160    }
161    return status;
162  }
163}
164
165Status<void> ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) {
166  ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore");
167  auto producer = GetProducer();
168  if (!producer)
169    return ErrorStatus(EPIPE);
170
171  ignored_ = ignored;
172  if (ignored_ && acquired_) {
173    // Update the producer if ignore is set after the consumer acquires the
174    // buffer.
175    ClearAvailable();
176    producer->OnConsumerIgnored();
177    acquired_ = false;
178    released_ = true;
179  }
180
181  return {};
182}
183
184bool ConsumerChannel::OnProducerPosted() {
185  if (ignored_) {
186    acquired_ = false;
187    released_ = true;
188    return false;
189  } else {
190    acquired_ = false;
191    released_ = false;
192    SignalAvailable();
193    return true;
194  }
195}
196
// Drops this consumer's reference to the producer and then hangs up this
// channel. Presumably invoked by the producer when it closes (the caller is
// not visible in this file). After the reset, GetProducer() yields an empty
// pointer, so the handlers in this file that check it return EPIPE.
void ConsumerChannel::OnProducerClosed() {
  // Clear the producer reference before calling Hangup().
  producer_.reset();
  Hangup();
}
201
202}  // namespace dvr
203}  // namespace android
204