12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823 |
- #include "include/private/dvr/buffer_hub_queue_client.h"
- #include <inttypes.h>
- #include <log/log.h>
- #include <poll.h>
- #include <sys/epoll.h>
- #include <array>
- #include <pdx/default_transport/client_channel.h>
- #include <pdx/default_transport/client_channel_factory.h>
- #include <pdx/file_handle.h>
- #include <pdx/trace.h>
// Evaluates |fnc_call| again and again for as long as it yields -1 with errno
// set to EINTR, and produces the first result that does not match that
// condition. The expression is evaluated inside a by-reference capturing
// lambda, so it may refer to locals of the calling scope.
#define RETRY_EINTR(fnc_call)                        \
  ([&]() -> decltype(fnc_call) {                     \
    decltype(fnc_call) result;                       \
    for (;;) {                                       \
      result = (fnc_call);                           \
      if (!(result == -1 && errno == EINTR)) break;  \
    }                                                \
    return result;                                   \
  })()
- using android::pdx::ErrorStatus;
- using android::pdx::LocalChannelHandle;
- using android::pdx::LocalHandle;
- using android::pdx::Status;
- namespace android {
- namespace dvr {
- namespace {
// Splits a 64-bit value into two 32-bit halves: the upper 32 bits become the
// first element of the pair and the lower 32 bits become the second.
std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
  const uint64_t low_mask = (1ull << 32) - 1;
  const int32_t upper = static_cast<int32_t>(value >> 32);
  const int32_t lower = static_cast<int32_t>(value & low_mask);
  return {upper, lower};
}
// Packs two 32-bit integers into one 64-bit value: |a| occupies the upper
// 32 bits and |b| the lower 32 bits. Both are first converted to unsigned so
// that a negative |b| does not spill into the upper half.
uint64_t Stuff(int32_t a, int32_t b) {
  uint64_t packed = static_cast<uint32_t>(a);
  packed <<= 32;
  packed |= static_cast<uint32_t>(b);
  return packed;
}
- } // anonymous namespace
- BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
- : Client{pdx::default_transport::ClientChannel::Create(
- std::move(channel_handle))} {
- Initialize();
- }
- BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
- : Client{
- pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
- Initialize();
- }
- void BufferHubQueue::Initialize() {
- int ret = epoll_fd_.Create();
- if (ret < 0) {
- ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
- strerror(-ret));
- return;
- }
- epoll_event event = {
- .events = EPOLLIN | EPOLLET,
- .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
- ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
- if (ret < 0) {
- ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__,
- strerror(-ret));
- }
- }
- Status<void> BufferHubQueue::ImportQueue() {
- auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
- if (!status) {
- ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
- status.GetErrorMessage().c_str());
- return ErrorStatus(status.error());
- } else {
- SetupQueue(status.get());
- return {};
- }
- }
- void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
- is_async_ = queue_info.producer_config.is_async;
- default_width_ = queue_info.producer_config.default_width;
- default_height_ = queue_info.producer_config.default_height;
- default_format_ = queue_info.producer_config.default_format;
- user_metadata_size_ = queue_info.producer_config.user_metadata_size;
- id_ = queue_info.id;
- }
- std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
- if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
- return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
- else
- return nullptr;
- }
- std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
- if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
- return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
- else
- return nullptr;
- }
- Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
- bool silent) {
- auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
- if (!status) {
- ALOGE(
- "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
- "%s",
- status.GetErrorMessage().c_str());
- return ErrorStatus(status.error());
- }
- return status;
- }
- pdx::Status<ConsumerQueueParcelable>
- BufferHubQueue::CreateConsumerQueueParcelable(bool silent) {
- auto status = CreateConsumerQueueHandle(silent);
- if (!status)
- return status.error_status();
- // A temporary consumer queue client to pull its channel parcelable.
- auto consumer_queue =
- std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
- ConsumerQueueParcelable queue_parcelable(
- consumer_queue->GetChannel()->TakeChannelParcelable());
- if (!queue_parcelable.IsValid()) {
- ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__);
- return ErrorStatus(EINVAL);
- }
- return {std::move(queue_parcelable)};
- }
- bool BufferHubQueue::WaitForBuffers(int timeout) {
- ATRACE_NAME("BufferHubQueue::WaitForBuffers");
- std::array<epoll_event, kMaxEvents> events;
- // Loop at least once to check for hangups.
- do {
- ALOGD_IF(
- TRACE,
- "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
- id(), count(), capacity());
- // If there is already a buffer then just check for hangup without waiting.
- const int ret = epoll_fd_.Wait(events.data(), events.size(),
- count() == 0 ? timeout : 0);
- if (ret == 0) {
- ALOGI_IF(TRACE,
- "BufferHubQueue::WaitForBuffers: No events before timeout: "
- "queue_id=%d",
- id());
- return count() != 0;
- }
- if (ret < 0 && ret != -EINTR) {
- ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret));
- return false;
- }
- const int num_events = ret;
- // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
- // one for each buffer in the queue, and one extra event for the queue
- // client itself.
- for (int i = 0; i < num_events; i++) {
- int32_t event_fd;
- int32_t index;
- std::tie(event_fd, index) = Unstuff(events[i].data.u64);
- PDX_TRACE_FORMAT(
- "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
- "slot=%d|",
- id(), num_events, i, event_fd, index);
- ALOGD_IF(TRACE,
- "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
- i, event_fd, index);
- if (is_buffer_event_index(index)) {
- HandleBufferEvent(static_cast<size_t>(index), event_fd,
- events[i].events);
- } else if (is_queue_event_index(index)) {
- HandleQueueEvent(events[i].events);
- } else {
- ALOGW(
- "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
- "index=%d",
- event_fd, index);
- }
- }
- } while (count() == 0 && capacity() > 0 && !hung_up());
- return count() != 0;
- }
- Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
- int poll_events) {
- ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
- if (!buffers_[slot]) {
- ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
- return ErrorStatus(ENOENT);
- }
- auto status = buffers_[slot]->GetEventMask(poll_events);
- if (!status) {
- ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
- status.GetErrorMessage().c_str());
- return status.error_status();
- }
- const int events = status.get();
- PDX_TRACE_FORMAT(
- "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
- "events=%d|",
- id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
- if (events & EPOLLIN) {
- return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
- } else if (events & EPOLLHUP) {
- ALOGW(
- "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
- "event_fd=%d buffer_id=%d",
- slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
- return RemoveBuffer(slot);
- } else {
- ALOGW(
- "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
- "events=%d",
- slot, events);
- }
- return {};
- }
- Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
- ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
- auto status = GetEventMask(poll_event);
- if (!status) {
- ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
- status.GetErrorMessage().c_str());
- return status.error_status();
- }
- const int events = status.get();
- if (events & EPOLLIN) {
- // Note that after buffer imports, if |count()| still returns 0, epoll
- // wait will be tried again to acquire the newly imported buffer.
- auto buffer_status = OnBufferAllocated();
- if (!buffer_status) {
- ALOGE("%s: Failed to import buffer: %s", __FUNCTION__,
- buffer_status.GetErrorMessage().c_str());
- }
- } else if (events & EPOLLHUP) {
- ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__);
- hung_up_ = true;
- } else {
- ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events);
- }
- return {};
- }
- Status<void> BufferHubQueue::AddBuffer(
- const std::shared_ptr<BufferHubBase>& buffer, size_t slot) {
- ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(),
- slot);
- if (is_full()) {
- ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_);
- return ErrorStatus(E2BIG);
- }
- if (buffers_[slot]) {
- // Replace the buffer if the slot is occupied. This could happen when the
- // producer side replaced the slot with a newly allocated buffer. Remove the
- // buffer before setting up with the new one.
- auto remove_status = RemoveBuffer(slot);
- if (!remove_status)
- return remove_status.error_status();
- }
- for (const auto& event_source : buffer->GetEventSources()) {
- epoll_event event = {.events = event_source.event_mask | EPOLLET,
- .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
- const int ret =
- epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
- if (ret < 0) {
- ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__,
- strerror(-ret));
- return ErrorStatus(-ret);
- }
- }
- buffers_[slot] = buffer;
- capacity_++;
- return {};
- }
- Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
- ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot);
- if (buffers_[slot]) {
- for (const auto& event_source : buffers_[slot]->GetEventSources()) {
- const int ret =
- epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
- if (ret < 0) {
- ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__,
- strerror(-ret));
- return ErrorStatus(-ret);
- }
- }
- // Trigger OnBufferRemoved callback if registered.
- if (on_buffer_removed_)
- on_buffer_removed_(buffers_[slot]);
- buffers_[slot] = nullptr;
- capacity_--;
- }
- return {};
- }
- Status<void> BufferHubQueue::Enqueue(Entry entry) {
- if (!is_full()) {
- // Find and remove the enqueued buffer from unavailable_buffers_slot if
- // exist.
- auto enqueued_buffer_iter = std::find_if(
- unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(),
- [&entry](size_t slot) -> bool { return slot == entry.slot; });
- if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) {
- unavailable_buffers_slot_.erase(enqueued_buffer_iter);
- }
- available_buffers_.push(std::move(entry));
- // Trigger OnBufferAvailable callback if registered.
- if (on_buffer_available_)
- on_buffer_available_();
- return {};
- } else {
- ALOGE("%s: Buffer queue is full!", __FUNCTION__);
- return ErrorStatus(E2BIG);
- }
- }
- Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout,
- size_t* slot) {
- ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout);
- PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count());
- if (count() == 0) {
- if (!WaitForBuffers(timeout))
- return ErrorStatus(ETIMEDOUT);
- }
- auto& entry = available_buffers_.top();
- PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
- entry.slot);
- std::shared_ptr<BufferHubBase> buffer = std::move(entry.buffer);
- *slot = entry.slot;
- available_buffers_.pop();
- unavailable_buffers_slot_.push_back(*slot);
- return {std::move(buffer)};
- }
- void BufferHubQueue::SetBufferAvailableCallback(
- BufferAvailableCallback callback) {
- on_buffer_available_ = callback;
- }
- void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
- on_buffer_removed_ = callback;
- }
- pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
- // Clear all available buffers.
- while (!available_buffers_.empty())
- available_buffers_.pop();
- pdx::Status<void> last_error; // No error.
- // Clear all buffers this producer queue is tracking.
- for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
- if (buffers_[slot] != nullptr) {
- auto status = RemoveBuffer(slot);
- if (!status) {
- ALOGE(
- "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
- "slot=%zu.",
- slot);
- last_error = status.error_status();
- }
- }
- }
- return last_error;
- }
- ProducerQueue::ProducerQueue(LocalChannelHandle handle)
- : BASE(std::move(handle)) {
- auto status = ImportQueue();
- if (!status) {
- ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
- status.GetErrorMessage().c_str());
- Close(-status.error());
- }
- }
- ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
- const UsagePolicy& usage)
- : BASE(BufferHubRPC::kClientPath) {
- auto status =
- InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
- if (!status) {
- ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
- status.GetErrorMessage().c_str());
- Close(-status.error());
- return;
- }
- SetupQueue(status.get());
- }
- Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
- uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
- uint64_t usage, size_t buffer_count) {
- if (buffer_count == 0) {
- return {std::vector<size_t>()};
- }
- if (capacity() + buffer_count > kMaxQueueCapacity) {
- ALOGE(
- "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
- "allocate %zu more buffer(s).",
- capacity(), buffer_count);
- return ErrorStatus(E2BIG);
- }
- Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
- InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
- width, height, layer_count, format, usage, buffer_count);
- if (!status) {
- ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
- status.GetErrorMessage().c_str());
- return status.error_status();
- }
- auto buffer_handle_slots = status.take();
- LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
- "BufferHubRPC::ProducerQueueAllocateBuffers should "
- "return %zu buffer handle(s), but returned %zu instead.",
- buffer_count, buffer_handle_slots.size());
- std::vector<size_t> buffer_slots;
- buffer_slots.reserve(buffer_count);
- // Bookkeeping for each buffer.
- for (auto& hs : buffer_handle_slots) {
- auto& buffer_handle = hs.first;
- size_t buffer_slot = hs.second;
- // Note that import might (though very unlikely) fail. If so, buffer_handle
- // will be closed and included in returned buffer_slots.
- if (AddBuffer(ProducerBuffer::Import(std::move(buffer_handle)),
- buffer_slot)) {
- ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
- buffer_slot);
- buffer_slots.push_back(buffer_slot);
- }
- }
- if (buffer_slots.size() != buffer_count) {
- // Error out if the count of imported buffer(s) is not correct.
- ALOGE(
- "ProducerQueue::AllocateBuffers: requested to import %zu "
- "buffers, but actually imported %zu buffers.",
- buffer_count, buffer_slots.size());
- return ErrorStatus(ENOMEM);
- }
- return {std::move(buffer_slots)};
- }
- Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
- uint32_t layer_count,
- uint32_t format, uint64_t usage) {
- // We only allocate one buffer at a time.
- constexpr size_t buffer_count = 1;
- auto status =
- AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
- if (!status) {
- ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
- status.GetErrorMessage().c_str());
- return status.error_status();
- }
- return {status.get()[0]};
- }
- Status<void> ProducerQueue::AddBuffer(
- const std::shared_ptr<ProducerBuffer>& buffer, size_t slot) {
- ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
- id(), buffer->id(), slot);
- // For producer buffer, we need to enqueue the newly added buffer
- // immediately. Producer queue starts with all buffers in available state.
- auto status = BufferHubQueue::AddBuffer(buffer, slot);
- if (!status)
- return status;
- return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
- }
- Status<size_t> ProducerQueue::InsertBuffer(
- const std::shared_ptr<ProducerBuffer>& buffer) {
- if (buffer == nullptr ||
- !BufferHubDefs::isClientGained(buffer->buffer_state(),
- buffer->client_state_mask())) {
- ALOGE(
- "ProducerQueue::InsertBuffer: Can only insert a buffer when it's in "
- "gained state.");
- return ErrorStatus(EINVAL);
- }
- auto status_or_slot =
- InvokeRemoteMethod<BufferHubRPC::ProducerQueueInsertBuffer>(
- buffer->cid());
- if (!status_or_slot) {
- ALOGE(
- "ProducerQueue::InsertBuffer: Failed to insert producer buffer: "
- "buffer_cid=%d, error: %s.",
- buffer->cid(), status_or_slot.GetErrorMessage().c_str());
- return status_or_slot.error_status();
- }
- size_t slot = status_or_slot.get();
- // Note that we are calling AddBuffer() from the base class to explicitly
- // avoid Enqueue() the ProducerBuffer.
- auto status = BufferHubQueue::AddBuffer(buffer, slot);
- if (!status) {
- ALOGE("ProducerQueue::InsertBuffer: Failed to add buffer: %s.",
- status.GetErrorMessage().c_str());
- return status.error_status();
- }
- return {slot};
- }
- Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
- auto status =
- InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
- if (!status) {
- ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__,
- status.GetErrorMessage().c_str());
- return status.error_status();
- }
- return BufferHubQueue::RemoveBuffer(slot);
- }
- Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue(
- int timeout, size_t* slot, LocalHandle* release_fence) {
- DvrNativeBufferMetadata canonical_meta;
- return Dequeue(timeout, slot, &canonical_meta, release_fence);
- }
- pdx::Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue(
- int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
- pdx::LocalHandle* release_fence, bool gain_posted_buffer) {
- ATRACE_NAME("ProducerQueue::Dequeue");
- if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
- ALOGE("%s: Invalid parameter.", __FUNCTION__);
- return ErrorStatus(EINVAL);
- }
- std::shared_ptr<ProducerBuffer> buffer;
- Status<std::shared_ptr<BufferHubBase>> dequeue_status =
- BufferHubQueue::Dequeue(timeout, slot);
- if (dequeue_status.ok()) {
- buffer = std::static_pointer_cast<ProducerBuffer>(dequeue_status.take());
- } else {
- if (gain_posted_buffer) {
- Status<std::shared_ptr<ProducerBuffer>> dequeue_unacquired_status =
- ProducerQueue::DequeueUnacquiredBuffer(slot);
- if (!dequeue_unacquired_status.ok()) {
- ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__,
- dequeue_unacquired_status.error());
- return dequeue_unacquired_status.error_status();
- }
- buffer = dequeue_unacquired_status.take();
- } else {
- return dequeue_status.error_status();
- }
- }
- const int ret =
- buffer->GainAsync(out_meta, release_fence, gain_posted_buffer);
- if (ret < 0 && ret != -EALREADY)
- return ErrorStatus(-ret);
- return {std::move(buffer)};
- }
- Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::DequeueUnacquiredBuffer(
- size_t* slot) {
- if (unavailable_buffers_slot_.size() < 1) {
- ALOGE(
- "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in "
- "acquired state if exist.",
- __FUNCTION__);
- return ErrorStatus(ENOMEM);
- }
- // Find the first buffer that is not in acquired state from
- // unavailable_buffers_slot_.
- for (auto iter = unavailable_buffers_slot_.begin();
- iter != unavailable_buffers_slot_.end(); iter++) {
- std::shared_ptr<ProducerBuffer> buffer = ProducerQueue::GetBuffer(*iter);
- if (buffer == nullptr) {
- ALOGE("%s failed. Buffer slot %d is null.", __FUNCTION__,
- static_cast<int>(*slot));
- return ErrorStatus(EIO);
- }
- if (!BufferHubDefs::isAnyClientAcquired(buffer->buffer_state())) {
- *slot = *iter;
- unavailable_buffers_slot_.erase(iter);
- unavailable_buffers_slot_.push_back(*slot);
- ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d",
- __FUNCTION__, static_cast<int>(*slot));
- return {std::move(buffer)};
- }
- }
- ALOGE(
- "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.",
- __FUNCTION__);
- return ErrorStatus(EBUSY);
- }
- pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() {
- if (capacity() != 0) {
- ALOGE(
- "%s: producer queue can only be taken out as a parcelable when empty. "
- "Current queue capacity: %zu",
- __FUNCTION__, capacity());
- return ErrorStatus(EINVAL);
- }
- std::unique_ptr<pdx::ClientChannel> channel = TakeChannel();
- ProducerQueueParcelable queue_parcelable(channel->TakeChannelParcelable());
- // Here the queue parcelable is returned and holds the underlying system
- // resources backing the queue; while the original client channel of this
- // producer queue is destroyed in place so that this client can no longer
- // provide producer operations.
- return {std::move(queue_parcelable)};
- }
- /*static */
- std::unique_ptr<ConsumerQueue> ConsumerQueue::Import(
- LocalChannelHandle handle) {
- return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
- }
- ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
- : BufferHubQueue(std::move(handle)) {
- auto status = ImportQueue();
- if (!status) {
- ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
- status.GetErrorMessage().c_str());
- Close(-status.error());
- }
- auto import_status = ImportBuffers();
- if (import_status) {
- ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get());
- } else {
- ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
- import_status.GetErrorMessage().c_str());
- }
- }
- Status<size_t> ConsumerQueue::ImportBuffers() {
- auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
- if (!status) {
- if (status.error() == EBADR) {
- ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__);
- return {0};
- } else {
- ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__,
- status.GetErrorMessage().c_str());
- return status.error_status();
- }
- }
- int ret;
- Status<void> last_error;
- size_t imported_buffers_count = 0;
- auto buffer_handle_slots = status.take();
- for (auto& buffer_handle_slot : buffer_handle_slots) {
- ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__,
- buffer_handle_slot.first.value());
- std::unique_ptr<ConsumerBuffer> consumer_buffer =
- ConsumerBuffer::Import(std::move(buffer_handle_slot.first));
- if (!consumer_buffer) {
- ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__,
- buffer_handle_slot.second);
- last_error = ErrorStatus(EPIPE);
- continue;
- }
- auto add_status =
- AddBuffer(std::move(consumer_buffer), buffer_handle_slot.second);
- if (!add_status) {
- ALOGE("%s: Failed to add buffer: %s", __FUNCTION__,
- add_status.GetErrorMessage().c_str());
- last_error = add_status;
- } else {
- imported_buffers_count++;
- }
- }
- if (imported_buffers_count > 0)
- return {imported_buffers_count};
- else
- return last_error.error_status();
- }
- Status<void> ConsumerQueue::AddBuffer(
- const std::shared_ptr<ConsumerBuffer>& buffer, size_t slot) {
- ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(),
- buffer->id(), slot);
- return BufferHubQueue::AddBuffer(buffer, slot);
- }
- Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue(
- int timeout, size_t* slot, void* meta, size_t user_metadata_size,
- LocalHandle* acquire_fence) {
- if (user_metadata_size != user_metadata_size_) {
- ALOGE(
- "%s: Metadata size (%zu) for the dequeuing buffer does not match "
- "metadata size (%zu) for the queue.",
- __FUNCTION__, user_metadata_size, user_metadata_size_);
- return ErrorStatus(EINVAL);
- }
- DvrNativeBufferMetadata canonical_meta;
- auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
- if (!status)
- return status.error_status();
- if (meta && user_metadata_size) {
- void* metadata_src =
- reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
- if (metadata_src) {
- memcpy(meta, metadata_src, user_metadata_size);
- } else {
- ALOGW("%s: no user-defined metadata.", __FUNCTION__);
- }
- }
- return status;
- }
- Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue(
- int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
- pdx::LocalHandle* acquire_fence) {
- ATRACE_NAME("ConsumerQueue::Dequeue");
- if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
- ALOGE("%s: Invalid parameter.", __FUNCTION__);
- return ErrorStatus(EINVAL);
- }
- auto status = BufferHubQueue::Dequeue(timeout, slot);
- if (!status)
- return status.error_status();
- auto buffer = std::static_pointer_cast<ConsumerBuffer>(status.take());
- const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
- if (ret < 0)
- return ErrorStatus(-ret);
- return {std::move(buffer)};
- }
- Status<void> ConsumerQueue::OnBufferAllocated() {
- ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id());
- auto status = ImportBuffers();
- if (!status) {
- ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
- status.GetErrorMessage().c_str());
- return ErrorStatus(status.error());
- } else if (status.get() == 0) {
- ALOGW("%s: No new buffers allocated!", __FUNCTION__);
- return ErrorStatus(ENOBUFS);
- } else {
- ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__,
- status.get());
- return {};
- }
- }
- } // namespace dvr
- } // namespace android
|