StreamSplitter.cpp 10 KB


  1. /*
  2. * Copyright 2014 The Android Open Source Project
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <inttypes.h>
  17. #define LOG_TAG "StreamSplitter"
  18. #define ATRACE_TAG ATRACE_TAG_GRAPHICS
  19. //#define LOG_NDEBUG 0
  20. #include <gui/BufferItem.h>
  21. #include <gui/IGraphicBufferConsumer.h>
  22. #include <gui/IGraphicBufferProducer.h>
  23. #include <gui/StreamSplitter.h>
  24. #include <ui/GraphicBuffer.h>
  25. #include <binder/ProcessState.h>
  26. #include <utils/Trace.h>
  27. #include <system/window.h>
  28. namespace android {
  29. status_t StreamSplitter::createSplitter(
  30. const sp<IGraphicBufferConsumer>& inputQueue,
  31. sp<StreamSplitter>* outSplitter) {
  32. if (inputQueue == nullptr) {
  33. ALOGE("createSplitter: inputQueue must not be NULL");
  34. return BAD_VALUE;
  35. }
  36. if (outSplitter == nullptr) {
  37. ALOGE("createSplitter: outSplitter must not be NULL");
  38. return BAD_VALUE;
  39. }
  40. sp<StreamSplitter> splitter(new StreamSplitter(inputQueue));
  41. status_t status = splitter->mInput->consumerConnect(splitter, false);
  42. if (status == NO_ERROR) {
  43. splitter->mInput->setConsumerName(String8("StreamSplitter"));
  44. *outSplitter = splitter;
  45. }
  46. return status;
  47. }
  48. StreamSplitter::StreamSplitter(const sp<IGraphicBufferConsumer>& inputQueue)
  49. : mIsAbandoned(false), mMutex(), mReleaseCondition(),
  50. mOutstandingBuffers(0), mInput(inputQueue), mOutputs(), mBuffers() {}
  51. StreamSplitter::~StreamSplitter() {
  52. mInput->consumerDisconnect();
  53. Vector<sp<IGraphicBufferProducer> >::iterator output = mOutputs.begin();
  54. for (; output != mOutputs.end(); ++output) {
  55. (*output)->disconnect(NATIVE_WINDOW_API_CPU);
  56. }
  57. if (mBuffers.size() > 0) {
  58. ALOGE("%zu buffers still being tracked", mBuffers.size());
  59. }
  60. }
  61. status_t StreamSplitter::addOutput(
  62. const sp<IGraphicBufferProducer>& outputQueue) {
  63. if (outputQueue == nullptr) {
  64. ALOGE("addOutput: outputQueue must not be NULL");
  65. return BAD_VALUE;
  66. }
  67. Mutex::Autolock lock(mMutex);
  68. IGraphicBufferProducer::QueueBufferOutput queueBufferOutput;
  69. sp<OutputListener> listener(new OutputListener(this, outputQueue));
  70. IInterface::asBinder(outputQueue)->linkToDeath(listener);
  71. status_t status = outputQueue->connect(listener, NATIVE_WINDOW_API_CPU,
  72. /* producerControlledByApp */ false, &queueBufferOutput);
  73. if (status != NO_ERROR) {
  74. ALOGE("addOutput: failed to connect (%d)", status);
  75. return status;
  76. }
  77. mOutputs.push_back(outputQueue);
  78. return NO_ERROR;
  79. }
  80. void StreamSplitter::setName(const String8 &name) {
  81. Mutex::Autolock lock(mMutex);
  82. mInput->setConsumerName(name);
  83. }
  84. void StreamSplitter::onFrameAvailable(const BufferItem& /* item */) {
  85. ATRACE_CALL();
  86. Mutex::Autolock lock(mMutex);
  87. // The current policy is that if any one consumer is consuming buffers too
  88. // slowly, the splitter will stall the rest of the outputs by not acquiring
  89. // any more buffers from the input. This will cause back pressure on the
  90. // input queue, slowing down its producer.
  91. // If there are too many outstanding buffers, we block until a buffer is
  92. // released back to the input in onBufferReleased
  93. while (mOutstandingBuffers >= MAX_OUTSTANDING_BUFFERS) {
  94. mReleaseCondition.wait(mMutex);
  95. // If the splitter is abandoned while we are waiting, the release
  96. // condition variable will be broadcast, and we should just return
  97. // without attempting to do anything more (since the input queue will
  98. // also be abandoned).
  99. if (mIsAbandoned) {
  100. return;
  101. }
  102. }
  103. ++mOutstandingBuffers;
  104. // Acquire and detach the buffer from the input
  105. BufferItem bufferItem;
  106. status_t status = mInput->acquireBuffer(&bufferItem, /* presentWhen */ 0);
  107. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  108. "acquiring buffer from input failed (%d)", status);
  109. ALOGV("acquired buffer %#" PRIx64 " from input",
  110. bufferItem.mGraphicBuffer->getId());
  111. status = mInput->detachBuffer(bufferItem.mSlot);
  112. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  113. "detaching buffer from input failed (%d)", status);
  114. // Initialize our reference count for this buffer
  115. mBuffers.add(bufferItem.mGraphicBuffer->getId(),
  116. new BufferTracker(bufferItem.mGraphicBuffer));
  117. IGraphicBufferProducer::QueueBufferInput queueInput(
  118. bufferItem.mTimestamp, bufferItem.mIsAutoTimestamp,
  119. bufferItem.mDataSpace, bufferItem.mCrop,
  120. static_cast<int32_t>(bufferItem.mScalingMode),
  121. bufferItem.mTransform, bufferItem.mFence);
  122. // Attach and queue the buffer to each of the outputs
  123. Vector<sp<IGraphicBufferProducer> >::iterator output = mOutputs.begin();
  124. for (; output != mOutputs.end(); ++output) {
  125. int slot;
  126. status = (*output)->attachBuffer(&slot, bufferItem.mGraphicBuffer);
  127. if (status == NO_INIT) {
  128. // If we just discovered that this output has been abandoned, note
  129. // that, increment the release count so that we still release this
  130. // buffer eventually, and move on to the next output
  131. onAbandonedLocked();
  132. mBuffers.editValueFor(bufferItem.mGraphicBuffer->getId())->
  133. incrementReleaseCountLocked();
  134. continue;
  135. } else {
  136. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  137. "attaching buffer to output failed (%d)", status);
  138. }
  139. IGraphicBufferProducer::QueueBufferOutput queueOutput;
  140. status = (*output)->queueBuffer(slot, queueInput, &queueOutput);
  141. if (status == NO_INIT) {
  142. // If we just discovered that this output has been abandoned, note
  143. // that, increment the release count so that we still release this
  144. // buffer eventually, and move on to the next output
  145. onAbandonedLocked();
  146. mBuffers.editValueFor(bufferItem.mGraphicBuffer->getId())->
  147. incrementReleaseCountLocked();
  148. continue;
  149. } else {
  150. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  151. "queueing buffer to output failed (%d)", status);
  152. }
  153. ALOGV("queued buffer %#" PRIx64 " to output %p",
  154. bufferItem.mGraphicBuffer->getId(), output->get());
  155. }
  156. }
  157. void StreamSplitter::onBufferReleasedByOutput(
  158. const sp<IGraphicBufferProducer>& from) {
  159. ATRACE_CALL();
  160. Mutex::Autolock lock(mMutex);
  161. sp<GraphicBuffer> buffer;
  162. sp<Fence> fence;
  163. status_t status = from->detachNextBuffer(&buffer, &fence);
  164. if (status == NO_INIT) {
  165. // If we just discovered that this output has been abandoned, note that,
  166. // but we can't do anything else, since buffer is invalid
  167. onAbandonedLocked();
  168. return;
  169. } else {
  170. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  171. "detaching buffer from output failed (%d)", status);
  172. }
  173. ALOGV("detached buffer %#" PRIx64 " from output %p",
  174. buffer->getId(), from.get());
  175. const sp<BufferTracker>& tracker = mBuffers.editValueFor(buffer->getId());
  176. // Merge the release fence of the incoming buffer so that the fence we send
  177. // back to the input includes all of the outputs' fences
  178. tracker->mergeFence(fence);
  179. // Check to see if this is the last outstanding reference to this buffer
  180. size_t releaseCount = tracker->incrementReleaseCountLocked();
  181. ALOGV("buffer %#" PRIx64 " reference count %zu (of %zu)", buffer->getId(),
  182. releaseCount, mOutputs.size());
  183. if (releaseCount < mOutputs.size()) {
  184. return;
  185. }
  186. // If we've been abandoned, we can't return the buffer to the input, so just
  187. // stop tracking it and move on
  188. if (mIsAbandoned) {
  189. mBuffers.removeItem(buffer->getId());
  190. return;
  191. }
  192. // Attach and release the buffer back to the input
  193. int consumerSlot;
  194. status = mInput->attachBuffer(&consumerSlot, tracker->getBuffer());
  195. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  196. "attaching buffer to input failed (%d)", status);
  197. status = mInput->releaseBuffer(consumerSlot, /* frameNumber */ 0,
  198. EGL_NO_DISPLAY, EGL_NO_SYNC_KHR, tracker->getMergedFence());
  199. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  200. "releasing buffer to input failed (%d)", status);
  201. ALOGV("released buffer %#" PRIx64 " to input", buffer->getId());
  202. // We no longer need to track the buffer once it has been returned to the
  203. // input
  204. mBuffers.removeItem(buffer->getId());
  205. // Notify any waiting onFrameAvailable calls
  206. --mOutstandingBuffers;
  207. mReleaseCondition.signal();
  208. }
  209. void StreamSplitter::onAbandonedLocked() {
  210. ALOGE("one of my outputs has abandoned me");
  211. if (!mIsAbandoned) {
  212. mInput->consumerDisconnect();
  213. }
  214. mIsAbandoned = true;
  215. mReleaseCondition.broadcast();
  216. }
  217. StreamSplitter::OutputListener::OutputListener(
  218. const sp<StreamSplitter>& splitter,
  219. const sp<IGraphicBufferProducer>& output)
  220. : mSplitter(splitter), mOutput(output) {}
  221. StreamSplitter::OutputListener::~OutputListener() {}
  222. void StreamSplitter::OutputListener::onBufferReleased() {
  223. mSplitter->onBufferReleasedByOutput(mOutput);
  224. }
  225. void StreamSplitter::OutputListener::binderDied(const wp<IBinder>& /* who */) {
  226. Mutex::Autolock lock(mSplitter->mMutex);
  227. mSplitter->onAbandonedLocked();
  228. }
  229. StreamSplitter::BufferTracker::BufferTracker(const sp<GraphicBuffer>& buffer)
  230. : mBuffer(buffer), mMergedFence(Fence::NO_FENCE), mReleaseCount(0) {}
  231. StreamSplitter::BufferTracker::~BufferTracker() {}
  232. void StreamSplitter::BufferTracker::mergeFence(const sp<Fence>& with) {
  233. mMergedFence = Fence::merge(String8("StreamSplitter"), mMergedFence, with);
  234. }
  235. } // namespace android