1#include "include/private/dvr/buffer_hub_queue_client.h"
2
3#include <inttypes.h>
4#include <log/log.h>
5#include <poll.h>
6#include <sys/epoll.h>
7
8#include <array>
9
10#include <pdx/default_transport/client_channel.h>
11#include <pdx/default_transport/client_channel_factory.h>
12#include <pdx/file_handle.h>
13#include <pdx/trace.h>
14
15#define RETRY_EINTR(fnc_call)                 \
16  ([&]() -> decltype(fnc_call) {              \
17    decltype(fnc_call) result;                \
18    do {                                      \
19      result = (fnc_call);                    \
20    } while (result == -1 && errno == EINTR); \
21    return result;                            \
22  })()
23
24using android::pdx::ErrorStatus;
25using android::pdx::LocalChannelHandle;
26using android::pdx::LocalHandle;
27using android::pdx::Status;
28
29namespace android {
30namespace dvr {
31
32namespace {
33
34std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
35  return {static_cast<int32_t>(value >> 32),
36          static_cast<int32_t>(value & ((1ull << 32) - 1))};
37}
38
39uint64_t Stuff(int32_t a, int32_t b) {
40  const uint32_t ua = static_cast<uint32_t>(a);
41  const uint32_t ub = static_cast<uint32_t>(b);
42  return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub);
43}
44
45}  // anonymous namespace
46
47BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
48    : Client{pdx::default_transport::ClientChannel::Create(
49          std::move(channel_handle))} {
50  Initialize();
51}
52
53BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
54    : Client{
55          pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
56  Initialize();
57}
58
59void BufferHubQueue::Initialize() {
60  int ret = epoll_fd_.Create();
61  if (ret < 0) {
62    ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
63          strerror(-ret));
64    return;
65  }
66
67  epoll_event event = {
68      .events = EPOLLIN | EPOLLET,
69      .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
70  ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
71  if (ret < 0) {
72    ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
73          strerror(-ret));
74  }
75}
76
77Status<void> BufferHubQueue::ImportQueue() {
78  auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
79  if (!status) {
80    ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
81          status.GetErrorMessage().c_str());
82    return ErrorStatus(status.error());
83  } else {
84    SetupQueue(status.get());
85    return {};
86  }
87}
88
89void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
90  is_async_ = queue_info.producer_config.is_async;
91  default_width_ = queue_info.producer_config.default_width;
92  default_height_ = queue_info.producer_config.default_height;
93  default_format_ = queue_info.producer_config.default_format;
94  user_metadata_size_ = queue_info.producer_config.user_metadata_size;
95  id_ = queue_info.id;
96}
97
98std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
99  if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
100    return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
101  else
102    return nullptr;
103}
104
105std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
106  if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
107    return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
108  else
109    return nullptr;
110}
111
112Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
113    bool silent) {
114  auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
115  if (!status) {
116    ALOGE(
117        "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
118        "%s",
119        status.GetErrorMessage().c_str());
120    return ErrorStatus(status.error());
121  }
122
123  return status;
124}
125
126pdx::Status<ConsumerQueueParcelable>
127BufferHubQueue::CreateConsumerQueueParcelable(bool silent) {
128  auto status = CreateConsumerQueueHandle(silent);
129  if (!status)
130    return status.error_status();
131
132  // A temporary consumer queue client to pull its channel parcelable.
133  auto consumer_queue =
134      std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
135  ConsumerQueueParcelable queue_parcelable(
136      consumer_queue->GetChannel()->TakeChannelParcelable());
137
138  if (!queue_parcelable.IsValid()) {
139    ALOGE(
140        "BufferHubQueue::CreateConsumerQueueParcelable: Failed to create "
141        "consumer queue parcelable.");
142    return ErrorStatus(EINVAL);
143  }
144
145  return {std::move(queue_parcelable)};
146}
147
148bool BufferHubQueue::WaitForBuffers(int timeout) {
149  ATRACE_NAME("BufferHubQueue::WaitForBuffers");
150  std::array<epoll_event, kMaxEvents> events;
151
152  // Loop at least once to check for hangups.
153  do {
154    ALOGD_IF(
155        TRACE,
156        "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
157        id(), count(), capacity());
158
159    // If there is already a buffer then just check for hangup without waiting.
160    const int ret = epoll_fd_.Wait(events.data(), events.size(),
161                                   count() == 0 ? timeout : 0);
162
163    if (ret == 0) {
164      ALOGI_IF(TRACE,
165               "BufferHubQueue::WaitForBuffers: No events before timeout: "
166               "queue_id=%d",
167               id());
168      return count() != 0;
169    }
170
171    if (ret < 0 && ret != -EINTR) {
172      ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
173            strerror(-ret));
174      return false;
175    }
176
177    const int num_events = ret;
178
179    // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
180    // one for each buffer in the queue, and one extra event for the queue
181    // client itself.
182    for (int i = 0; i < num_events; i++) {
183      int32_t event_fd;
184      int32_t index;
185      std::tie(event_fd, index) = Unstuff(events[i].data.u64);
186
187      PDX_TRACE_FORMAT(
188          "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
189          "slot=%d|",
190          id(), num_events, i, event_fd, index);
191
192      ALOGD_IF(TRACE,
193               "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
194               i, event_fd, index);
195
196      if (is_buffer_event_index(index)) {
197        HandleBufferEvent(static_cast<size_t>(index), event_fd,
198                          events[i].events);
199      } else if (is_queue_event_index(index)) {
200        HandleQueueEvent(events[i].events);
201      } else {
202        ALOGW(
203            "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
204            "index=%d",
205            event_fd, index);
206      }
207    }
208  } while (count() == 0 && capacity() > 0 && !hung_up());
209
210  return count() != 0;
211}
212
213Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
214                                               int poll_events) {
215  ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
216  if (!buffers_[slot]) {
217    ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
218    return ErrorStatus(ENOENT);
219  }
220
221  auto status = buffers_[slot]->GetEventMask(poll_events);
222  if (!status) {
223    ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
224          status.GetErrorMessage().c_str());
225    return status.error_status();
226  }
227
228  const int events = status.get();
229  PDX_TRACE_FORMAT(
230      "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
231      "events=%d|",
232      id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
233
234  if (events & EPOLLIN) {
235    return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
236  } else if (events & EPOLLHUP) {
237    ALOGW(
238        "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
239        "event_fd=%d buffer_id=%d",
240        slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
241    return RemoveBuffer(slot);
242  } else {
243    ALOGW(
244        "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
245        "events=%d",
246        slot, events);
247  }
248
249  return {};
250}
251
252Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
253  ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
254  auto status = GetEventMask(poll_event);
255  if (!status) {
256    ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
257          status.GetErrorMessage().c_str());
258    return status.error_status();
259  }
260
261  const int events = status.get();
262  if (events & EPOLLIN) {
263    // Note that after buffer imports, if |count()| still returns 0, epoll
264    // wait will be tried again to acquire the newly imported buffer.
265    auto buffer_status = OnBufferAllocated();
266    if (!buffer_status) {
267      ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
268            buffer_status.GetErrorMessage().c_str());
269    }
270  } else if (events & EPOLLHUP) {
271    ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
272    hung_up_ = true;
273  } else {
274    ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
275  }
276
277  return {};
278}
279
280Status<void> BufferHubQueue::AddBuffer(
281    const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
282  ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
283           buffer->id(), slot);
284
285  if (is_full()) {
286    ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
287          capacity_);
288    return ErrorStatus(E2BIG);
289  }
290
291  if (buffers_[slot]) {
292    // Replace the buffer if the slot is occupied. This could happen when the
293    // producer side replaced the slot with a newly allocated buffer. Remove the
294    // buffer before setting up with the new one.
295    auto remove_status = RemoveBuffer(slot);
296    if (!remove_status)
297      return remove_status.error_status();
298  }
299
300  for (const auto& event_source : buffer->GetEventSources()) {
301    epoll_event event = {.events = event_source.event_mask | EPOLLET,
302                         .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
303    const int ret =
304        epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
305    if (ret < 0) {
306      ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
307            strerror(-ret));
308      return ErrorStatus(-ret);
309    }
310  }
311
312  buffers_[slot] = buffer;
313  capacity_++;
314  return {};
315}
316
317Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
318  ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot);
319
320  if (buffers_[slot]) {
321    for (const auto& event_source : buffers_[slot]->GetEventSources()) {
322      const int ret =
323          epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
324      if (ret < 0) {
325        ALOGE(
326            "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
327            "set: %s",
328            strerror(-ret));
329        return ErrorStatus(-ret);
330      }
331    }
332
333    // Trigger OnBufferRemoved callback if registered.
334    if (on_buffer_removed_)
335      on_buffer_removed_(buffers_[slot]);
336
337    buffers_[slot] = nullptr;
338    capacity_--;
339  }
340
341  return {};
342}
343
344Status<void> BufferHubQueue::Enqueue(Entry entry) {
345  if (!is_full()) {
346    available_buffers_.push(std::move(entry));
347
348    // Trigger OnBufferAvailable callback if registered.
349    if (on_buffer_available_)
350      on_buffer_available_();
351
352    return {};
353  } else {
354    ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
355    return ErrorStatus(E2BIG);
356  }
357}
358
359Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(int timeout,
360                                                                 size_t* slot) {
361  ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
362           timeout);
363
364  PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count());
365
366  if (count() == 0) {
367    if (!WaitForBuffers(timeout))
368      return ErrorStatus(ETIMEDOUT);
369  }
370
371  auto& entry = available_buffers_.top();
372  PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
373                   entry.slot);
374
375  std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
376  *slot = entry.slot;
377
378  available_buffers_.pop();
379
380  return {std::move(buffer)};
381}
382
383void BufferHubQueue::SetBufferAvailableCallback(
384    BufferAvailableCallback callback) {
385  on_buffer_available_ = callback;
386}
387
388void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
389  on_buffer_removed_ = callback;
390}
391
392pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
393  // Clear all available buffers.
394  while (!available_buffers_.empty())
395    available_buffers_.pop();
396
397  pdx::Status<void> last_error;  // No error.
398  // Clear all buffers this producer queue is tracking.
399  for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
400    if (buffers_[slot] != nullptr) {
401      auto status = RemoveBuffer(slot);
402      if (!status) {
403        ALOGE(
404            "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
405            "slot=%zu.",
406            slot);
407        last_error = status.error_status();
408      }
409    }
410  }
411
412  return last_error;
413}
414
415ProducerQueue::ProducerQueue(LocalChannelHandle handle)
416    : BASE(std::move(handle)) {
417  auto status = ImportQueue();
418  if (!status) {
419    ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
420          status.GetErrorMessage().c_str());
421    Close(-status.error());
422  }
423}
424
425ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
426                             const UsagePolicy& usage)
427    : BASE(BufferHubRPC::kClientPath) {
428  auto status =
429      InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
430  if (!status) {
431    ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
432          status.GetErrorMessage().c_str());
433    Close(-status.error());
434    return;
435  }
436
437  SetupQueue(status.get());
438}
439
440Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
441    uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
442    uint64_t usage, size_t buffer_count) {
443  if (capacity() + buffer_count > kMaxQueueCapacity) {
444    ALOGE(
445        "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
446        "allocate %zu more buffer(s).",
447        capacity(), buffer_count);
448    return ErrorStatus(E2BIG);
449  }
450
451  Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
452      InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
453          width, height, layer_count, format, usage, buffer_count);
454  if (!status) {
455    ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
456          status.GetErrorMessage().c_str());
457    return status.error_status();
458  }
459
460  auto buffer_handle_slots = status.take();
461  LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
462                      "BufferHubRPC::ProducerQueueAllocateBuffers should "
463                      "return %zu buffer handle(s), but returned %zu instead.",
464                      buffer_count, buffer_handle_slots.size());
465
466  std::vector<size_t> buffer_slots;
467  buffer_slots.reserve(buffer_count);
468
469  // Bookkeeping for each buffer.
470  for (auto& hs : buffer_handle_slots) {
471    auto& buffer_handle = hs.first;
472    size_t buffer_slot = hs.second;
473
474    // Note that import might (though very unlikely) fail. If so, buffer_handle
475    // will be closed and included in returned buffer_slots.
476    if (AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
477                  buffer_slot)) {
478      ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
479               buffer_slot);
480      buffer_slots.push_back(buffer_slot);
481    }
482  }
483
484  if (buffer_slots.size() == 0) {
485    // Error out if no buffer is allocated and improted.
486    ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffers: no buffer allocated.");
487    ErrorStatus(ENOMEM);
488  }
489
490  return {std::move(buffer_slots)};
491}
492
493Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
494                                             uint32_t layer_count,
495                                             uint32_t format, uint64_t usage) {
496  // We only allocate one buffer at a time.
497  constexpr size_t buffer_count = 1;
498  auto status =
499      AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
500  if (!status) {
501    ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
502          status.GetErrorMessage().c_str());
503    return status.error_status();
504  }
505
506  if (status.get().size() == 0) {
507    ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffer: no buffer allocated.");
508    ErrorStatus(ENOMEM);
509  }
510
511  return {status.get()[0]};
512}
513
514Status<void> ProducerQueue::AddBuffer(
515    const std::shared_ptr<BufferProducer>& buffer, size_t slot) {
516  ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
517           id(), buffer->id(), slot);
518  // For producer buffer, we need to enqueue the newly added buffer
519  // immediately. Producer queue starts with all buffers in available state.
520  auto status = BufferHubQueue::AddBuffer(buffer, slot);
521  if (!status)
522    return status;
523
524  return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
525}
526
527Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
528  auto status =
529      InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
530  if (!status) {
531    ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s",
532          status.GetErrorMessage().c_str());
533    return status.error_status();
534  }
535
536  return BufferHubQueue::RemoveBuffer(slot);
537}
538
539Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
540    int timeout, size_t* slot, LocalHandle* release_fence) {
541  DvrNativeBufferMetadata canonical_meta;
542  return Dequeue(timeout, slot, &canonical_meta, release_fence);
543}
544
545pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
546    int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
547    pdx::LocalHandle* release_fence) {
548  ATRACE_NAME("ProducerQueue::Dequeue");
549  if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
550    ALOGE("ProducerQueue::Dequeue: Invalid parameter.");
551    return ErrorStatus(EINVAL);
552  }
553
554  auto status = BufferHubQueue::Dequeue(timeout, slot);
555  if (!status)
556    return status.error_status();
557
558  auto buffer = std::static_pointer_cast<BufferProducer>(status.take());
559  const int ret = buffer->GainAsync(out_meta, release_fence);
560  if (ret < 0 && ret != -EALREADY)
561    return ErrorStatus(-ret);
562
563  return {std::move(buffer)};
564}
565
566pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() {
567  if (capacity() != 0) {
568    ALOGE(
569        "ProducerQueue::TakeAsParcelable: producer queue can only be taken out"
570        " as a parcelable when empty. Current queue capacity: %zu",
571        capacity());
572    return ErrorStatus(EINVAL);
573  }
574
575  std::unique_ptr<pdx::ClientChannel> channel = TakeChannel();
576  ProducerQueueParcelable queue_parcelable(channel->TakeChannelParcelable());
577
578  // Here the queue parcelable is returned and holds the underlying system
579  // resources backing the queue; while the original client channel of this
580  // producer queue is destroyed in place so that this client can no longer
581  // provide producer operations.
582  return {std::move(queue_parcelable)};
583}
584
585/*static */
586std::unique_ptr<ConsumerQueue> ConsumerQueue::Import(
587    LocalChannelHandle handle) {
588  return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
589}
590
591ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
592    : BufferHubQueue(std::move(handle)) {
593  auto status = ImportQueue();
594  if (!status) {
595    ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
596          status.GetErrorMessage().c_str());
597    Close(-status.error());
598  }
599
600  auto import_status = ImportBuffers();
601  if (import_status) {
602    ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
603          import_status.get());
604  } else {
605    ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
606          import_status.GetErrorMessage().c_str());
607  }
608}
609
610Status<size_t> ConsumerQueue::ImportBuffers() {
611  auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
612  if (!status) {
613    if (status.error() == EBADR) {
614      ALOGI(
615          "ConsumerQueue::ImportBuffers: Queue is silent, no buffers "
616          "imported.");
617      return {0};
618    } else {
619      ALOGE(
620          "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
621          status.GetErrorMessage().c_str());
622      return status.error_status();
623    }
624  }
625
626  int ret;
627  Status<void> last_error;
628  size_t imported_buffers_count = 0;
629
630  auto buffer_handle_slots = status.take();
631  for (auto& buffer_handle_slot : buffer_handle_slots) {
632    ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
633             buffer_handle_slot.first.value());
634
635    std::unique_ptr<BufferConsumer> buffer_consumer =
636        BufferConsumer::Import(std::move(buffer_handle_slot.first));
637    if (!buffer_consumer) {
638      ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu",
639            buffer_handle_slot.second);
640      last_error = ErrorStatus(EPIPE);
641      continue;
642    }
643
644    auto add_status =
645        AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
646    if (!add_status) {
647      ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
648            add_status.GetErrorMessage().c_str());
649      last_error = add_status;
650    } else {
651      imported_buffers_count++;
652    }
653  }
654
655  if (imported_buffers_count > 0)
656    return {imported_buffers_count};
657  else
658    return last_error.error_status();
659}
660
661Status<void> ConsumerQueue::AddBuffer(
662    const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
663  ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
664           id(), buffer->id(), slot);
665  return BufferHubQueue::AddBuffer(buffer, slot);
666}
667
668Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
669    int timeout, size_t* slot, void* meta, size_t user_metadata_size,
670    LocalHandle* acquire_fence) {
671  if (user_metadata_size != user_metadata_size_) {
672    ALOGE(
673        "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
674        "does not match metadata size (%zu) for the queue.",
675        user_metadata_size, user_metadata_size_);
676    return ErrorStatus(EINVAL);
677  }
678
679  DvrNativeBufferMetadata canonical_meta;
680  auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
681  if (!status)
682    return status.error_status();
683
684  if (meta && user_metadata_size) {
685    void* metadata_src =
686        reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
687    if (metadata_src) {
688      memcpy(meta, metadata_src, user_metadata_size);
689    } else {
690      ALOGW("ConsumerQueue::Dequeue: no user-defined metadata.");
691    }
692  }
693
694  return status;
695}
696
697Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
698    int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
699    pdx::LocalHandle* acquire_fence) {
700  ATRACE_NAME("ConsumerQueue::Dequeue");
701  if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
702    ALOGE("ConsumerQueue::Dequeue: Invalid parameter.");
703    return ErrorStatus(EINVAL);
704  }
705
706  auto status = BufferHubQueue::Dequeue(timeout, slot);
707  if (!status)
708    return status.error_status();
709
710  auto buffer = std::static_pointer_cast<BufferConsumer>(status.take());
711  const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
712  if (ret < 0)
713    return ErrorStatus(-ret);
714
715  return {std::move(buffer)};
716}
717
718Status<void> ConsumerQueue::OnBufferAllocated() {
719  ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());
720
721  auto status = ImportBuffers();
722  if (!status) {
723    ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
724          status.GetErrorMessage().c_str());
725    return ErrorStatus(status.error());
726  } else if (status.get() == 0) {
727    ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
728    return ErrorStatus(ENOBUFS);
729  } else {
730    ALOGD_IF(TRACE,
731             "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
732             status.get());
733    return {};
734  }
735}
736
737}  // namespace dvr
738}  // namespace android
739