Changeset 236547 in webkit
- Timestamp:
- Sep 27, 2018 9:03:48 AM (6 years ago)
- Location:
- trunk/Source/WebCore
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Source/WebCore/ChangeLog
r236546 r236547 1 2018-09-27 Alicia Boya García <aboya@igalia.com> 2 3 [MSE][GStreamer] Use sentinel buffer to detect end of append 4 https://bugs.webkit.org/show_bug.cgi?id=189924 5 6 Reviewed by Philippe Normand. 7 8 This patch introduces a new mechanism to detect when an append has 9 been consumed completely by the demuxer. It takes advantage of the 10 fact that buffer pushing is synchronous: both the appsrc and the 11 demuxer live in the same streaming thread. When appsrc pushes a 12 buffer, it's actually making a qtdemux function call (it calls its 13 "chain" function). The demuxer will return from that call when it has 14 finished processing that buffer; only then does control return to 15 appsrc, which can then push the next buffer. 16 17 By pushing an additional buffer and capturing it in a probe we can 18 detect reliably when the previous buffer has been processed. 19 Because the pipeline only has one thread, at this point no more frames 20 can arrive at the appsink. 21 22 This replaces the old method of detecting end of append which relied 23 on the `need-data` signal, which is more difficult to handle correctly 24 because it fires whenever the appsrc is empty (or below a given 25 level), which also happens when a buffer has not been pushed yet or 26 in response to a flush. 
27 28 * platform/graphics/gstreamer/mse/AppendPipeline.cpp: 29 (WebCore::EndOfAppendMeta::init): 30 (WebCore::EndOfAppendMeta::transform): 31 (WebCore::EndOfAppendMeta::free): 32 (WebCore::AppendPipeline::staticInitialization): 33 (WebCore::AppendPipeline::AppendPipeline): 34 (WebCore::AppendPipeline::~AppendPipeline): 35 (WebCore::AppendPipeline::appsrcEndOfAppendCheckerProbe): 36 (WebCore::AppendPipeline::handleApplicationMessage): 37 (WebCore::AppendPipeline::handleEndOfAppend): 38 (WebCore::AppendPipeline::consumeAppsinkAvailableSamples): 39 (WebCore::AppendPipeline::resetPipeline): 40 (WebCore::AppendPipeline::pushNewBuffer): 41 (WebCore::AppendPipeline::handleAppsrcNeedDataReceived): Deleted. 42 (WebCore::AppendPipeline::handleAppsrcAtLeastABufferLeft): Deleted. 43 (WebCore::AppendPipeline::checkEndOfAppend): Deleted. 44 (WebCore::AppendPipeline::setAppsrcDataLeavingProbe): Deleted. 45 (WebCore::AppendPipeline::removeAppsrcDataLeavingProbe): Deleted. 46 (WebCore::AppendPipeline::reportAppsrcAtLeastABufferLeft): Deleted. 47 (WebCore::AppendPipeline::reportAppsrcNeedDataReceived): Deleted. 48 (WebCore::appendPipelineAppsrcDataLeaving): Deleted. 49 (WebCore::appendPipelineAppsrcNeedData): Deleted. 50 * platform/graphics/gstreamer/mse/AppendPipeline.h: 51 1 52 2018-09-27 Chris Dumez <cdumez@apple.com> 2 53 -
trunk/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp
r236409 r236547 47 47 namespace WebCore { 48 48 49 GType AppendPipeline::s_endOfAppendMetaType = 0; 50 const GstMetaInfo* AppendPipeline::s_webKitEndOfAppendMetaInfo = nullptr; 51 std::once_flag AppendPipeline::s_staticInitializationFlag; 52 53 struct EndOfAppendMeta { 54 GstMeta base; 55 static gboolean init(GstMeta*, void*, GstBuffer*) { return TRUE; } 56 static gboolean transform(GstBuffer*, GstMeta*, GstBuffer*, GQuark, void*) { g_return_val_if_reached(FALSE); } 57 static void free(GstMeta*, GstBuffer*) { } 58 }; 59 60 void AppendPipeline::staticInitialization() 61 { 62 ASSERT(WTF::isMainThread()); 63 64 const char* tags[] = { nullptr }; 65 s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags); 66 s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform); 67 } 68 49 69 static const char* dumpAppendState(AppendPipeline::AppendState appendState) 50 70 { … … 69 89 } 70 90 71 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline*);72 91 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*); 73 92 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*); 74 93 static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*); 75 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo*, AppendPipeline*);76 94 #if !LOG_DISABLED 77 95 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*); … … 111 129 , m_id(0) 112 130 , m_wasBusAlreadyNotifiedOfAvailableSamples(false) 113 , m_appsrcAtLeastABufferLeft(false)114 , m_appsrcNeedDataReceived(false)115 , m_appsrcDataLeavingProbeId(0)116 131 , m_appendState(AppendState::NotStarted) 117 132 , m_abortPending(false) … … 119 134 { 120 135 ASSERT(WTF::isMainThread()); 136 
std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization); 121 137 122 138 GST_TRACE("Creating AppendPipeline (%p)", this); … … 137 153 // below will already take the initial reference and we need an additional one for us. 138 154 m_appsrc = gst_element_factory_make("appsrc", nullptr); 155 156 GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src")); 157 gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo* padProbeInfo, void* userData) { 158 return static_cast<AppendPipeline*>(userData)->appsrcEndOfAppendCheckerProbe(padProbeInfo); 159 }, this, nullptr); 139 160 140 161 const String& type = m_sourceBufferPrivate->type().containerType(); … … 154 175 GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink")); 155 176 g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this); 156 157 setAppsrcDataLeavingProbe();158 177 159 178 #if !LOG_DISABLED … … 174 193 175 194 // These signals won't be connected outside of the lifetime of "this". 
176 g_signal_connect(m_appsrc.get(), "need-data", G_CALLBACK(appendPipelineAppsrcNeedData), this);177 195 g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this); 178 196 g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this); … … 214 232 215 233 if (m_appsrc) { 216 removeAppsrcDataLeavingProbe();217 234 g_signal_handlers_disconnect_by_data(m_appsrc.get(), this); 218 235 m_appsrc = nullptr; … … 248 265 }; 249 266 267 GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo) 268 { 269 ASSERT(!WTF::isMainThread()); 270 m_streamingThread = &WTF::Thread::current(); 271 272 GstBuffer* buffer = GST_BUFFER(padProbeInfo->data); 273 ASSERT(GST_IS_BUFFER(buffer)); 274 275 EndOfAppendMeta* endOfAppendMeta = reinterpret_cast<EndOfAppendMeta*>(gst_buffer_get_meta(buffer, s_endOfAppendMetaType)); 276 if (!endOfAppendMeta) { 277 // Normal buffer, nothing to do. 278 return GST_PAD_PROBE_OK; 279 } 280 281 GST_TRACE_OBJECT(this, "posting end-of-append request to bus"); 282 GstStructure* structure = gst_structure_new_empty("end-of-append"); 283 GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure); 284 gst_bus_post(m_bus.get(), message); 285 return GST_PAD_PROBE_DROP; 286 } 287 250 288 void AppendPipeline::clearPlayerPrivate() 251 289 { … … 285 323 286 324 const GstStructure* structure = gst_message_get_structure(message); 287 288 if (gst_structure_has_name(structure, "appsrc-need-data")) {289 handleAppsrcNeedDataReceived();290 return;291 }292 293 if (gst_structure_has_name(structure, "appsrc-buffer-left")) {294 handleAppsrcAtLeastABufferLeft();295 return;296 }297 325 298 326 if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) { … … 322 350 if (gst_structure_has_name(structure, "demuxer-no-more-pads")) { 323 351 demuxerNoMorePads(); 352 return; 353 } 354 355 if (gst_structure_has_name(structure, "end-of-append")) { 
356 handleEndOfAppend(); 324 357 return; 325 358 } … … 354 387 } 355 388 356 void AppendPipeline::handleAppsrcNeedDataReceived()357 {358 if (!m_appsrcAtLeastABufferLeft) {359 GST_TRACE("discarding until at least a buffer leaves appsrc");360 return;361 }362 363 ASSERT(m_appendState == AppendState::Ongoing || m_appendState == AppendState::Sampling);364 ASSERT(!m_appsrcNeedDataReceived);365 366 GST_TRACE("received need-data from appsrc");367 368 m_appsrcNeedDataReceived = true;369 checkEndOfAppend();370 }371 372 void AppendPipeline::handleAppsrcAtLeastABufferLeft()373 {374 m_appsrcAtLeastABufferLeft = true;375 GST_TRACE("received buffer-left from appsrc");376 #if LOG_DISABLED377 removeAppsrcDataLeavingProbe();378 #endif379 }380 381 389 gint AppendPipeline::id() 382 390 { … … 439 447 ok = true; 440 448 if (m_pendingBuffer) { 441 GST_TRACE("pushing pending buffer % p", m_pendingBuffer.get());449 GST_TRACE("pushing pending buffer %" GST_PTR_FORMAT, m_pendingBuffer.get()); 442 450 gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef()); 443 451 nextAppendState = AppendState::Ongoing; … … 604 612 } 605 613 606 void AppendPipeline::checkEndOfAppend() 607 { 608 ASSERT(WTF::isMainThread()); 609 610 if (!m_appsrcNeedDataReceived || (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling)) 611 return; 612 613 GST_TRACE("end of append data mark was received"); 614 614 void AppendPipeline::handleEndOfAppend() 615 { 616 ASSERT(WTF::isMainThread()); 617 GST_TRACE_OBJECT(this, "received end-of-append"); 618 619 // Regardless of the state transition, the result is the same: didReceiveAllPendingSamples() is called. 
615 620 switch (m_appendState) { 616 621 case AppendState::Ongoing: 617 622 GST_TRACE("DataStarve"); 618 m_appsrcNeedDataReceived = false;619 623 setAppendState(AppendState::DataStarve); 620 624 break; 621 625 case AppendState::Sampling: 622 626 GST_TRACE("LastSample"); 623 m_appsrcNeedDataReceived = false;624 627 setAppendState(AppendState::LastSample); 625 628 break; … … 633 636 { 634 637 ASSERT(WTF::isMainThread()); 635 636 // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.637 if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {638 GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));639 // FIXME: Return ERROR and find a more robust way to detect that all the640 // data has been processed, so we don't need to resort to these hacks.641 return;642 }643 638 644 639 if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) { … … 749 744 750 745 GST_TRACE_OBJECT(this, "batchedSampleCount = %d", batchedSampleCount); 751 752 if (batchedSampleCount > 0)753 checkEndOfAppend();754 746 } 755 747 … … 758 750 ASSERT(WTF::isMainThread()); 759 751 GST_DEBUG("resetting pipeline"); 760 m_appsrcAtLeastABufferLeft = false;761 setAppsrcDataLeavingProbe();762 752 763 753 gst_element_set_state(m_pipeline.get(), GST_STATE_READY); … … 775 765 } 776 766 777 void AppendPipeline::setAppsrcDataLeavingProbe()778 {779 if (m_appsrcDataLeavingProbeId)780 return;781 782 GST_TRACE("setting appsrc data leaving probe");783 784 GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));785 m_appsrcDataLeavingProbeId = gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsrcDataLeaving), this, nullptr);786 }787 788 void AppendPipeline::removeAppsrcDataLeavingProbe()789 {790 if (!m_appsrcDataLeavingProbeId)791 return;792 793 GST_TRACE("removing appsrc data leaving probe");794 795 GRefPtr<GstPad> appsrcPad = 
adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));796 gst_pad_remove_probe(appsrcPad.get(), m_appsrcDataLeavingProbeId);797 m_appsrcDataLeavingProbeId = 0;798 }799 800 767 void AppendPipeline::abort() 801 768 { … … 817 784 GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer) 818 785 { 819 GstFlowReturn result;820 821 786 if (m_abortPending) { 822 787 m_pendingBuffer = adoptGRef(buffer); 823 result = GST_FLOW_OK; 824 } else { 825 setAppendState(AppendPipeline::AppendState::Ongoing); 826 GST_TRACE("pushing new buffer %p", buffer); 827 result = gst_app_src_push_buffer(GST_APP_SRC(appsrc()), buffer); 828 } 829 830 return result; 831 } 832 833 void AppendPipeline::reportAppsrcAtLeastABufferLeft() 834 { 835 GST_TRACE("buffer left appsrc, reposting to bus"); 836 GstStructure* structure = gst_structure_new_empty("appsrc-buffer-left"); 837 GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure); 838 gst_bus_post(m_bus.get(), message); 839 } 840 841 void AppendPipeline::reportAppsrcNeedDataReceived() 842 { 843 GST_TRACE("received need-data signal at appsrc, reposting to bus"); 844 GstStructure* structure = gst_structure_new_empty("appsrc-need-data"); 845 GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure); 846 gst_bus_post(m_bus.get(), message); 788 return GST_FLOW_OK; 789 } 790 791 setAppendState(AppendPipeline::AppendState::Ongoing); 792 793 GST_TRACE_OBJECT(this, "pushing data buffer %" GST_PTR_FORMAT, buffer); 794 GstFlowReturn pushDataBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), buffer); 795 // Pushing buffers to appsrc can only fail if the appsrc is flushing, in EOS or stopped. Neither of these should 796 // be true at this point. 797 g_return_val_if_fail(pushDataBufferRet == GST_FLOW_OK, GST_FLOW_ERROR); 798 799 // Push an additional empty buffer that marks the end of the append. 
800 // This buffer is detected and consumed by appsrcEndOfAppendCheckerProbe(), which uses it to signal the successful 801 // completion of the append. 802 // 803 // This works based on how push mode scheduling works in GStreamer. Note there is a single streaming thread for the 804 // AppendPipeline, and within a stream (the portion of a pipeline covered by the same streaming thread, in this case 805 // the whole pipeline) a buffer is guaranteed not to be processed by downstream until processing of the previous 806 // buffer has completed. 807 808 GstBuffer* endOfAppendBuffer = gst_buffer_new(); 809 gst_buffer_add_meta(endOfAppendBuffer, s_webKitEndOfAppendMetaInfo, nullptr); 810 811 GST_TRACE_OBJECT(this, "pushing end-of-append buffer %" GST_PTR_FORMAT, endOfAppendBuffer); 812 GstFlowReturn pushEndOfAppendBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), endOfAppendBuffer); 813 g_return_val_if_fail(pushEndOfAppendBufferRet == GST_FLOW_OK, GST_FLOW_ERROR); 814 815 return GST_FLOW_OK; 847 816 } 848 817 … … 850 819 { 851 820 ASSERT(!WTF::isMainThread()); 821 if (&WTF::Thread::current() != m_streamingThread) { 822 // m_streamingThread has been initialized in appsrcEndOfAppendCheckerProbe(). 823 // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first. 824 // This error will only be raised if someone modifies the pipeline to include more than one streaming thread or 825 // removes the appsrcEndOfAppendCheckerProbe(). Either way, the end-of-append detection would be broken. 826 // AppendPipeline should have only one streaming thread. 
Otherwise we can't detect reliably when an append has 827 // been demuxed completely. 828 g_critical("Appsink received a sample in a different thread than appsrcEndOfAppendCheckerProbe run."); 829 RELEASE_ASSERT_NOT_REACHED(); 830 } 852 831 853 832 if (!m_playerPrivate || m_appendState == AppendState::Invalid) { … … 1034 1013 return; 1035 1014 default: 1036 // No useful data, but notify anyway to complete the append operation. 1037 GST_DEBUG("Received all pending samples (no data)"); 1038 m_sourceBufferPrivate->didReceiveAllPendingSamples(); 1015 // No useful data. 1039 1016 break; 1040 1017 } … … 1077 1054 gst_bus_post(appendPipeline->bus(), message); 1078 1055 GST_TRACE("appsink-caps-changed message posted to bus"); 1079 }1080 1081 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo* info, AppendPipeline* appendPipeline)1082 {1083 ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);1084 1085 GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);1086 gsize bufferSize = gst_buffer_get_size(buffer);1087 1088 GST_TRACE("buffer of size %" G_GSIZE_FORMAT " going thru", bufferSize);1089 1090 appendPipeline->reportAppsrcAtLeastABufferLeft();1091 1092 return GST_PAD_PROBE_OK;1093 1056 } 1094 1057 … … 1130 1093 GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer)); 1131 1094 return GST_PAD_PROBE_DROP; 1132 }1133 1134 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline* appendPipeline)1135 {1136 appendPipeline->reportAppsrcNeedDataReceived();1137 1095 } 1138 1096 -
trunk/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.h
r236409 r236547 30 30 #include <atomic> 31 31 #include <gst/gst.h> 32 #include <mutex> 32 33 #include <wtf/Condition.h> 34 #include <wtf/Threading.h> 33 35 34 36 namespace WebCore { … … 65 67 void appsinkNewSample(GRefPtr<GstSample>&&); 66 68 void appsinkEOS(); 69 void handleEndOfAppend(); 67 70 void didReceiveInitializationSegment(); 68 71 AtomicString trackId(); … … 86 89 void connectDemuxerSrcPadToAppsink(GstPad*); 87 90 88 void reportAppsrcAtLeastABufferLeft();89 void reportAppsrcNeedDataReceived();90 91 91 private: 92 92 void resetPipeline(); 93 93 void checkEndOfAppend(); 94 void handleAppsrcAtLeastABufferLeft();95 void handleAppsrcNeedDataReceived();96 void removeAppsrcDataLeavingProbe();97 void setAppsrcDataLeavingProbe();98 94 void demuxerNoMorePads(); 99 95 100 96 void consumeAppsinkAvailableSamples(); 97 98 GstPadProbeReturn appsrcEndOfAppendCheckerProbe(GstPadProbeInfo*); 99 100 static void staticInitialization(); 101 102 static std::once_flag s_staticInitializationFlag; 103 static GType s_endOfAppendMetaType; 104 static const GstMetaInfo* s_webKitEndOfAppendMetaInfo; 105 106 // Used only for asserting that there is only one streaming thread. 107 // Only the pointers are compared. 108 WTF::Thread* m_streamingThread; 101 109 102 110 Ref<MediaSourceClientGStreamerMSE> m_mediaSourceClient; … … 131 139 FloatSize m_presentationSize; 132 140 133 bool m_appsrcAtLeastABufferLeft;134 bool m_appsrcNeedDataReceived;135 136 gulong m_appsrcDataLeavingProbeId;137 141 #if !LOG_DISABLED 138 142 struct PadProbeInformation m_demuxerDataEnteringPadProbeInformation;
Note: See TracChangeset
for help on using the changeset viewer.