consumer_channel.cpp revision d53870c58cbb3671c2efdae3cc53850b962aa9dc
1#include "consumer_channel.h"
2
3#include <log/log.h>
4#include <utils/Trace.h>
5
6#include <thread>
7
8#include <private/dvr/bufferhub_rpc.h>
9#include "producer_channel.h"
10
11using android::pdx::BorrowedHandle;
12using android::pdx::Channel;
13using android::pdx::ErrorStatus;
14using android::pdx::Message;
15using android::pdx::Status;
16using android::pdx::rpc::DispatchRemoteMethod;
17
18namespace android {
19namespace dvr {
20
21ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id,
22                                 int channel_id,
23                                 const std::shared_ptr<Channel> producer)
24    : BufferHubChannel(service, buffer_id, channel_id, kConsumerType),
25      producer_(producer) {
26  GetProducer()->AddConsumer(this);
27}
28
29ConsumerChannel::~ConsumerChannel() {
30  ALOGD_IF(TRACE,
31           "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d",
32           channel_id(), buffer_id());
33
34  if (auto producer = GetProducer()) {
35    if (!released_)  // Producer is waiting for our Release.
36      producer->OnConsumerIgnored();
37    producer->RemoveConsumer(this);
38  }
39}
40
41BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const {
42  BufferHubChannel::BufferInfo info;
43  if (auto producer = GetProducer()) {
44    // If producer has not hung up, copy most buffer info from the producer.
45    info = producer->GetBufferInfo();
46  }
47  info.id = buffer_id();
48  return info;
49}
50
51std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const {
52  return std::static_pointer_cast<ProducerChannel>(producer_.lock());
53}
54
55void ConsumerChannel::HandleImpulse(Message& message) {
56  ATRACE_NAME("ConsumerChannel::HandleImpulse");
57  switch (message.GetOp()) {
58    case BufferHubRPC::ConsumerRelease::Opcode:
59      OnConsumerRelease(message, {});
60      break;
61  }
62}
63
64bool ConsumerChannel::HandleMessage(Message& message) {
65  ATRACE_NAME("ConsumerChannel::HandleMessage");
66  auto producer = GetProducer();
67  if (!producer)
68    REPLY_ERROR_RETURN(message, EPIPE, true);
69
70  switch (message.GetOp()) {
71    case BufferHubRPC::GetBuffer::Opcode:
72      DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
73          *producer, &ProducerChannel::OnGetBuffer, message);
74      return true;
75
76    case BufferHubRPC::NewConsumer::Opcode:
77      DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
78          *producer, &ProducerChannel::OnNewConsumer, message);
79      return true;
80
81    case BufferHubRPC::ConsumerAcquire::Opcode:
82      DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>(
83          *this, &ConsumerChannel::OnConsumerAcquire, message);
84      return true;
85
86    case BufferHubRPC::ConsumerRelease::Opcode:
87      DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>(
88          *this, &ConsumerChannel::OnConsumerRelease, message);
89      return true;
90
91    case BufferHubRPC::ConsumerSetIgnore::Opcode:
92      DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>(
93          *this, &ConsumerChannel::OnConsumerSetIgnore, message);
94      return true;
95
96    default:
97      return false;
98  }
99}
100
101Status<std::pair<BorrowedFence, ConsumerChannel::MetaData>>
102ConsumerChannel::OnConsumerAcquire(Message& message,
103                                   std::size_t metadata_size) {
104  ATRACE_NAME("ConsumerChannel::OnConsumerAcquire");
105  auto producer = GetProducer();
106  if (!producer)
107    return ErrorStatus(EPIPE);
108
109  if (acquired_ || released_) {
110    ALOGE(
111        "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: "
112        "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d",
113        ignored_, acquired_, released_, message.GetChannelId(),
114        producer->buffer_id());
115    return ErrorStatus(EBUSY);
116  } else {
117    auto status = producer->OnConsumerAcquire(message, metadata_size);
118    if (status) {
119      ClearAvailable();
120      acquired_ = true;
121    }
122    return status;
123  }
124}
125
126Status<void> ConsumerChannel::OnConsumerRelease(Message& message,
127                                                LocalFence release_fence) {
128  ATRACE_NAME("ConsumerChannel::OnConsumerRelease");
129  auto producer = GetProducer();
130  if (!producer)
131    return ErrorStatus(EPIPE);
132
133  if (!acquired_ || released_) {
134    ALOGE(
135        "ConsumerChannel::OnConsumerRelease: Release when not acquired: "
136        "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d",
137        ignored_, acquired_, released_, message.GetChannelId(),
138        producer->buffer_id());
139    return ErrorStatus(EBUSY);
140  } else {
141    auto status =
142        producer->OnConsumerRelease(message, std::move(release_fence));
143    if (status) {
144      ClearAvailable();
145      acquired_ = false;
146      released_ = true;
147    }
148    return status;
149  }
150}
151
152Status<void> ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) {
153  ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore");
154  auto producer = GetProducer();
155  if (!producer)
156    return ErrorStatus(EPIPE);
157
158  ignored_ = ignored;
159  if (ignored_ && acquired_) {
160    // Update the producer if ignore is set after the consumer acquires the
161    // buffer.
162    ClearAvailable();
163    producer->OnConsumerIgnored();
164    acquired_ = false;
165    released_ = true;
166  }
167
168  return {};
169}
170
171bool ConsumerChannel::OnProducerPosted() {
172  if (ignored_) {
173    acquired_ = false;
174    released_ = true;
175    return false;
176  } else {
177    acquired_ = false;
178    released_ = false;
179    SignalAvailable();
180    return true;
181  }
182}
183
184void ConsumerChannel::OnProducerClosed() {
185  producer_.reset();
186  Hangup();
187}
188
189}  // namespace dvr
190}  // namespace android
191