1#include "consumer_queue_channel.h"
2
3#include <pdx/channel_handle.h>
4
5#include "producer_channel.h"
6
7using android::pdx::ErrorStatus;
8using android::pdx::RemoteChannelHandle;
9using android::pdx::Status;
10using android::pdx::rpc::DispatchRemoteMethod;
11using android::pdx::rpc::RemoteMethodError;
12
13namespace android {
14namespace dvr {
15
16ConsumerQueueChannel::ConsumerQueueChannel(
17    BufferHubService* service, int buffer_id, int channel_id,
18    const std::shared_ptr<Channel>& producer)
19    : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
20      producer_(producer),
21      capacity_(0) {
22  GetProducer()->AddConsumer(this);
23}
24
25ConsumerQueueChannel::~ConsumerQueueChannel() {
26  ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
27           channel_id());
28
29  if (auto producer = GetProducer()) {
30    producer->RemoveConsumer(this);
31  }
32}
33
34bool ConsumerQueueChannel::HandleMessage(Message& message) {
35  ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
36  auto producer = GetProducer();
37  if (!producer) {
38    RemoteMethodError(message, EPIPE);
39    return true;
40  }
41
42  switch (message.GetOp()) {
43    case BufferHubRPC::CreateConsumerQueue::Opcode:
44      DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
45          *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
46      return true;
47
48    case BufferHubRPC::GetQueueInfo::Opcode:
49      DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
50          *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
51      return true;
52
53    case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
54      DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
55          *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
56      return true;
57
58    default:
59      return false;
60  }
61}
62
63std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
64    const {
65  return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
66}
67
68void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
69  ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
70}
71
72BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
73  BufferHubChannel::BufferInfo info;
74  if (auto producer = GetProducer()) {
75    // If producer has not hung up, copy most buffer info from the producer.
76    info = producer->GetBufferInfo();
77  }
78  info.id = buffer_id();
79  info.capacity = capacity_;
80  return info;
81}
82
83void ConsumerQueueChannel::RegisterNewBuffer(
84    const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
85  ALOGD_IF(TRACE,
86           "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
87           producer_channel->buffer_id(), slot);
88  pending_buffer_slots_.emplace(producer_channel, slot);
89
90  // Signal the client that there is new buffer available throught POLLIN.
91  SignalAvailable();
92}
93
94Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
95ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
96  std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
97  ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
98  ALOGD_IF(
99      TRACE,
100      "ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
101      "import: %zu",
102      pending_buffer_slots_.size());
103
104  while (!pending_buffer_slots_.empty()) {
105    auto producer_channel = pending_buffer_slots_.front().first.lock();
106    size_t producer_slot = pending_buffer_slots_.front().second;
107    pending_buffer_slots_.pop();
108
109    // It's possible that the producer channel has expired. When this occurs,
110    // ignore the producer channel.
111    if (producer_channel == nullptr) {
112      ALOGW(
113          "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
114          "channel has already been expired.");
115      continue;
116    }
117
118    auto status = producer_channel->CreateConsumer(message);
119
120    // If no buffers are imported successfully, clear available and return an
121    // error. Otherwise, return all consumer handles already imported
122    // successfully, but keep available bits on, so that the client can retry
123    // importing remaining consumer buffers.
124    if (!status) {
125      ALOGE(
126          "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
127          "consumer: %s",
128          status.GetErrorMessage().c_str());
129      if (buffer_handles.empty()) {
130        ClearAvailable();
131        return status.error_status();
132      } else {
133        return {std::move(buffer_handles)};
134      }
135    }
136
137    buffer_handles.emplace_back(status.take(), producer_slot);
138  }
139
140  ClearAvailable();
141  return {std::move(buffer_handles)};
142}
143
144void ConsumerQueueChannel::OnProducerClosed() {
145  ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
146           buffer_id());
147  producer_.reset();
148  Hangup();
149}
150
151}  // namespace dvr
152}  // namespace android
153