1#include "include/private/dvr/buffer_hub_queue_client.h" 2 3#include <inttypes.h> 4#include <log/log.h> 5#include <poll.h> 6#include <sys/epoll.h> 7 8#include <array> 9 10#include <pdx/default_transport/client_channel.h> 11#include <pdx/default_transport/client_channel_factory.h> 12#include <pdx/file_handle.h> 13#include <pdx/trace.h> 14 15#define RETRY_EINTR(fnc_call) \ 16 ([&]() -> decltype(fnc_call) { \ 17 decltype(fnc_call) result; \ 18 do { \ 19 result = (fnc_call); \ 20 } while (result == -1 && errno == EINTR); \ 21 return result; \ 22 })() 23 24using android::pdx::ErrorStatus; 25using android::pdx::LocalChannelHandle; 26using android::pdx::LocalHandle; 27using android::pdx::Status; 28 29namespace android { 30namespace dvr { 31 32namespace { 33 34std::pair<int32_t, int32_t> Unstuff(uint64_t value) { 35 return {static_cast<int32_t>(value >> 32), 36 static_cast<int32_t>(value & ((1ull << 32) - 1))}; 37} 38 39uint64_t Stuff(int32_t a, int32_t b) { 40 const uint32_t ua = static_cast<uint32_t>(a); 41 const uint32_t ub = static_cast<uint32_t>(b); 42 return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub); 43} 44 45} // anonymous namespace 46 47BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle) 48 : Client{pdx::default_transport::ClientChannel::Create( 49 std::move(channel_handle))} { 50 Initialize(); 51} 52 53BufferHubQueue::BufferHubQueue(const std::string& endpoint_path) 54 : Client{ 55 pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} { 56 Initialize(); 57} 58 59void BufferHubQueue::Initialize() { 60 int ret = epoll_fd_.Create(); 61 if (ret < 0) { 62 ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s", 63 strerror(-ret)); 64 return; 65 } 66 67 epoll_event event = { 68 .events = EPOLLIN | EPOLLET, 69 .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}}; 70 ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event); 71 if (ret < 0) { 72 ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s", 73 strerror(-ret)); 74 } 75} 76 77Status<void> BufferHubQueue::ImportQueue() { 78 auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>(); 79 if (!status) { 80 ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s", 81 status.GetErrorMessage().c_str()); 82 return ErrorStatus(status.error()); 83 } else { 84 SetupQueue(status.get()); 85 return {}; 86 } 87} 88 89void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) { 90 is_async_ = queue_info.producer_config.is_async; 91 default_width_ = queue_info.producer_config.default_width; 92 default_height_ = queue_info.producer_config.default_height; 93 default_format_ = queue_info.producer_config.default_format; 94 user_metadata_size_ = queue_info.producer_config.user_metadata_size; 95 id_ = queue_info.id; 96} 97 98std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() { 99 if (auto status = CreateConsumerQueueHandle(/*silent*/ false)) 100 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take())); 101 else 102 return nullptr; 103} 104 105std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() { 106 if (auto status = CreateConsumerQueueHandle(/*silent*/ true)) 107 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take())); 108 else 109 return nullptr; 110} 111 112Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle( 113 bool silent) { 114 auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent); 115 if (!status) { 116 ALOGE( 117 "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: " 118 "%s", 119 status.GetErrorMessage().c_str()); 120 return ErrorStatus(status.error()); 121 } 122 123 return status; 124} 125 126pdx::Status<ConsumerQueueParcelable> 127BufferHubQueue::CreateConsumerQueueParcelable(bool silent) { 128 auto status = CreateConsumerQueueHandle(silent); 129 if (!status) 130 return status.error_status(); 131 132 // A temporary consumer queue client to pull its channel parcelable. 133 auto consumer_queue = 134 std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take())); 135 ConsumerQueueParcelable queue_parcelable( 136 consumer_queue->GetChannel()->TakeChannelParcelable()); 137 138 if (!queue_parcelable.IsValid()) { 139 ALOGE( 140 "BufferHubQueue::CreateConsumerQueueParcelable: Failed to create " 141 "consumer queue parcelable."); 142 return ErrorStatus(EINVAL); 143 } 144 145 return {std::move(queue_parcelable)}; 146} 147 148bool BufferHubQueue::WaitForBuffers(int timeout) { 149 ATRACE_NAME("BufferHubQueue::WaitForBuffers"); 150 std::array<epoll_event, kMaxEvents> events; 151 152 // Loop at least once to check for hangups. 153 do { 154 ALOGD_IF( 155 TRACE, 156 "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu", 157 id(), count(), capacity()); 158 159 // If there is already a buffer then just check for hangup without waiting. 160 const int ret = epoll_fd_.Wait(events.data(), events.size(), 161 count() == 0 ? timeout : 0); 162 163 if (ret == 0) { 164 ALOGI_IF(TRACE, 165 "BufferHubQueue::WaitForBuffers: No events before timeout: " 166 "queue_id=%d", 167 id()); 168 return count() != 0; 169 } 170 171 if (ret < 0 && ret != -EINTR) { 172 ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s", 173 strerror(-ret)); 174 return false; 175 } 176 177 const int num_events = ret; 178 179 // A BufferQueue's epoll fd tracks N+1 events, where there are N events, 180 // one for each buffer in the queue, and one extra event for the queue 181 // client itself. 182 for (int i = 0; i < num_events; i++) { 183 int32_t event_fd; 184 int32_t index; 185 std::tie(event_fd, index) = Unstuff(events[i].data.u64); 186 187 PDX_TRACE_FORMAT( 188 "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;" 189 "slot=%d|", 190 id(), num_events, i, event_fd, index); 191 192 ALOGD_IF(TRACE, 193 "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d", 194 i, event_fd, index); 195 196 if (is_buffer_event_index(index)) { 197 HandleBufferEvent(static_cast<size_t>(index), event_fd, 198 events[i].events); 199 } else if (is_queue_event_index(index)) { 200 HandleQueueEvent(events[i].events); 201 } else { 202 ALOGW( 203 "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d " 204 "index=%d", 205 event_fd, index); 206 } 207 } 208 } while (count() == 0 && capacity() > 0 && !hung_up()); 209 210 return count() != 0; 211} 212 213Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd, 214 int poll_events) { 215 ATRACE_NAME("BufferHubQueue::HandleBufferEvent"); 216 if (!buffers_[slot]) { 217 ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot); 218 return ErrorStatus(ENOENT); 219 } 220 221 auto status = buffers_[slot]->GetEventMask(poll_events); 222 if (!status) { 223 ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s", 224 status.GetErrorMessage().c_str()); 225 return status.error_status(); 226 } 227 228 const int events = status.get(); 229 PDX_TRACE_FORMAT( 230 "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;" 231 "events=%d|", 232 id(), buffers_[slot]->id(), slot, event_fd, poll_events, events); 233 234 if (events & EPOLLIN) { 235 return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()}); 236 } else if (events & EPOLLHUP) { 237 ALOGW( 238 "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu " 239 "event_fd=%d buffer_id=%d", 240 slot, buffers_[slot]->event_fd(), buffers_[slot]->id()); 241 return RemoveBuffer(slot); 242 } else { 243 ALOGW( 244 "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll " 245 "events=%d", 246 slot, events); 247 } 248 249 return {}; 250} 251 252Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) { 253 ATRACE_NAME("BufferHubQueue::HandleQueueEvent"); 254 auto status = GetEventMask(poll_event); 255 if (!status) { 256 ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s", 257 status.GetErrorMessage().c_str()); 258 return status.error_status(); 259 } 260 261 const int events = status.get(); 262 if (events & EPOLLIN) { 263 // Note that after buffer imports, if |count()| still returns 0, epoll 264 // wait will be tried again to acquire the newly imported buffer. 265 auto buffer_status = OnBufferAllocated(); 266 if (!buffer_status) { 267 ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s", 268 buffer_status.GetErrorMessage().c_str()); 269 } 270 } else if (events & EPOLLHUP) { 271 ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!"); 272 hung_up_ = true; 273 } else { 274 ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events); 275 } 276 277 return {}; 278} 279 280Status<void> BufferHubQueue::AddBuffer( 281 const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) { 282 ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu", 283 buffer->id(), slot); 284 285 if (is_full()) { 286 ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu", 287 capacity_); 288 return ErrorStatus(E2BIG); 289 } 290 291 if (buffers_[slot]) { 292 // Replace the buffer if the slot is occupied. This could happen when the 293 // producer side replaced the slot with a newly allocated buffer. Remove the 294 // buffer before setting up with the new one. 295 auto remove_status = RemoveBuffer(slot); 296 if (!remove_status) 297 return remove_status.error_status(); 298 } 299 300 for (const auto& event_source : buffer->GetEventSources()) { 301 epoll_event event = {.events = event_source.event_mask | EPOLLET, 302 .data = {.u64 = Stuff(buffer->event_fd(), slot)}}; 303 const int ret = 304 epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event); 305 if (ret < 0) { 306 ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s", 307 strerror(-ret)); 308 return ErrorStatus(-ret); 309 } 310 } 311 312 buffers_[slot] = buffer; 313 capacity_++; 314 return {}; 315} 316 317Status<void> BufferHubQueue::RemoveBuffer(size_t slot) { 318 ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot); 319 320 if (buffers_[slot]) { 321 for (const auto& event_source : buffers_[slot]->GetEventSources()) { 322 const int ret = 323 epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr); 324 if (ret < 0) { 325 ALOGE( 326 "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll " 327 "set: %s", 328 strerror(-ret)); 329 return ErrorStatus(-ret); 330 } 331 } 332 333 // Trigger OnBufferRemoved callback if registered. 334 if (on_buffer_removed_) 335 on_buffer_removed_(buffers_[slot]); 336 337 buffers_[slot] = nullptr; 338 capacity_--; 339 } 340 341 return {}; 342} 343 344Status<void> BufferHubQueue::Enqueue(Entry entry) { 345 if (!is_full()) { 346 available_buffers_.push(std::move(entry)); 347 348 // Trigger OnBufferAvailable callback if registered. 349 if (on_buffer_available_) 350 on_buffer_available_(); 351 352 return {}; 353 } else { 354 ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!"); 355 return ErrorStatus(E2BIG); 356 } 357} 358 359Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(int timeout, 360 size_t* slot) { 361 ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(), 362 timeout); 363 364 PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count()); 365 366 if (count() == 0) { 367 if (!WaitForBuffers(timeout)) 368 return ErrorStatus(ETIMEDOUT); 369 } 370 371 auto& entry = available_buffers_.top(); 372 PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(), 373 entry.slot); 374 375 std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer); 376 *slot = entry.slot; 377 378 available_buffers_.pop(); 379 380 return {std::move(buffer)}; 381} 382 383void BufferHubQueue::SetBufferAvailableCallback( 384 BufferAvailableCallback callback) { 385 on_buffer_available_ = callback; 386} 387 388void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) { 389 on_buffer_removed_ = callback; 390} 391 392pdx::Status<void> BufferHubQueue::FreeAllBuffers() { 393 // Clear all available buffers. 394 while (!available_buffers_.empty()) 395 available_buffers_.pop(); 396 397 pdx::Status<void> last_error; // No error. 398 // Clear all buffers this producer queue is tracking. 399 for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) { 400 if (buffers_[slot] != nullptr) { 401 auto status = RemoveBuffer(slot); 402 if (!status) { 403 ALOGE( 404 "ProducerQueue::FreeAllBuffers: Failed to remove buffer at " 405 "slot=%zu.", 406 slot); 407 last_error = status.error_status(); 408 } 409 } 410 } 411 412 return last_error; 413} 414 415ProducerQueue::ProducerQueue(LocalChannelHandle handle) 416 : BASE(std::move(handle)) { 417 auto status = ImportQueue(); 418 if (!status) { 419 ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s", 420 status.GetErrorMessage().c_str()); 421 Close(-status.error()); 422 } 423} 424 425ProducerQueue::ProducerQueue(const ProducerQueueConfig& config, 426 const UsagePolicy& usage) 427 : BASE(BufferHubRPC::kClientPath) { 428 auto status = 429 InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage); 430 if (!status) { 431 ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s", 432 status.GetErrorMessage().c_str()); 433 Close(-status.error()); 434 return; 435 } 436 437 SetupQueue(status.get()); 438} 439 440Status<std::vector<size_t>> ProducerQueue::AllocateBuffers( 441 uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, 442 uint64_t usage, size_t buffer_count) { 443 if (capacity() + buffer_count > kMaxQueueCapacity) { 444 ALOGE( 445 "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot " 446 "allocate %zu more buffer(s).", 447 capacity(), buffer_count); 448 return ErrorStatus(E2BIG); 449 } 450 451 Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status = 452 InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>( 453 width, height, layer_count, format, usage, buffer_count); 454 if (!status) { 455 ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s", 456 status.GetErrorMessage().c_str()); 457 return status.error_status(); 458 } 459 460 auto buffer_handle_slots = status.take(); 461 LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count, 462 "BufferHubRPC::ProducerQueueAllocateBuffers should " 463 "return %zu buffer handle(s), but returned %zu instead.", 464 buffer_count, buffer_handle_slots.size()); 465 466 std::vector<size_t> buffer_slots; 467 buffer_slots.reserve(buffer_count); 468 469 // Bookkeeping for each buffer. 470 for (auto& hs : buffer_handle_slots) { 471 auto& buffer_handle = hs.first; 472 size_t buffer_slot = hs.second; 473 474 // Note that import might (though very unlikely) fail. If so, buffer_handle 475 // will be closed and included in returned buffer_slots. 476 if (AddBuffer(BufferProducer::Import(std::move(buffer_handle)), 477 buffer_slot)) { 478 ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu", 479 buffer_slot); 480 buffer_slots.push_back(buffer_slot); 481 } 482 } 483 484 if (buffer_slots.size() == 0) { 485 // Error out if no buffer is allocated and improted. 486 ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffers: no buffer allocated."); 487 ErrorStatus(ENOMEM); 488 } 489 490 return {std::move(buffer_slots)}; 491} 492 493Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height, 494 uint32_t layer_count, 495 uint32_t format, uint64_t usage) { 496 // We only allocate one buffer at a time. 497 constexpr size_t buffer_count = 1; 498 auto status = 499 AllocateBuffers(width, height, layer_count, format, usage, buffer_count); 500 if (!status) { 501 ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s", 502 status.GetErrorMessage().c_str()); 503 return status.error_status(); 504 } 505 506 if (status.get().size() == 0) { 507 ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffer: no buffer allocated."); 508 ErrorStatus(ENOMEM); 509 } 510 511 return {status.get()[0]}; 512} 513 514Status<void> ProducerQueue::AddBuffer( 515 const std::shared_ptr<BufferProducer>& buffer, size_t slot) { 516 ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu", 517 id(), buffer->id(), slot); 518 // For producer buffer, we need to enqueue the newly added buffer 519 // immediately. Producer queue starts with all buffers in available state. 520 auto status = BufferHubQueue::AddBuffer(buffer, slot); 521 if (!status) 522 return status; 523 524 return BufferHubQueue::Enqueue({buffer, slot, 0ULL}); 525} 526 527Status<void> ProducerQueue::RemoveBuffer(size_t slot) { 528 auto status = 529 InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot); 530 if (!status) { 531 ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s", 532 status.GetErrorMessage().c_str()); 533 return status.error_status(); 534 } 535 536 return BufferHubQueue::RemoveBuffer(slot); 537} 538 539Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( 540 int timeout, size_t* slot, LocalHandle* release_fence) { 541 DvrNativeBufferMetadata canonical_meta; 542 return Dequeue(timeout, slot, &canonical_meta, release_fence); 543} 544 545pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( 546 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, 547 pdx::LocalHandle* release_fence) { 548 ATRACE_NAME("ProducerQueue::Dequeue"); 549 if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) { 550 ALOGE("ProducerQueue::Dequeue: Invalid parameter."); 551 return ErrorStatus(EINVAL); 552 } 553 554 auto status = BufferHubQueue::Dequeue(timeout, slot); 555 if (!status) 556 return status.error_status(); 557 558 auto buffer = std::static_pointer_cast<BufferProducer>(status.take()); 559 const int ret = buffer->GainAsync(out_meta, release_fence); 560 if (ret < 0 && ret != -EALREADY) 561 return ErrorStatus(-ret); 562 563 return {std::move(buffer)}; 564} 565 566pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() { 567 if (capacity() != 0) { 568 ALOGE( 569 "ProducerQueue::TakeAsParcelable: producer queue can only be taken out" 570 " as a parcelable when empty. Current queue capacity: %zu", 571 capacity()); 572 return ErrorStatus(EINVAL); 573 } 574 575 std::unique_ptr<pdx::ClientChannel> channel = TakeChannel(); 576 ProducerQueueParcelable queue_parcelable(channel->TakeChannelParcelable()); 577 578 // Here the queue parcelable is returned and holds the underlying system 579 // resources backing the queue; while the original client channel of this 580 // producer queue is destroyed in place so that this client can no longer 581 // provide producer operations. 582 return {std::move(queue_parcelable)}; 583} 584 585/*static */ 586std::unique_ptr<ConsumerQueue> ConsumerQueue::Import( 587 LocalChannelHandle handle) { 588 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle))); 589} 590 591ConsumerQueue::ConsumerQueue(LocalChannelHandle handle) 592 : BufferHubQueue(std::move(handle)) { 593 auto status = ImportQueue(); 594 if (!status) { 595 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s", 596 status.GetErrorMessage().c_str()); 597 Close(-status.error()); 598 } 599 600 auto import_status = ImportBuffers(); 601 if (import_status) { 602 ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.", 603 import_status.get()); 604 } else { 605 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s", 606 import_status.GetErrorMessage().c_str()); 607 } 608} 609 610Status<size_t> ConsumerQueue::ImportBuffers() { 611 auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(); 612 if (!status) { 613 if (status.error() == EBADR) { 614 ALOGI( 615 "ConsumerQueue::ImportBuffers: Queue is silent, no buffers " 616 "imported."); 617 return {0}; 618 } else { 619 ALOGE( 620 "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s", 621 status.GetErrorMessage().c_str()); 622 return status.error_status(); 623 } 624 } 625 626 int ret; 627 Status<void> last_error; 628 size_t imported_buffers_count = 0; 629 630 auto buffer_handle_slots = status.take(); 631 for (auto& buffer_handle_slot : buffer_handle_slots) { 632 ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d", 633 buffer_handle_slot.first.value()); 634 635 std::unique_ptr<BufferConsumer> buffer_consumer = 636 BufferConsumer::Import(std::move(buffer_handle_slot.first)); 637 if (!buffer_consumer) { 638 ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu", 639 buffer_handle_slot.second); 640 last_error = ErrorStatus(EPIPE); 641 continue; 642 } 643 644 auto add_status = 645 AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second); 646 if (!add_status) { 647 ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s", 648 add_status.GetErrorMessage().c_str()); 649 last_error = add_status; 650 } else { 651 imported_buffers_count++; 652 } 653 } 654 655 if (imported_buffers_count > 0) 656 return {imported_buffers_count}; 657 else 658 return last_error.error_status(); 659} 660 661Status<void> ConsumerQueue::AddBuffer( 662 const std::shared_ptr<BufferConsumer>& buffer, size_t slot) { 663 ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu", 664 id(), buffer->id(), slot); 665 return BufferHubQueue::AddBuffer(buffer, slot); 666} 667 668Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( 669 int timeout, size_t* slot, void* meta, size_t user_metadata_size, 670 LocalHandle* acquire_fence) { 671 if (user_metadata_size != user_metadata_size_) { 672 ALOGE( 673 "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer " 674 "does not match metadata size (%zu) for the queue.", 675 user_metadata_size, user_metadata_size_); 676 return ErrorStatus(EINVAL); 677 } 678 679 DvrNativeBufferMetadata canonical_meta; 680 auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence); 681 if (!status) 682 return status.error_status(); 683 684 if (meta && user_metadata_size) { 685 void* metadata_src = 686 reinterpret_cast<void*>(canonical_meta.user_metadata_ptr); 687 if (metadata_src) { 688 memcpy(meta, metadata_src, user_metadata_size); 689 } else { 690 ALOGW("ConsumerQueue::Dequeue: no user-defined metadata."); 691 } 692 } 693 694 return status; 695} 696 697Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( 698 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, 699 pdx::LocalHandle* acquire_fence) { 700 ATRACE_NAME("ConsumerQueue::Dequeue"); 701 if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) { 702 ALOGE("ConsumerQueue::Dequeue: Invalid parameter."); 703 return ErrorStatus(EINVAL); 704 } 705 706 auto status = BufferHubQueue::Dequeue(timeout, slot); 707 if (!status) 708 return status.error_status(); 709 710 auto buffer = std::static_pointer_cast<BufferConsumer>(status.take()); 711 const int ret = buffer->AcquireAsync(out_meta, acquire_fence); 712 if (ret < 0) 713 return ErrorStatus(-ret); 714 715 return {std::move(buffer)}; 716} 717 718Status<void> ConsumerQueue::OnBufferAllocated() { 719 ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id()); 720 721 auto status = ImportBuffers(); 722 if (!status) { 723 ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s", 724 status.GetErrorMessage().c_str()); 725 return ErrorStatus(status.error()); 726 } else if (status.get() == 0) { 727 ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!"); 728 return ErrorStatus(ENOBUFS); 729 } else { 730 ALOGD_IF(TRACE, 731 "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.", 732 status.get()); 733 return {}; 734 } 735} 736 737} // namespace dvr 738} // namespace android 739