1#include "consumer_queue_channel.h"
2
3#include <pdx/channel_handle.h>
4
5#include "producer_channel.h"
6
7using android::pdx::ErrorStatus;
8using android::pdx::RemoteChannelHandle;
9using android::pdx::Status;
10using android::pdx::rpc::DispatchRemoteMethod;
11using android::pdx::rpc::RemoteMethodError;
12
13namespace android {
14namespace dvr {
15
16ConsumerQueueChannel::ConsumerQueueChannel(
17    BufferHubService* service, int buffer_id, int channel_id,
18    const std::shared_ptr<Channel>& producer, bool silent)
19    : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
20      producer_(producer),
21      capacity_(0),
22      silent_(silent) {
23  GetProducer()->AddConsumer(this);
24}
25
26ConsumerQueueChannel::~ConsumerQueueChannel() {
27  ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
28           channel_id());
29
30  if (auto producer = GetProducer()) {
31    producer->RemoveConsumer(this);
32  }
33}
34
35bool ConsumerQueueChannel::HandleMessage(Message& message) {
36  ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
37  auto producer = GetProducer();
38  if (!producer) {
39    RemoteMethodError(message, EPIPE);
40    return true;
41  }
42
43  switch (message.GetOp()) {
44    case BufferHubRPC::CreateConsumerQueue::Opcode:
45      DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
46          *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
47      return true;
48
49    case BufferHubRPC::GetQueueInfo::Opcode:
50      DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
51          *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
52      return true;
53
54    case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
55      DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
56          *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
57      return true;
58
59    default:
60      return false;
61  }
62}
63
64std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
65    const {
66  return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
67}
68
69void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
70  ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
71}
72
73BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
74  BufferHubChannel::BufferInfo info;
75  if (auto producer = GetProducer()) {
76    // If producer has not hung up, copy most buffer info from the producer.
77    info = producer->GetBufferInfo();
78  }
79  info.id = buffer_id();
80  info.capacity = capacity_;
81  return info;
82}
83
84void ConsumerQueueChannel::RegisterNewBuffer(
85    const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
86  ALOGD_IF(TRACE,
87           "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d "
88           "slot=%zu silent=%d",
89           buffer_id(), producer_channel->buffer_id(), slot, silent_);
90  // Only register buffers if the queue is not silent.
91  if (!silent_) {
92    pending_buffer_slots_.emplace(producer_channel, slot);
93
94    // Signal the client that there is new buffer available.
95    SignalAvailable();
96  }
97}
98
99Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
100ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
101  std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
102  ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
103  ALOGD_IF(TRACE,
104           "ConsumerQueueChannel::OnConsumerQueueImportBuffers: "
105           "pending_buffer_slots=%zu",
106           pending_buffer_slots_.size());
107
108  // Indicate this is a silent queue that will not import buffers.
109  if (silent_)
110    return ErrorStatus(EBADR);
111
112  while (!pending_buffer_slots_.empty()) {
113    auto producer_channel = pending_buffer_slots_.front().first.lock();
114    size_t producer_slot = pending_buffer_slots_.front().second;
115    pending_buffer_slots_.pop();
116
117    // It's possible that the producer channel has expired. When this occurs,
118    // ignore the producer channel.
119    if (producer_channel == nullptr) {
120      ALOGW(
121          "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
122          "channel has already been expired.");
123      continue;
124    }
125
126    auto status = producer_channel->CreateConsumer(message);
127
128    // If no buffers are imported successfully, clear available and return an
129    // error. Otherwise, return all consumer handles already imported
130    // successfully, but keep available bits on, so that the client can retry
131    // importing remaining consumer buffers.
132    if (!status) {
133      ALOGE(
134          "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
135          "consumer: %s",
136          status.GetErrorMessage().c_str());
137      if (buffer_handles.empty()) {
138        ClearAvailable();
139        return status.error_status();
140      } else {
141        return {std::move(buffer_handles)};
142      }
143    }
144
145    buffer_handles.emplace_back(status.take(), producer_slot);
146  }
147
148  ClearAvailable();
149  return {std::move(buffer_handles)};
150}
151
152void ConsumerQueueChannel::OnProducerClosed() {
153  ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
154           buffer_id());
155  producer_.reset();
156  Hangup();
157}
158
159}  // namespace dvr
160}  // namespace android
161