buffer_hub_queue_client.cpp 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823
  1. #include "include/private/dvr/buffer_hub_queue_client.h"
  2. #include <inttypes.h>
  3. #include <log/log.h>
  4. #include <poll.h>
  5. #include <sys/epoll.h>
  6. #include <array>
  7. #include <pdx/default_transport/client_channel.h>
  8. #include <pdx/default_transport/client_channel_factory.h>
  9. #include <pdx/file_handle.h>
  10. #include <pdx/trace.h>
  // Evaluates |fnc_call| and keeps re-evaluating it for as long as it yields -1
  // with errno equal to EINTR. The first result that is not such an
  // interruption becomes the value of the whole expression. The work is wrapped
  // in an immediately-invoked lambda so the macro can be used as an expression.
  #define RETRY_EINTR(fnc_call)                    \
    ([&]() -> decltype(fnc_call) {                 \
      for (;;) {                                   \
        decltype(fnc_call) rc = (fnc_call);        \
        if (rc != -1 || errno != EINTR) return rc; \
      }                                            \
    })()
  19. using android::pdx::ErrorStatus;
  20. using android::pdx::LocalChannelHandle;
  21. using android::pdx::LocalHandle;
  22. using android::pdx::Status;
  23. namespace android {
  24. namespace dvr {
  25. namespace {
  // Splits a 64-bit value into two signed 32-bit halves: |first| is taken from
  // the upper 32 bits and |second| from the lower 32 bits. Inverse of Stuff().
  std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
    constexpr uint64_t kLow32Mask = (1ull << 32) - 1;
    const auto high = static_cast<int32_t>(value >> 32);
    const auto low = static_cast<int32_t>(value & kLow32Mask);
    return std::make_pair(high, low);
  }
  // Packs two signed 32-bit values into one 64-bit value: |a| occupies the
  // upper 32 bits and |b| the lower 32 bits. Each input is first reinterpreted
  // as unsigned so that a negative |b| does not sign-extend into |a|'s half.
  uint64_t Stuff(int32_t a, int32_t b) {
    uint64_t packed = static_cast<uint32_t>(a);
    packed <<= 32;
    packed |= static_cast<uint64_t>(static_cast<uint32_t>(b));
    return packed;
  }
  35. } // anonymous namespace
  36. BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
  37. : Client{pdx::default_transport::ClientChannel::Create(
  38. std::move(channel_handle))} {
  39. Initialize();
  40. }
  41. BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
  42. : Client{
  43. pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
  44. Initialize();
  45. }
  46. void BufferHubQueue::Initialize() {
  47. int ret = epoll_fd_.Create();
  48. if (ret < 0) {
  49. ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
  50. strerror(-ret));
  51. return;
  52. }
  53. epoll_event event = {
  54. .events = EPOLLIN | EPOLLET,
  55. .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
  56. ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
  57. if (ret < 0) {
  58. ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__,
  59. strerror(-ret));
  60. }
  61. }
  62. Status<void> BufferHubQueue::ImportQueue() {
  63. auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
  64. if (!status) {
  65. ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
  66. status.GetErrorMessage().c_str());
  67. return ErrorStatus(status.error());
  68. } else {
  69. SetupQueue(status.get());
  70. return {};
  71. }
  72. }
  73. void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
  74. is_async_ = queue_info.producer_config.is_async;
  75. default_width_ = queue_info.producer_config.default_width;
  76. default_height_ = queue_info.producer_config.default_height;
  77. default_format_ = queue_info.producer_config.default_format;
  78. user_metadata_size_ = queue_info.producer_config.user_metadata_size;
  79. id_ = queue_info.id;
  80. }
  81. std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
  82. if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
  83. return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
  84. else
  85. return nullptr;
  86. }
  87. std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
  88. if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
  89. return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
  90. else
  91. return nullptr;
  92. }
  93. Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
  94. bool silent) {
  95. auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
  96. if (!status) {
  97. ALOGE(
  98. "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
  99. "%s",
  100. status.GetErrorMessage().c_str());
  101. return ErrorStatus(status.error());
  102. }
  103. return status;
  104. }
  105. pdx::Status<ConsumerQueueParcelable>
  106. BufferHubQueue::CreateConsumerQueueParcelable(bool silent) {
  107. auto status = CreateConsumerQueueHandle(silent);
  108. if (!status)
  109. return status.error_status();
  110. // A temporary consumer queue client to pull its channel parcelable.
  111. auto consumer_queue =
  112. std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
  113. ConsumerQueueParcelable queue_parcelable(
  114. consumer_queue->GetChannel()->TakeChannelParcelable());
  115. if (!queue_parcelable.IsValid()) {
  116. ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__);
  117. return ErrorStatus(EINVAL);
  118. }
  119. return {std::move(queue_parcelable)};
  120. }
  121. bool BufferHubQueue::WaitForBuffers(int timeout) {
  122. ATRACE_NAME("BufferHubQueue::WaitForBuffers");
  123. std::array<epoll_event, kMaxEvents> events;
  124. // Loop at least once to check for hangups.
  125. do {
  126. ALOGD_IF(
  127. TRACE,
  128. "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
  129. id(), count(), capacity());
  130. // If there is already a buffer then just check for hangup without waiting.
  131. const int ret = epoll_fd_.Wait(events.data(), events.size(),
  132. count() == 0 ? timeout : 0);
  133. if (ret == 0) {
  134. ALOGI_IF(TRACE,
  135. "BufferHubQueue::WaitForBuffers: No events before timeout: "
  136. "queue_id=%d",
  137. id());
  138. return count() != 0;
  139. }
  140. if (ret < 0 && ret != -EINTR) {
  141. ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret));
  142. return false;
  143. }
  144. const int num_events = ret;
  145. // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
  146. // one for each buffer in the queue, and one extra event for the queue
  147. // client itself.
  148. for (int i = 0; i < num_events; i++) {
  149. int32_t event_fd;
  150. int32_t index;
  151. std::tie(event_fd, index) = Unstuff(events[i].data.u64);
  152. PDX_TRACE_FORMAT(
  153. "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
  154. "slot=%d|",
  155. id(), num_events, i, event_fd, index);
  156. ALOGD_IF(TRACE,
  157. "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
  158. i, event_fd, index);
  159. if (is_buffer_event_index(index)) {
  160. HandleBufferEvent(static_cast<size_t>(index), event_fd,
  161. events[i].events);
  162. } else if (is_queue_event_index(index)) {
  163. HandleQueueEvent(events[i].events);
  164. } else {
  165. ALOGW(
  166. "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
  167. "index=%d",
  168. event_fd, index);
  169. }
  170. }
  171. } while (count() == 0 && capacity() > 0 && !hung_up());
  172. return count() != 0;
  173. }
  174. Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
  175. int poll_events) {
  176. ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
  177. if (!buffers_[slot]) {
  178. ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
  179. return ErrorStatus(ENOENT);
  180. }
  181. auto status = buffers_[slot]->GetEventMask(poll_events);
  182. if (!status) {
  183. ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
  184. status.GetErrorMessage().c_str());
  185. return status.error_status();
  186. }
  187. const int events = status.get();
  188. PDX_TRACE_FORMAT(
  189. "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
  190. "events=%d|",
  191. id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
  192. if (events & EPOLLIN) {
  193. return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
  194. } else if (events & EPOLLHUP) {
  195. ALOGW(
  196. "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
  197. "event_fd=%d buffer_id=%d",
  198. slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
  199. return RemoveBuffer(slot);
  200. } else {
  201. ALOGW(
  202. "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
  203. "events=%d",
  204. slot, events);
  205. }
  206. return {};
  207. }
  208. Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
  209. ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
  210. auto status = GetEventMask(poll_event);
  211. if (!status) {
  212. ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
  213. status.GetErrorMessage().c_str());
  214. return status.error_status();
  215. }
  216. const int events = status.get();
  217. if (events & EPOLLIN) {
  218. // Note that after buffer imports, if |count()| still returns 0, epoll
  219. // wait will be tried again to acquire the newly imported buffer.
  220. auto buffer_status = OnBufferAllocated();
  221. if (!buffer_status) {
  222. ALOGE("%s: Failed to import buffer: %s", __FUNCTION__,
  223. buffer_status.GetErrorMessage().c_str());
  224. }
  225. } else if (events & EPOLLHUP) {
  226. ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__);
  227. hung_up_ = true;
  228. } else {
  229. ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events);
  230. }
  231. return {};
  232. }
  233. Status<void> BufferHubQueue::AddBuffer(
  234. const std::shared_ptr<BufferHubBase>& buffer, size_t slot) {
  235. ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(),
  236. slot);
  237. if (is_full()) {
  238. ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_);
  239. return ErrorStatus(E2BIG);
  240. }
  241. if (buffers_[slot]) {
  242. // Replace the buffer if the slot is occupied. This could happen when the
  243. // producer side replaced the slot with a newly allocated buffer. Remove the
  244. // buffer before setting up with the new one.
  245. auto remove_status = RemoveBuffer(slot);
  246. if (!remove_status)
  247. return remove_status.error_status();
  248. }
  249. for (const auto& event_source : buffer->GetEventSources()) {
  250. epoll_event event = {.events = event_source.event_mask | EPOLLET,
  251. .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
  252. const int ret =
  253. epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
  254. if (ret < 0) {
  255. ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__,
  256. strerror(-ret));
  257. return ErrorStatus(-ret);
  258. }
  259. }
  260. buffers_[slot] = buffer;
  261. capacity_++;
  262. return {};
  263. }
  264. Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
  265. ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot);
  266. if (buffers_[slot]) {
  267. for (const auto& event_source : buffers_[slot]->GetEventSources()) {
  268. const int ret =
  269. epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
  270. if (ret < 0) {
  271. ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__,
  272. strerror(-ret));
  273. return ErrorStatus(-ret);
  274. }
  275. }
  276. // Trigger OnBufferRemoved callback if registered.
  277. if (on_buffer_removed_)
  278. on_buffer_removed_(buffers_[slot]);
  279. buffers_[slot] = nullptr;
  280. capacity_--;
  281. }
  282. return {};
  283. }
  284. Status<void> BufferHubQueue::Enqueue(Entry entry) {
  285. if (!is_full()) {
  286. // Find and remove the enqueued buffer from unavailable_buffers_slot if
  287. // exist.
  288. auto enqueued_buffer_iter = std::find_if(
  289. unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(),
  290. [&entry](size_t slot) -> bool { return slot == entry.slot; });
  291. if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) {
  292. unavailable_buffers_slot_.erase(enqueued_buffer_iter);
  293. }
  294. available_buffers_.push(std::move(entry));
  295. // Trigger OnBufferAvailable callback if registered.
  296. if (on_buffer_available_)
  297. on_buffer_available_();
  298. return {};
  299. } else {
  300. ALOGE("%s: Buffer queue is full!", __FUNCTION__);
  301. return ErrorStatus(E2BIG);
  302. }
  303. }
  304. Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout,
  305. size_t* slot) {
  306. ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout);
  307. PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count());
  308. if (count() == 0) {
  309. if (!WaitForBuffers(timeout))
  310. return ErrorStatus(ETIMEDOUT);
  311. }
  312. auto& entry = available_buffers_.top();
  313. PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
  314. entry.slot);
  315. std::shared_ptr<BufferHubBase> buffer = std::move(entry.buffer);
  316. *slot = entry.slot;
  317. available_buffers_.pop();
  318. unavailable_buffers_slot_.push_back(*slot);
  319. return {std::move(buffer)};
  320. }
  321. void BufferHubQueue::SetBufferAvailableCallback(
  322. BufferAvailableCallback callback) {
  323. on_buffer_available_ = callback;
  324. }
  325. void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
  326. on_buffer_removed_ = callback;
  327. }
  328. pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
  329. // Clear all available buffers.
  330. while (!available_buffers_.empty())
  331. available_buffers_.pop();
  332. pdx::Status<void> last_error; // No error.
  333. // Clear all buffers this producer queue is tracking.
  334. for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
  335. if (buffers_[slot] != nullptr) {
  336. auto status = RemoveBuffer(slot);
  337. if (!status) {
  338. ALOGE(
  339. "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
  340. "slot=%zu.",
  341. slot);
  342. last_error = status.error_status();
  343. }
  344. }
  345. }
  346. return last_error;
  347. }
  348. ProducerQueue::ProducerQueue(LocalChannelHandle handle)
  349. : BASE(std::move(handle)) {
  350. auto status = ImportQueue();
  351. if (!status) {
  352. ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
  353. status.GetErrorMessage().c_str());
  354. Close(-status.error());
  355. }
  356. }
  357. ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
  358. const UsagePolicy& usage)
  359. : BASE(BufferHubRPC::kClientPath) {
  360. auto status =
  361. InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
  362. if (!status) {
  363. ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
  364. status.GetErrorMessage().c_str());
  365. Close(-status.error());
  366. return;
  367. }
  368. SetupQueue(status.get());
  369. }
  370. Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
  371. uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
  372. uint64_t usage, size_t buffer_count) {
  373. if (buffer_count == 0) {
  374. return {std::vector<size_t>()};
  375. }
  376. if (capacity() + buffer_count > kMaxQueueCapacity) {
  377. ALOGE(
  378. "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
  379. "allocate %zu more buffer(s).",
  380. capacity(), buffer_count);
  381. return ErrorStatus(E2BIG);
  382. }
  383. Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
  384. InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
  385. width, height, layer_count, format, usage, buffer_count);
  386. if (!status) {
  387. ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
  388. status.GetErrorMessage().c_str());
  389. return status.error_status();
  390. }
  391. auto buffer_handle_slots = status.take();
  392. LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
  393. "BufferHubRPC::ProducerQueueAllocateBuffers should "
  394. "return %zu buffer handle(s), but returned %zu instead.",
  395. buffer_count, buffer_handle_slots.size());
  396. std::vector<size_t> buffer_slots;
  397. buffer_slots.reserve(buffer_count);
  398. // Bookkeeping for each buffer.
  399. for (auto& hs : buffer_handle_slots) {
  400. auto& buffer_handle = hs.first;
  401. size_t buffer_slot = hs.second;
  402. // Note that import might (though very unlikely) fail. If so, buffer_handle
  403. // will be closed and included in returned buffer_slots.
  404. if (AddBuffer(ProducerBuffer::Import(std::move(buffer_handle)),
  405. buffer_slot)) {
  406. ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
  407. buffer_slot);
  408. buffer_slots.push_back(buffer_slot);
  409. }
  410. }
  411. if (buffer_slots.size() != buffer_count) {
  412. // Error out if the count of imported buffer(s) is not correct.
  413. ALOGE(
  414. "ProducerQueue::AllocateBuffers: requested to import %zu "
  415. "buffers, but actually imported %zu buffers.",
  416. buffer_count, buffer_slots.size());
  417. return ErrorStatus(ENOMEM);
  418. }
  419. return {std::move(buffer_slots)};
  420. }
  421. Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
  422. uint32_t layer_count,
  423. uint32_t format, uint64_t usage) {
  424. // We only allocate one buffer at a time.
  425. constexpr size_t buffer_count = 1;
  426. auto status =
  427. AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
  428. if (!status) {
  429. ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
  430. status.GetErrorMessage().c_str());
  431. return status.error_status();
  432. }
  433. return {status.get()[0]};
  434. }
  435. Status<void> ProducerQueue::AddBuffer(
  436. const std::shared_ptr<ProducerBuffer>& buffer, size_t slot) {
  437. ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
  438. id(), buffer->id(), slot);
  439. // For producer buffer, we need to enqueue the newly added buffer
  440. // immediately. Producer queue starts with all buffers in available state.
  441. auto status = BufferHubQueue::AddBuffer(buffer, slot);
  442. if (!status)
  443. return status;
  444. return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
  445. }
  446. Status<size_t> ProducerQueue::InsertBuffer(
  447. const std::shared_ptr<ProducerBuffer>& buffer) {
  448. if (buffer == nullptr ||
  449. !BufferHubDefs::isClientGained(buffer->buffer_state(),
  450. buffer->client_state_mask())) {
  451. ALOGE(
  452. "ProducerQueue::InsertBuffer: Can only insert a buffer when it's in "
  453. "gained state.");
  454. return ErrorStatus(EINVAL);
  455. }
  456. auto status_or_slot =
  457. InvokeRemoteMethod<BufferHubRPC::ProducerQueueInsertBuffer>(
  458. buffer->cid());
  459. if (!status_or_slot) {
  460. ALOGE(
  461. "ProducerQueue::InsertBuffer: Failed to insert producer buffer: "
  462. "buffer_cid=%d, error: %s.",
  463. buffer->cid(), status_or_slot.GetErrorMessage().c_str());
  464. return status_or_slot.error_status();
  465. }
  466. size_t slot = status_or_slot.get();
  467. // Note that we are calling AddBuffer() from the base class to explicitly
  468. // avoid Enqueue() the ProducerBuffer.
  469. auto status = BufferHubQueue::AddBuffer(buffer, slot);
  470. if (!status) {
  471. ALOGE("ProducerQueue::InsertBuffer: Failed to add buffer: %s.",
  472. status.GetErrorMessage().c_str());
  473. return status.error_status();
  474. }
  475. return {slot};
  476. }
  477. Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
  478. auto status =
  479. InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
  480. if (!status) {
  481. ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__,
  482. status.GetErrorMessage().c_str());
  483. return status.error_status();
  484. }
  485. return BufferHubQueue::RemoveBuffer(slot);
  486. }
  487. Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue(
  488. int timeout, size_t* slot, LocalHandle* release_fence) {
  489. DvrNativeBufferMetadata canonical_meta;
  490. return Dequeue(timeout, slot, &canonical_meta, release_fence);
  491. }
  492. pdx::Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue(
  493. int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
  494. pdx::LocalHandle* release_fence, bool gain_posted_buffer) {
  495. ATRACE_NAME("ProducerQueue::Dequeue");
  496. if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
  497. ALOGE("%s: Invalid parameter.", __FUNCTION__);
  498. return ErrorStatus(EINVAL);
  499. }
  500. std::shared_ptr<ProducerBuffer> buffer;
  501. Status<std::shared_ptr<BufferHubBase>> dequeue_status =
  502. BufferHubQueue::Dequeue(timeout, slot);
  503. if (dequeue_status.ok()) {
  504. buffer = std::static_pointer_cast<ProducerBuffer>(dequeue_status.take());
  505. } else {
  506. if (gain_posted_buffer) {
  507. Status<std::shared_ptr<ProducerBuffer>> dequeue_unacquired_status =
  508. ProducerQueue::DequeueUnacquiredBuffer(slot);
  509. if (!dequeue_unacquired_status.ok()) {
  510. ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__,
  511. dequeue_unacquired_status.error());
  512. return dequeue_unacquired_status.error_status();
  513. }
  514. buffer = dequeue_unacquired_status.take();
  515. } else {
  516. return dequeue_status.error_status();
  517. }
  518. }
  519. const int ret =
  520. buffer->GainAsync(out_meta, release_fence, gain_posted_buffer);
  521. if (ret < 0 && ret != -EALREADY)
  522. return ErrorStatus(-ret);
  523. return {std::move(buffer)};
  524. }
  525. Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::DequeueUnacquiredBuffer(
  526. size_t* slot) {
  527. if (unavailable_buffers_slot_.size() < 1) {
  528. ALOGE(
  529. "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in "
  530. "acquired state if exist.",
  531. __FUNCTION__);
  532. return ErrorStatus(ENOMEM);
  533. }
  534. // Find the first buffer that is not in acquired state from
  535. // unavailable_buffers_slot_.
  536. for (auto iter = unavailable_buffers_slot_.begin();
  537. iter != unavailable_buffers_slot_.end(); iter++) {
  538. std::shared_ptr<ProducerBuffer> buffer = ProducerQueue::GetBuffer(*iter);
  539. if (buffer == nullptr) {
  540. ALOGE("%s failed. Buffer slot %d is null.", __FUNCTION__,
  541. static_cast<int>(*slot));
  542. return ErrorStatus(EIO);
  543. }
  544. if (!BufferHubDefs::isAnyClientAcquired(buffer->buffer_state())) {
  545. *slot = *iter;
  546. unavailable_buffers_slot_.erase(iter);
  547. unavailable_buffers_slot_.push_back(*slot);
  548. ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d",
  549. __FUNCTION__, static_cast<int>(*slot));
  550. return {std::move(buffer)};
  551. }
  552. }
  553. ALOGE(
  554. "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.",
  555. __FUNCTION__);
  556. return ErrorStatus(EBUSY);
  557. }
  558. pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() {
  559. if (capacity() != 0) {
  560. ALOGE(
  561. "%s: producer queue can only be taken out as a parcelable when empty. "
  562. "Current queue capacity: %zu",
  563. __FUNCTION__, capacity());
  564. return ErrorStatus(EINVAL);
  565. }
  566. std::unique_ptr<pdx::ClientChannel> channel = TakeChannel();
  567. ProducerQueueParcelable queue_parcelable(channel->TakeChannelParcelable());
  568. // Here the queue parcelable is returned and holds the underlying system
  569. // resources backing the queue; while the original client channel of this
  570. // producer queue is destroyed in place so that this client can no longer
  571. // provide producer operations.
  572. return {std::move(queue_parcelable)};
  573. }
  574. /*static */
  575. std::unique_ptr<ConsumerQueue> ConsumerQueue::Import(
  576. LocalChannelHandle handle) {
  577. return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
  578. }
  579. ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
  580. : BufferHubQueue(std::move(handle)) {
  581. auto status = ImportQueue();
  582. if (!status) {
  583. ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
  584. status.GetErrorMessage().c_str());
  585. Close(-status.error());
  586. }
  587. auto import_status = ImportBuffers();
  588. if (import_status) {
  589. ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get());
  590. } else {
  591. ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
  592. import_status.GetErrorMessage().c_str());
  593. }
  594. }
  595. Status<size_t> ConsumerQueue::ImportBuffers() {
  596. auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
  597. if (!status) {
  598. if (status.error() == EBADR) {
  599. ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__);
  600. return {0};
  601. } else {
  602. ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__,
  603. status.GetErrorMessage().c_str());
  604. return status.error_status();
  605. }
  606. }
  607. int ret;
  608. Status<void> last_error;
  609. size_t imported_buffers_count = 0;
  610. auto buffer_handle_slots = status.take();
  611. for (auto& buffer_handle_slot : buffer_handle_slots) {
  612. ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__,
  613. buffer_handle_slot.first.value());
  614. std::unique_ptr<ConsumerBuffer> consumer_buffer =
  615. ConsumerBuffer::Import(std::move(buffer_handle_slot.first));
  616. if (!consumer_buffer) {
  617. ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__,
  618. buffer_handle_slot.second);
  619. last_error = ErrorStatus(EPIPE);
  620. continue;
  621. }
  622. auto add_status =
  623. AddBuffer(std::move(consumer_buffer), buffer_handle_slot.second);
  624. if (!add_status) {
  625. ALOGE("%s: Failed to add buffer: %s", __FUNCTION__,
  626. add_status.GetErrorMessage().c_str());
  627. last_error = add_status;
  628. } else {
  629. imported_buffers_count++;
  630. }
  631. }
  632. if (imported_buffers_count > 0)
  633. return {imported_buffers_count};
  634. else
  635. return last_error.error_status();
  636. }
  637. Status<void> ConsumerQueue::AddBuffer(
  638. const std::shared_ptr<ConsumerBuffer>& buffer, size_t slot) {
  639. ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(),
  640. buffer->id(), slot);
  641. return BufferHubQueue::AddBuffer(buffer, slot);
  642. }
  643. Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue(
  644. int timeout, size_t* slot, void* meta, size_t user_metadata_size,
  645. LocalHandle* acquire_fence) {
  646. if (user_metadata_size != user_metadata_size_) {
  647. ALOGE(
  648. "%s: Metadata size (%zu) for the dequeuing buffer does not match "
  649. "metadata size (%zu) for the queue.",
  650. __FUNCTION__, user_metadata_size, user_metadata_size_);
  651. return ErrorStatus(EINVAL);
  652. }
  653. DvrNativeBufferMetadata canonical_meta;
  654. auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
  655. if (!status)
  656. return status.error_status();
  657. if (meta && user_metadata_size) {
  658. void* metadata_src =
  659. reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
  660. if (metadata_src) {
  661. memcpy(meta, metadata_src, user_metadata_size);
  662. } else {
  663. ALOGW("%s: no user-defined metadata.", __FUNCTION__);
  664. }
  665. }
  666. return status;
  667. }
  668. Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue(
  669. int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
  670. pdx::LocalHandle* acquire_fence) {
  671. ATRACE_NAME("ConsumerQueue::Dequeue");
  672. if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
  673. ALOGE("%s: Invalid parameter.", __FUNCTION__);
  674. return ErrorStatus(EINVAL);
  675. }
  676. auto status = BufferHubQueue::Dequeue(timeout, slot);
  677. if (!status)
  678. return status.error_status();
  679. auto buffer = std::static_pointer_cast<ConsumerBuffer>(status.take());
  680. const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
  681. if (ret < 0)
  682. return ErrorStatus(-ret);
  683. return {std::move(buffer)};
  684. }
  685. Status<void> ConsumerQueue::OnBufferAllocated() {
  686. ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id());
  687. auto status = ImportBuffers();
  688. if (!status) {
  689. ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
  690. status.GetErrorMessage().c_str());
  691. return ErrorStatus(status.error());
  692. } else if (status.get() == 0) {
  693. ALOGW("%s: No new buffers allocated!", __FUNCTION__);
  694. return ErrorStatus(ENOBUFS);
  695. } else {
  696. ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__,
  697. status.get());
  698. return {};
  699. }
  700. }
  701. } // namespace dvr
  702. } // namespace android