Changeset 260755 in webkit


Ignore:
Timestamp:
Apr 27, 2020 8:30:02 AM (4 years ago)
Author:
aboya@igalia.com
Message:

[GStreamer] Rework WebKitWebSrc threading
Source/WebCore:

https://bugs.webkit.org/show_bug.cgi?id=210284

Reviewed by Xabier Rodriguez-Calvar.

WebKitWebSrc as it is in master has a number of race conditions
leading to occasional starvation (due to cancelling the wrong request)
or data corruption (due to pushing data from a cancelled request).

The threading situation wasn't easy to follow, as it wasn't clear
access to what members should be protected by what mutex, in what
circumstances. Also, some parts of the design were also introducing
addicional complexity, such as the first request being sent from the
main thread whereas the rest were being sent from the streaming thread
or basesrc async start.

In response, this patch reworks all the locking in WebKitWebSrc to use
WTF::DataMutex. This ensures all accesses to its (now explicit)
protected members are locked. The two mutexes and condition variables
have been simplified into one, as there was no obvious need or benefit
for two of each in this case.

Requests have been numbered, which allows to safely and atomically
ignore results from cancelled requests, avoiding data corruption
races, and makes following them in debug logs much easier.

The conditions for making and cancelling requests have been simplified
to a simpler and safer model: There is at most only one active request
at anytime, flushes cancel the request, and the first create() call
always makes the new request (both at startup and after a flush).
Debug asserts and notes about the flow of operations during basesrc
seeks have been provided.

As this effort needed a review of the entire WebKitWebSrc, cleanups,
corrections and documentation comments have been provided where
appropriate.

This patch introduces no visible behavior changes, just stability
improvements.

  • platform/graphics/gstreamer/GRefPtrGStreamer.h:
  • platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp:

(WebKitWebSrcPrivate::~WebKitWebSrcPrivate):
(webkit_web_src_class_init):
(webkitWebSrcReset):
(webKitWebSrcConstructed):
(webKitWebSrcSetProperty):
(webKitWebSrcGetProperty):
(webKitWebSrcSetContext):
(webKitWebSrcSendEvent):
(restartLoaderIfNeeded):
(stopLoaderIfNeeded):
(webKitWebSrcCreate):
(webKitWebSrcStart):
(webKitWebSrcMakeRequest):
(webKitWebSrcStop):
(webKitWebSrcGetSize):
(webKitWebSrcIsSeekable):
(webKitWebSrcDoSeek):
(webKitWebSrcQuery):
(webKitWebSrcUnLock):
(webKitWebSrcUnLockStop):
(webKitWebSrcSetUri):
(webKitWebSrcSetMediaPlayer):
(webKitSrcPassedCORSAccessCheck):
(CachedResourceStreamingClient::CachedResourceStreamingClient):
(CachedResourceStreamingClient::checkUpdateBlocksize):
(CachedResourceStreamingClient::responseReceived):
(CachedResourceStreamingClient::dataReceived):
(CachedResourceStreamingClient::accessControlCheckFailed):
(CachedResourceStreamingClient::loadFailed):
(CachedResourceStreamingClient::loadFinished):
(webKitSrcWouldTaintOrigin):

  • platform/graphics/gstreamer/WebKitWebSourceGStreamer.h:

LayoutTests:

https://bugs.webkit.org/show_bug.cgi?id=209811

Reviewed by Xabier Rodriguez-Calvar.

A test improved its status in TestExpectations from the changes made
in this patch.

  • platform/gtk/TestExpectations:
Location:
trunk
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/LayoutTests/ChangeLog

    r260737 r260755  
     12020-04-27  Alicia Boya García  <aboya@igalia.com>
     2
     3        [GStreamer] Rework WebKitWebSrc threading
     4        https://bugs.webkit.org/show_bug.cgi?id=209811
     5
     6        Reviewed by Xabier Rodriguez-Calvar.
     7
     8        A test improved its status in TestExpectations from the changes made
     9        in this patch.
     10
     11        * platform/gtk/TestExpectations:
     12
    1132020-04-26  Lauro Moura  <lmoura@igalia.com>
    214
  • trunk/LayoutTests/platform/gtk/TestExpectations

    r260737 r260755  
    12921292webkit.org/b/206656 imported/w3c/web-platform-tests/mediacapture-streams/MediaStream-MediaElement-preload-none-manual.https.html [ Crash Failure Pass ]
    12931293
    1294 webkit.org/b/206657 imported/w3c/web-platform-tests/html/semantics/embedded-content/the-video-element/resize-during-playback.html [ Failure ]
    1295 
    12961294# LFC (LayoutFormatingContext is disabled by default).
    12971295fast/layoutformattingcontext/ [ Skip ]
  • trunk/Source/WebCore/ChangeLog

    r260753 r260755  
     12020-04-27  Alicia Boya García  <aboya@igalia.com>
     2
     3        [GStreamer] Rework WebKitWebSrc threading
     4        https://bugs.webkit.org/show_bug.cgi?id=210284
     5
     6        Reviewed by Xabier Rodriguez-Calvar.
     7
     8        WebKitWebSrc as it is in master has a number of race conditions
     9        leading to occasional starvation (due to cancelling the wrong request)
     10        or data corruption (due to pushing data from a cancelled request).
     11
     12        The threading situation wasn't easy to follow, as it wasn't clear
     13        access to what members should be protected by what mutex, in what
     14        circumstances. Also, some parts of the design were also introducing
     15        addicional complexity, such as the first request being sent from the
     16        main thread whereas the rest were being sent from the streaming thread
     17        or basesrc async start.
     18
     19        In response, this patch reworks all the locking in WebKitWebSrc to use
     20        WTF::DataMutex. This ensures all accesses to its (now explicit)
     21        protected members are locked. The two mutexes and condition variables
     22        have been simplified into one, as there was no obvious need or benefit
     23        for two of each in this case.
     24
     25        Requests have been numbered, which allows to safely and atomically
     26        ignore results from cancelled requests, avoiding data corruption
     27        races, and makes following them in debug logs much easier.
     28
     29        The conditions for making and cancelling requests have been simplified
     30        to a simpler and safer model: There is at most only one active request
     31        at anytime, flushes cancel the request, and the first create() call
     32        always makes the new request (both at startup and after a flush).
     33        Debug asserts and notes about the flow of operations during basesrc
     34        seeks have been provided.
     35
     36        As this effort needed a review of the entire WebKitWebSrc, cleanups,
     37        corrections and documentation comments have been provided where
     38        appropriate.
     39
     40        This patch introduces no visible behavior changes, just stability
     41        improvements.
     42
     43        * platform/graphics/gstreamer/GRefPtrGStreamer.h:
     44        * platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp:
     45        (WebKitWebSrcPrivate::~WebKitWebSrcPrivate):
     46        (webkit_web_src_class_init):
     47        (webkitWebSrcReset):
     48        (webKitWebSrcConstructed):
     49        (webKitWebSrcSetProperty):
     50        (webKitWebSrcGetProperty):
     51        (webKitWebSrcSetContext):
     52        (webKitWebSrcSendEvent):
     53        (restartLoaderIfNeeded):
     54        (stopLoaderIfNeeded):
     55        (webKitWebSrcCreate):
     56        (webKitWebSrcStart):
     57        (webKitWebSrcMakeRequest):
     58        (webKitWebSrcStop):
     59        (webKitWebSrcGetSize):
     60        (webKitWebSrcIsSeekable):
     61        (webKitWebSrcDoSeek):
     62        (webKitWebSrcQuery):
     63        (webKitWebSrcUnLock):
     64        (webKitWebSrcUnLockStop):
     65        (webKitWebSrcSetUri):
     66        (webKitWebSrcSetMediaPlayer):
     67        (webKitSrcPassedCORSAccessCheck):
     68        (CachedResourceStreamingClient::CachedResourceStreamingClient):
     69        (CachedResourceStreamingClient::checkUpdateBlocksize):
     70        (CachedResourceStreamingClient::responseReceived):
     71        (CachedResourceStreamingClient::dataReceived):
     72        (CachedResourceStreamingClient::accessControlCheckFailed):
     73        (CachedResourceStreamingClient::loadFailed):
     74        (CachedResourceStreamingClient::loadFinished):
     75        (webKitSrcWouldTaintOrigin):
     76        * platform/graphics/gstreamer/WebKitWebSourceGStreamer.h:
     77
    1782020-04-26  Darin Adler  <darin@apple.com>
    279
  • trunk/Source/WebCore/platform/graphics/gstreamer/GRefPtrGStreamer.h

    r254682 r260755  
    2626
    2727typedef struct _WebKitVideoSink WebKitVideoSink;
    28 typedef struct _WebKitWebSrc WebKitWebSrc;
     28struct WebKitWebSrc;
    2929
    3030#if USE(GSTREAMER_GL)
  • trunk/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp

    r257977 r260755  
    22 *  Copyright (C) 2009, 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
    33 *  Copyright (C) 2013 Collabora Ltd.
    4  *  Copyright (C) 2019 Igalia S.L.
     4 *  Copyright (C) 2019, 2020 Igalia S.L.
    55 *
    66 *  This library is free software; you can redistribute it and/or
     
    2626#include "GStreamerCommon.h"
    2727#include "HTTPHeaderNames.h"
    28 #include "MainThreadNotifier.h"
    2928#include "MediaPlayer.h"
    3029#include "PlatformMediaResourceLoader.h"
     
    3534#include <cstdint>
    3635#include <wtf/Condition.h>
     36#include <wtf/DataMutex.h>
     37#include <wtf/RunLoop.h>
    3738#include <wtf/Scope.h>
    3839#include <wtf/glib/WTFGType.h>
     
    4041
    4142using namespace WebCore;
     43using WTF::DataMutex;
    4244
    4345// Never pause download of media resources smaller than 2MiB.
     
    5658    WTF_MAKE_NONCOPYABLE(CachedResourceStreamingClient);
    5759public:
    58     CachedResourceStreamingClient(WebKitWebSrc*, ResourceRequest&&);
     60    CachedResourceStreamingClient(WebKitWebSrc*, ResourceRequest&&, unsigned requestNumber);
    5961    virtual ~CachedResourceStreamingClient();
    6062
     
    8183    int m_reduceBlocksizeCount { 0 };
    8284    int m_increaseBlocksizeCount { 0 };
     85    unsigned m_requestNumber;
    8386
    8487    GRefPtr<GstElement> m_src;
     
    8790};
    8891
    89 enum MainThreadSourceNotification {
    90     Start = 1 << 0,
    91     Stop = 1 << 1,
    92     Dispose = 1 << 2,
    93 };
    94 
    95 struct _WebKitWebSrcPrivate {
    96     ~_WebKitWebSrcPrivate()
    97     {
    98         if (notifier && notifier->isValid()) {
    99             notifier->notifyAndWait(MainThreadSourceNotification::Dispose, [&] {
    100                 if (resource) {
    101                     auto* client = static_cast<CachedResourceStreamingClient*>(resource->client());
    102                     if (client)
    103                         client->setSourceElement(nullptr);
    104 
    105                     resource->setClient(nullptr);
    106                 }
    107                 loader = nullptr;
    108             });
    109             notifier->invalidate();
    110             notifier = nullptr;
    111         }
    112     }
    113 
     92struct WebKitWebSrcPrivate {
     93    // Constants initialized during construction:
     94    unsigned minimumBlocksize;
     95
     96    // Configuration of the element (properties set by the user of WebKitWebSrc):
     97    // They can only change when state < PAUSED.
    11498    CString originalURI;
    115     CString redirectedURI;
    11699    bool keepAlive;
    117100    GUniquePtr<GstStructure> extraHeaders;
    118101    bool compress;
    119102    GUniquePtr<gchar> httpMethod;
    120     WebCore::MediaPlayer* player;
    121     RefPtr<PlatformMediaResourceLoader> loader;
    122     RefPtr<PlatformMediaResource> resource;
    123     RefPtr<MainThreadNotifier<MainThreadSourceNotification>> notifier;
    124     bool didPassAccessControlCheck;
    125     bool wereHeadersReceived;
    126     Condition headersCondition;
    127     Lock responseLock;
    128     bool wasResponseReceived;
    129     Condition responseCondition;
    130     bool doesHaveEOS;
    131     bool isFlushing { false };
    132     uint64_t readPosition;
    133     uint64_t requestedPosition;
    134     uint64_t stopPosition;
    135     bool isDurationSet;
    136     bool haveSize;
    137     uint64_t size;
    138     bool isSeekable;
    139     bool isSeeking;
    140     bool wasSeeking { false };
    141     unsigned minimumBlocksize;
    142     Lock adapterLock;
    143     Condition adapterCondition;
    144     bool isDownloadSuspended { false };
    145     GRefPtr<GstAdapter> adapter;
    146     GRefPtr<GstEvent> httpHeadersEvent;
    147     GUniquePtr<GstStructure> httpHeaders;
    148     WallTime downloadStartTime { WallTime::nan() };
    149     uint64_t totalDownloadedBytes { 0 };
     103
     104    struct StreamingMembers {
     105        ~StreamingMembers()
     106        {
     107            // By the time we're destroying WebKitWebSrcPrivate unLock() should have been called and therefore resource
     108            // should have already been cleared.
     109            ASSERT(!resource);
     110            // ResourceLoader is not thread-safe. It's not even ThreadSafeRefCounted. Therefore, to be safe, we want the
     111            // unref to happen in the main thread.
     112            if (loader)
     113                RunLoop::main().dispatch([loader = WTFMove(loader)] { });
     114        }
     115
     116        // Properties initially empty, but set once the first HTTP response arrives:
     117        bool wasResponseReceived;
     118        CString redirectedURI;
     119        bool didPassAccessControlCheck;
     120        bool haveSize;
     121        uint64_t size;
     122        bool isSeekable;
     123        GRefPtr<GstCaps> pendingCaps;
     124        GRefPtr<GstMessage> pendingHttpHeadersMessage; // Set from MT, sent from create().
     125        GRefPtr<GstEvent> pendingHttpHeadersEvent; // Set from MT, sent from create().
     126
     127        // Properties updated with every downloaded data block:
     128        WallTime downloadStartTime { WallTime::nan() };
     129        uint64_t totalDownloadedBytes { 0 };
     130        bool doesHaveEOS; // Set both when we reach stopPosition and on errors (including on responseReceived).
     131        bool isDownloadSuspended { false }; // Set to true from the network handler when the high water level is reached.
     132
     133        // Obtained by means of GstContext queries before making the first HTTP request.
     134        // We use it for getting access to WebKit networking objects: the PlatformMediaResourceLoader factory [createResourceLoader()]
     135        // and the player HTTP referrer string.
     136        WebCore::MediaPlayer* player;
     137
     138        // Properties used for GStreamer data-flow in create().
     139        bool isFlushing { false };
     140        Condition responseCondition; // Must be signaled after any updates on HTTP requests, and when flushing.
     141        GRefPtr<GstAdapter> adapter;
     142        bool isDurationSet;
     143        uint64_t readPosition;
     144
     145        // Properties only set during seek.
     146        // basesrc ensures they can't change during a create() call by taking the STREAMING_LOCK.
     147        // (An initial seek is also guaranteed by basesrc.)
     148        unsigned requestNumber { 1 };
     149        uint64_t requestedPosition { 0 };
     150        uint64_t stopPosition { UINT64_MAX };
     151
     152        bool isRequestPending { true };
     153
     154        RefPtr<PlatformMediaResourceLoader> loader;
     155        RefPtr<PlatformMediaResource> resource;
     156    };
     157    DataMutex<StreamingMembers> dataMutex;
    150158};
    151159
     
    170178static void webKitWebSrcSetProperty(GObject*, guint propertyID, const GValue*, GParamSpec*);
    171179static void webKitWebSrcGetProperty(GObject*, guint propertyID, GValue*, GParamSpec*);
    172 static GstStateChangeReturn webKitWebSrcChangeState(GstElement*, GstStateChange);
    173180static GstFlowReturn webKitWebSrcCreate(GstPushSrc*, GstBuffer**);
    174 static gboolean webKitWebSrcMakeRequest(GstBaseSrc*, bool);
     181static void webKitWebSrcMakeRequest(WebKitWebSrc*, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper&);
    175182static gboolean webKitWebSrcStart(GstBaseSrc*);
    176183static gboolean webKitWebSrcStop(GstBaseSrc*);
     
    182189static gboolean webKitWebSrcUnLockStop(GstBaseSrc*);
    183190static void webKitWebSrcSetContext(GstElement*, GstContext*);
    184 static void restartLoaderIfNeeded(WebKitWebSrc*);
    185 static void stopLoaderIfNeeded(WebKitWebSrc*);
     191static void restartLoaderIfNeeded(WebKitWebSrc*, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper&);
     192static void stopLoaderIfNeeded(WebKitWebSrc*, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper&);
    186193
    187194#define webkit_web_src_parent_class parent_class
     
    231238            nullptr, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
    232239
    233     eklass->change_state = GST_DEBUG_FUNCPTR(webKitWebSrcChangeState);
    234240    eklass->set_context = GST_DEBUG_FUNCPTR(webKitWebSrcSetContext);
    235241
     
    248254}
    249255
    250 static void webkitWebSrcReset(WebKitWebSrc* src)
    251 {
    252     WebKitWebSrcPrivate* priv = src->priv;
    253 
     256enum class ResetType {
     257    Soft,
     258    Hard
     259};
     260
     261static void webkitWebSrcReset(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members, ResetType resetType)
     262{
    254263    GST_DEBUG_OBJECT(src, "Resetting internal state");
    255     priv->haveSize = false;
    256     priv->wereHeadersReceived = false;
    257     priv->isSeekable = false;
    258     priv->readPosition = 0;
    259     priv->requestedPosition = 0;
    260     priv->stopPosition = -1;
    261     priv->size = 0;
     264    gst_adapter_clear(members->adapter.get());
     265    members->isRequestPending = true;
     266
     267    // Reset request state. Any previous request has been cancelled at this point.
     268    members->wasResponseReceived = false;
     269    members->doesHaveEOS = false;
     270    members->downloadStartTime = WallTime::nan();
     271    members->totalDownloadedBytes = 0; // Resetted for each request, used to estimate download speed.
     272    members->pendingHttpHeadersMessage = nullptr;
     273    members->pendingHttpHeadersEvent = nullptr;
     274
     275    // After a flush, we have to emit a segment again.
     276    members->isDurationSet = false;
     277
     278    // Hard reset is done during initialization and state transitions.
     279    // Soft reset is done during flushes. In these, we preserve the seek target.
     280    if (resetType == ResetType::Hard) {
     281        members->didPassAccessControlCheck = false;
     282        members->redirectedURI = CString();
     283        members->isSeekable = false;
     284        members->haveSize = false;
     285        members->size = 0;
     286        members->requestedPosition = 0;
     287        members->stopPosition = UINT64_MAX;
     288        members->readPosition = members->requestedPosition;
     289    }
    262290}
    263291
     
    269297    WebKitWebSrcPrivate* priv = src->priv;
    270298
    271     priv->notifier = MainThreadNotifier<MainThreadSourceNotification>::create();
    272     priv->adapter = adoptGRef(gst_adapter_new());
    273299    priv->minimumBlocksize = gst_base_src_get_blocksize(GST_BASE_SRC_CAST(src));
    274300
    275     webkitWebSrcReset(src);
     301    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
     302    members->adapter = adoptGRef(gst_adapter_new());
     303    webkitWebSrcReset(src, members, ResetType::Hard);
     304
    276305    gst_base_src_set_automatic_eos(GST_BASE_SRC_CAST(src), FALSE);
    277     gst_base_src_set_async(GST_BASE_SRC_CAST(src), TRUE);
    278306}
    279307
     
    315343        g_value_set_string(value, priv->originalURI.data());
    316344        break;
    317     case PROP_RESOLVED_LOCATION:
    318         g_value_set_string(value, priv->redirectedURI.isNull() ? priv->originalURI.data() : priv->redirectedURI.data());
    319         break;
     345    case PROP_RESOLVED_LOCATION: {
     346        DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
     347        g_value_set_string(value, members->redirectedURI.isNull() ? priv->originalURI.data() : members->redirectedURI.data());
     348        break;
     349    }
    320350    case PROP_KEEP_ALIVE:
    321351        g_value_set_boolean(value, priv->keepAlive);
     
    344374    if (gst_context_has_context_type(context, WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME)) {
    345375        const GValue* value = gst_structure_get_value(gst_context_get_structure(context), "player");
    346         priv->player = reinterpret_cast<MediaPlayer*>(g_value_get_pointer(value));
     376        DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
     377        members->player = reinterpret_cast<MediaPlayer*>(g_value_get_pointer(value));
    347378    }
    348379    GST_ELEMENT_CLASS(parent_class)->set_context(element, context);
    349380}
    350381
    351 static void restartLoaderIfNeeded(WebKitWebSrc* src)
    352 {
    353     WebKitWebSrcPrivate* priv = src->priv;
    354 
    355     if (!priv->isDownloadSuspended) {
     382static void restartLoaderIfNeeded(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members)
     383{
     384    if (!members->isDownloadSuspended) {
    356385        GST_TRACE_OBJECT(src, "download already active");
    357386        return;
     
    359388
    360389    GST_TRACE_OBJECT(src, "is download suspended %s, does have EOS %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT
    361         " (min %u)", boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->haveSize)
    362         , boolForPrinting(priv->isSeekable), priv->size, SMALL_MEDIA_RESOURCE_MAX_SIZE);
    363     if (priv->doesHaveEOS || !priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
     390        " (min %u)", boolForPrinting(members->isDownloadSuspended), boolForPrinting(members->doesHaveEOS), boolForPrinting(members->haveSize)
     391        , boolForPrinting(members->isSeekable), members->size, SMALL_MEDIA_RESOURCE_MAX_SIZE);
     392    if (members->doesHaveEOS || !members->haveSize || !members->isSeekable || members->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
    364393        GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
    365394        return;
    366395    }
    367     GST_TRACE_OBJECT(src, "read position %" G_GUINT64_FORMAT ", state %s", priv->readPosition, gst_element_state_get_name(GST_STATE(src)));
    368     if (!priv->readPosition || priv->readPosition == priv->size || GST_STATE(src) < GST_STATE_PAUSED) {
     396    GST_TRACE_OBJECT(src, "read position %" G_GUINT64_FORMAT ", state %s", members->readPosition, gst_element_state_get_name(GST_STATE(src)));
     397    if (!members->readPosition || members->readPosition == members->size || GST_STATE(src) < GST_STATE_PAUSED) {
    369398        GST_TRACE_OBJECT(src, "can't restart download");
    370399        return;
    371400    }
    372401
    373     size_t queueSize = gst_adapter_available(priv->adapter.get());
    374     GST_TRACE_OBJECT(src, "queue size %zd (min %1.0f)", queueSize
    375         , priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD);
    376 
    377     if (queueSize >= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD) {
     402    size_t queueSize = gst_adapter_available(members->adapter.get());
     403    GST_TRACE_OBJECT(src, "queue size %zu (min %1.0f)", queueSize
     404        , members->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD);
     405
     406    if (queueSize >= members->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD) {
    378407        GST_TRACE_OBJECT(src, "queue size above low watermark, not restarting download");
    379408        return;
     
    381410
    382411    GST_DEBUG_OBJECT(src, "restarting download");
    383     priv->isDownloadSuspended = false;
    384     webKitWebSrcMakeRequest(GST_BASE_SRC_CAST(src), false);
    385 }
    386 
    387 
    388 static void stopLoaderIfNeeded(WebKitWebSrc* src)
    389 {
    390     WebKitWebSrcPrivate* priv = src->priv;
    391 
    392     if (priv->isDownloadSuspended) {
     412    members->isDownloadSuspended = false;
     413    members->requestNumber++;
     414    members->requestedPosition = members->readPosition;
     415    webKitWebSrcMakeRequest(src, members);
     416}
     417
     418
     419static void stopLoaderIfNeeded(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members)
     420{
     421    ASSERT(isMainThread());
     422
     423    if (members->isDownloadSuspended) {
    393424        GST_TRACE_OBJECT(src, "download already suspended");
    394425        return;
     
    396427
    397428    GST_TRACE_OBJECT(src, "is download suspended %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT " (min %u)"
    398         , boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->haveSize), boolForPrinting(priv->isSeekable), priv->size
     429        , boolForPrinting(members->isDownloadSuspended), boolForPrinting(members->haveSize), boolForPrinting(members->isSeekable), members->size
    399430        , SMALL_MEDIA_RESOURCE_MAX_SIZE);
    400     if (!priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
     431    if (!members->isSeekable || members->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
    401432        GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
    402433        return;
    403434    }
    404435
    405     size_t queueSize = gst_adapter_available(priv->adapter.get());
    406     GST_TRACE_OBJECT(src, "queue size %zd (max %1.0f)", queueSize, priv->size * HIGH_QUEUE_FACTOR_THRESHOLD);
    407     if (queueSize <= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD) {
     436    size_t queueSize = gst_adapter_available(members->adapter.get());
     437    GST_TRACE_OBJECT(src, "queue size %zu (max %1.0f)", queueSize, members->size * HIGH_QUEUE_FACTOR_THRESHOLD);
     438    if (queueSize <= members->size * HIGH_QUEUE_FACTOR_THRESHOLD) {
    408439        GST_TRACE_OBJECT(src, "queue size under high watermark, not stopping download");
    409440        return;
    410441    }
    411442
    412     GST_DEBUG_OBJECT(src, "stopping download");
    413     priv->isDownloadSuspended = true;
    414     priv->resource->stop();
     443    if (members->readPosition == members->size) {
     444        GST_TRACE_OBJECT(src, "just downloaded the last chunk in the file, loadFinished() is about to be called");
     445        return;
     446    }
     447
     448    GST_DEBUG_OBJECT(src, "R%u: stopping download", members->requestNumber);
     449    members->isDownloadSuspended = true;
     450    members->resource->stop();
    415451}
    416452
    417453static GstFlowReturn webKitWebSrcCreate(GstPushSrc* pushSrc, GstBuffer** buffer)
    418454{
     455    ASSERT(!isMainThread());
    419456    GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(pushSrc);
    420457    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
    421458    WebKitWebSrcPrivate* priv = src->priv;
    422 
    423     GST_TRACE_OBJECT(src, "readPosition = %" G_GUINT64_FORMAT " requestedPosition = %" G_GUINT64_FORMAT, priv->readPosition, priv->requestedPosition);
    424 
    425     if (priv->requestedPosition != priv->readPosition) {
    426         {
    427             LockHolder adapterLocker(priv->adapterLock);
    428             GST_DEBUG_OBJECT(src, "Seeking, flushing adapter");
    429             gst_adapter_clear(priv->adapter.get());
    430         }
    431         uint64_t requestedPosition = priv->requestedPosition;
    432         webKitWebSrcStop(baseSrc);
    433         priv->requestedPosition = requestedPosition;
    434         // Do not notify async-completion, in seeking flows, we will
    435         // be called from GstBaseSrc's perform_seek vfunc, which holds
    436         // a streaming lock in our frame. Hence, we would deadlock
    437         // trying to notify async completion, since that also requires
    438         // the streaming lock.
    439         webKitWebSrcMakeRequest(baseSrc, false);
    440     }
    441 
    442     {
    443         LockHolder locker(priv->responseLock);
    444         priv->responseCondition.wait(priv->responseLock, [priv] () {
    445             return priv->wasResponseReceived || priv->isFlushing;
     459    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
     460
     461    // We need members->player to make requests. There are two mechanisms for this.
     462    //
     463    // 1) webKitWebSrcSetMediaPlayer() is called by MediaPlayerPrivateGStreamer by means of hooking playbin's
     464    //    "source-setup" event. This doesn't work for additional WebKitWebSrc elements created by adaptivedemux.
     465    //
     466    // 2) A GstContext query made here. Because of a bug, this only works in GStreamer >= 1.12.
     467    //
     468    // As a compatibility workaround, the http: URI protocol is only registered for gst>=1.12; otherwise using
     469    // webkit+http:, which is used by MediaPlayerPrivateGStreamer but not by adaptivedemux's additional source
     470    // elements, therefore using souphttpsrc instead and not routing traffic through the NetworkProcess.
     471    if (webkitGstCheckVersion(1, 12, 0) && !members->player) {
     472        members.runUnlocked([src, baseSrc]() {
     473            GRefPtr<GstQuery> query = adoptGRef(gst_query_new_context(WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
     474            if (gst_pad_peer_query(GST_BASE_SRC_PAD(baseSrc), query.get())) {
     475                GstContext* context;
     476
     477                gst_query_parse_context(query.get(), &context);
     478                gst_element_set_context(GST_ELEMENT_CAST(src), context);
     479            } else
     480                gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_need_context(GST_OBJECT_CAST(src), WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
    446481        });
    447     }
     482        if (members->isFlushing)
     483            return GST_FLOW_FLUSHING;
     484    }
     485    RELEASE_ASSERT(members->player);
     486
     487    GST_TRACE_OBJECT(src, "readPosition = %" G_GUINT64_FORMAT " requestedPosition = %" G_GUINT64_FORMAT, members->readPosition, members->requestedPosition);
     488
     489    if (members->isRequestPending) {
     490        members->isRequestPending = false;
     491        webKitWebSrcMakeRequest(src, members);
     492    }
     493
     494    // Wait for the response headers.
     495    members->responseCondition.wait(members.mutex(), [&] () {
     496        return members->wasResponseReceived || members->isFlushing;
     497    });
     498
     499    if (members->isFlushing)
     500        return GST_FLOW_FLUSHING;
     501
     502    if (members->pendingCaps) {
     503        GST_DEBUG_OBJECT(src, "Setting caps: %" GST_PTR_FORMAT, members->pendingCaps.get());
     504        members.runUnlocked([baseSrc, caps = members->pendingCaps.leakRef()]() {
     505            gst_base_src_set_caps(baseSrc, caps);
     506        });
     507        if (members->isFlushing)
     508            return GST_FLOW_FLUSHING;
     509    }
     510
     511    if (members->haveSize && !members->isDurationSet) {
     512        GST_DEBUG_OBJECT(src, "Setting duration to %" G_GUINT64_FORMAT, members->size);
     513        baseSrc->segment.duration = members->size;
     514        members->isDurationSet = true;
     515        gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_duration_changed(GST_OBJECT_CAST(src)));
     516    }
     517
     518    if (members->pendingHttpHeadersMessage)
     519        gst_element_post_message(GST_ELEMENT(src), members->pendingHttpHeadersMessage.leakRef());
     520    if (members->pendingHttpHeadersEvent)
     521        gst_pad_push_event(GST_BASE_SRC_PAD(baseSrc), members->pendingHttpHeadersEvent.leakRef());
     522
     523    restartLoaderIfNeeded(src, members);
    448524
    449525    // We don't use the GstAdapter methods marked as fast anymore because sometimes it was slower. The reason why this was slower is that we can be
    450526    // waiting for more "fast" (that could be retrieved with the _fast API) buffers to be available even in cases where the queue is not empty. These
    451527    // other buffers can be retrieved with the "non _fast" API.
    452     GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, isDownloadSuspended: %s", boolForPrinting(priv->isFlushing)
    453         , boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->isDownloadSuspended));
    454 
    455     if (priv->doesHaveEOS) {
    456         GST_DEBUG_OBJECT(src, "EOS");
    457         return GST_FLOW_EOS;
    458     }
     528    GST_TRACE_OBJECT(src, "doesHaveEOS: %s, isDownloadSuspended: %s", boolForPrinting(members->doesHaveEOS), boolForPrinting(members->isDownloadSuspended));
    459529
    460530    unsigned size = gst_base_src_get_blocksize(baseSrc);
    461     size_t queueSize;
    462     {
    463         LockHolder adapterLocker(priv->adapterLock);
    464         unsigned retries = 0;
    465         queueSize = gst_adapter_available(priv->adapter.get());
    466         GST_TRACE_OBJECT(src, "available bytes %" G_GSIZE_FORMAT ", block size %u", queueSize, size);
    467         while (!queueSize && !priv->isFlushing) {
    468             GST_TRACE_OBJECT(src, "let's try to restart the download if possible and wait a bit if no data");
    469             priv->adapterCondition.waitFor(priv->adapterLock, 1_s, [&] {
    470                 restartLoaderIfNeeded(src);
    471                 return priv->isFlushing || (!priv->isDownloadSuspended && gst_adapter_available(priv->adapter.get()));
    472             });
    473             queueSize = gst_adapter_available(priv->adapter.get());
    474             GST_TRACE_OBJECT(src, "available %" G_GSIZE_FORMAT, queueSize);
    475             if (queueSize || priv->isFlushing) {
    476                 // We have data or we're flushing. We can break the loop here.
    477                 break;
    478             }
    479 
    480             // We should keep waiting but we could be in EOS. Let's check the two possibilities:
    481             // 1. We are at the end of the file with a known size.
    482             // 2. The download is not suspended and no more data are arriving. We cannot wait forever, 10x1s seems safe and sensible.
    483             if (priv->haveSize && priv->readPosition >= priv->size) {
    484                 GST_DEBUG_OBJECT(src, "Waiting for data beyond the end, signalling EOS");
    485                 return GST_FLOW_EOS;
    486             }
    487             GST_TRACE_OBJECT(src, "is download suspended? %s, num retries %u", boolForPrinting(priv->isDownloadSuspended), retries + 1);
    488             if (!priv->isDownloadSuspended && ++retries >= 10) {
    489                 GST_DEBUG_OBJECT(src, "Adapter still empty after 10s of waiting, assuming EOS");
    490                 return GST_FLOW_EOS;
    491             }
    492         }
    493     }
    494 
    495     if (priv->isFlushing) {
    496         GST_DEBUG_OBJECT(src, "Flushing");
    497         return GST_FLOW_FLUSHING;
    498     }
    499 
    500     if (priv->haveSize && !priv->isDurationSet) {
    501         GST_DEBUG_OBJECT(src, "Setting duration to %" G_GUINT64_FORMAT, priv->size);
    502         baseSrc->segment.duration = priv->size;
    503         priv->isDurationSet = true;
    504         gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_duration_changed(GST_OBJECT_CAST(src)));
    505     }
    506 
    507     if (priv->httpHeadersEvent)
    508         gst_pad_push_event(GST_BASE_SRC_PAD(baseSrc), priv->httpHeadersEvent.leakRef());
    509 
    510     {
    511         LockHolder adapterLocker(priv->adapterLock);
    512         queueSize = gst_adapter_available(priv->adapter.get());
     531    size_t queueSize = gst_adapter_available(members->adapter.get());
     532    GST_TRACE_OBJECT(src, "available bytes %" G_GSIZE_FORMAT ", block size %u", queueSize, size);
     533    if (!queueSize) {
     534        GST_TRACE_OBJECT(src, "let's wait for data or EOS");
     535        members->responseCondition.wait(members.mutex(), [&] {
     536            return members->isFlushing || gst_adapter_available(members->adapter.get()) || members->doesHaveEOS;
     537        });
     538        if (members->isFlushing)
     539            return GST_FLOW_FLUSHING;
     540
     541        queueSize = gst_adapter_available(members->adapter.get());
     542        GST_TRACE_OBJECT(src, "available %" G_GSIZE_FORMAT, queueSize);
     543    }
     544
     545    if (queueSize) {
    513546        if (queueSize < size) {
    514547            GST_TRACE_OBJECT(src, "Did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, queueSize);
     
    516549        } else
    517550            GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);
    518         if (size) {
    519             *buffer = gst_adapter_take_buffer(priv->adapter.get(), size);
    520             RELEASE_ASSERT(*buffer);
    521 
    522             GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position;
    523             GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET(*buffer) + size;
    524             GST_TRACE_OBJECT(src, "Buffer bounds set to %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, GST_BUFFER_OFFSET(*buffer), GST_BUFFER_OFFSET_END(*buffer));
    525             GST_TRACE_OBJECT(src, "doesHaveEOS: %s, wasSeeking: %s, seeking: %s, buffer size: %u, size: %" G_GUINT64_FORMAT, boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->wasSeeking), boolForPrinting(priv->isSeeking), size, priv->size);
    526             if (priv->haveSize && GST_BUFFER_OFFSET_END(*buffer) >= priv->size) {
    527                 if (priv->wasSeeking)
    528                     priv->wasSeeking = false;
    529                 else if (priv->isSeekable)
    530                     priv->doesHaveEOS = true;
    531             } else if (priv->wasSeeking)
    532                 priv->wasSeeking = false;
    533 
    534             restartLoaderIfNeeded(src);
    535         } else {
    536             GST_ERROR_OBJECT(src, "Empty adapter!");
    537             ASSERT_NOT_REACHED();
    538         }
    539     }
    540 
    541     return GST_FLOW_OK;
     551
     552        *buffer = gst_adapter_take_buffer(members->adapter.get(), size);
     553        RELEASE_ASSERT(*buffer);
     554
     555        GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position;
     556        GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET(*buffer) + size;
     557        GST_TRACE_OBJECT(src, "Buffer bounds set to %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, GST_BUFFER_OFFSET(*buffer), GST_BUFFER_OFFSET_END(*buffer));
     558        GST_TRACE_OBJECT(src, "buffer size: %u, total content size: %" G_GUINT64_FORMAT, size, members->size);
     559
     560        restartLoaderIfNeeded(src, members);
     561        return GST_FLOW_OK;
     562    }
     563
     564    // If the queue is empty reached this point, the only other option is that we are in EOS.
     565    ASSERT(members->doesHaveEOS);
     566    GST_DEBUG_OBJECT(src, "Reached the end of the response, signalling EOS");
     567    return GST_FLOW_EOS;
    542568}
    543569
     
    595621static gboolean webKitWebSrcStart(GstBaseSrc* baseSrc)
    596622{
    597     // This method should only be called by BaseSrc, do not call it
    598     // from ourselves unless you ensure the streaming lock is not
    599     // held. If it is, you will deadlock the WebProcess.
    600     return webKitWebSrcMakeRequest(baseSrc, true);
    601 }
    602 
    603 static gboolean webKitWebSrcMakeRequest(GstBaseSrc* baseSrc, bool notifyAsyncCompletion)
    604 {
    605623    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
     624
     625    if (src->priv->originalURI.isNull()) {
     626        GST_ERROR_OBJECT(src, "No URI provided");
     627        return FALSE;
     628    }
     629    return TRUE;
     630}
     631
     632static void webKitWebSrcMakeRequest(WebKitWebSrc* src, DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper& members)
     633{
    606634    WebKitWebSrcPrivate* priv = src->priv;
    607635
    608     if (webkitGstCheckVersion(1, 12, 0) && !priv->player) {
    609         GRefPtr<GstQuery> query = adoptGRef(gst_query_new_context(WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
    610         if (gst_pad_peer_query(GST_BASE_SRC_PAD(baseSrc), query.get())) {
    611             GstContext* context;
    612 
    613             gst_query_parse_context(query.get(), &context);
    614             gst_element_set_context(GST_ELEMENT_CAST(src), context);
    615         } else
    616             gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_need_context(GST_OBJECT_CAST(src), WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME));
    617     }
    618 
    619     RELEASE_ASSERT(priv->player);
    620 
    621     priv->wereHeadersReceived = false;
    622     priv->wasResponseReceived = false;
    623     priv->isDurationSet = false;
    624     priv->doesHaveEOS = false;
    625     priv->isFlushing = false;
    626     priv->downloadStartTime = WallTime::nan();
    627 
    628     priv->didPassAccessControlCheck = false;
    629 
    630     if (priv->originalURI.isNull()) {
    631         GST_ERROR_OBJECT(src, "No URI provided");
    632         webKitWebSrcStop(baseSrc);
    633         return FALSE;
    634     }
    635 
    636     if (priv->requestedPosition == priv->stopPosition) {
    637         GST_DEBUG_OBJECT(src, "Empty segment, signaling EOS");
    638         priv->doesHaveEOS = true;
    639         return FALSE;
    640     }
    641 
    642     GST_DEBUG_OBJECT(src, "Fetching %s", priv->originalURI.data());
     636    ASSERT(!priv->originalURI.isNull());
     637    ASSERT(members->requestedPosition != members->stopPosition);
     638
     639    GST_DEBUG_OBJECT(src, "Posting task to request R%u %s requestedPosition=%" G_GUINT64_FORMAT " stopPosition=%" G_GUINT64_FORMAT, members->requestNumber, priv->originalURI.data(), members->requestedPosition, members->stopPosition);
    643640    URL url = URL(URL(), priv->originalURI.data());
    644641
     
    647644    request.setFirstPartyForCookies(url);
    648645
    649     request.setHTTPReferrer(priv->player->referrer());
     646    request.setHTTPReferrer(members->player->referrer());
    650647
    651648    if (priv->httpMethod.get())
     
    671668        request.setHTTPUserAgent("Quicktime/7.6.6");
    672669
    673     if (priv->requestedPosition) {
    674         GUniquePtr<char> formatedRange(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", priv->requestedPosition));
     670    if (members->requestedPosition) {
     671        GUniquePtr<char> formatedRange(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", members->requestedPosition));
    675672        GST_DEBUG_OBJECT(src, "Range request: %s", formatedRange.get());
    676673        request.setHTTPHeaderField(HTTPHeaderName::Range, formatedRange.get());
    677674    }
    678     priv->readPosition = priv->requestedPosition;
     675    ASSERT(members->readPosition == members->requestedPosition);
    679676
    680677    GST_DEBUG_OBJECT(src, "Persistent connection support %s", priv->keepAlive ? "enabled" : "disabled");
     
    688685    request.setHTTPHeaderField(HTTPHeaderName::IcyMetadata, "1");
    689686
    690     GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
    691     priv->notifier->notify(MainThreadSourceNotification::Start, [protector, request = WTFMove(request), src, notifyAsyncCompletion] {
     687    ASSERT(!isMainThread());
     688    RunLoop::main().dispatch([protector = WTF::ensureGRef(src), request = WTFMove(request), requestNumber = members->requestNumber, src] {
    692689        WebKitWebSrcPrivate* priv = protector->priv;
    693         if (!priv->loader)
    694             priv->loader = priv->player->createResourceLoader();
     690        DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
     691        // Ignore this task (not making any HTTP request) if by now WebKitWebSrc streaming thread is already waiting
     692        // for a different request. There is no point anymore in sending this one.
     693        if (members->requestNumber != requestNumber) {
     694            GST_DEBUG_OBJECT(protector.get(), "Skipping R%u, current request number is %u", requestNumber, members->requestNumber);
     695            return;
     696        }
     697
     698        if (!members->loader)
     699            members->loader = members->player->createResourceLoader();
    695700
    696701        PlatformMediaResourceLoader::LoadOptions loadOptions = 0;
    697702        if (request.url().protocolIsBlob())
    698703            loadOptions |= PlatformMediaResourceLoader::LoadOption::BufferData;
    699         priv->resource = priv->loader->requestResource(ResourceRequest(request), loadOptions);
    700         if (priv->resource) {
    701             priv->resource->setClient(makeUnique<CachedResourceStreamingClient>(protector.get(), ResourceRequest(request)));
    702             GST_DEBUG_OBJECT(protector.get(), "Started request");
    703             if (notifyAsyncCompletion)
    704                 gst_base_src_start_complete(GST_BASE_SRC(src), GST_FLOW_OK);
     704        members->resource = members->loader->requestResource(ResourceRequest(request), loadOptions);
     705        if (members->resource) {
     706            members->resource->setClient(makeUnique<CachedResourceStreamingClient>(protector.get(), ResourceRequest(request), requestNumber));
     707            GST_DEBUG_OBJECT(protector.get(), "Started request R%u", requestNumber);
    705708        } else {
    706             GST_ERROR_OBJECT(protector.get(), "Failed to setup streaming client");
    707             if (notifyAsyncCompletion)
    708                 gst_base_src_start_complete(GST_BASE_SRC(src), GST_FLOW_ERROR);
    709             priv->loader = nullptr;
     709            GST_ERROR_OBJECT(protector.get(), "Failed to setup streaming client to handle R%u", requestNumber);
     710            members->loader = nullptr;
    710711        }
    711712    });
    712 
     713}
     714
     715static gboolean webKitWebSrcStop(GstBaseSrc* baseSrc)
     716{
     717    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
     718    // basesrc will always call unLock() and unLockStop() before calling this. See gst_base_src_stop().
     719    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     720    webkitWebSrcReset(src, members, ResetType::Hard);
     721    GST_DEBUG_OBJECT(src, "Stopped WebKitWebSrc");
    713722    return TRUE;
    714723}
    715724
    716 static void webKitWebSrcCloseSession(WebKitWebSrc* src)
    717 {
    718     WebKitWebSrcPrivate* priv = src->priv;
    719     GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
    720 
    721     priv->notifier->notify(MainThreadSourceNotification::Stop, [protector, keepAlive = priv->keepAlive] {
    722         WebKitWebSrcPrivate* priv = protector->priv;
    723 
    724         GST_DEBUG_OBJECT(protector.get(), "Stopping resource loader");
    725 
    726         if (priv->resource) {
    727             priv->resource->stop();
    728             priv->resource->setClient(nullptr);
    729             priv->resource = nullptr;
    730         }
    731 
    732         if (!keepAlive)
    733             priv->loader = nullptr;
    734     });
    735 
    736     GST_DEBUG_OBJECT(src, "Resource loader stopped");
    737 }
    738 
    739 static gboolean webKitWebSrcStop(GstBaseSrc* baseSrc)
     725static gboolean webKitWebSrcGetSize(GstBaseSrc* baseSrc, guint64* size)
    740726{
    741727    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
    742     WebKitWebSrcPrivate* priv = src->priv;
    743 
    744     if (priv->resource || (priv->loader && !priv->keepAlive))
    745         webKitWebSrcCloseSession(src);
    746 
    747     {
    748         LockHolder adapterLocker(priv->adapterLock);
    749         gst_adapter_clear(priv->adapter.get());
    750     }
    751 
    752     webkitWebSrcReset(src);
    753     GST_DEBUG_OBJECT(src, "Stopped request");
    754     return TRUE;
    755 }
    756 
    757 static gboolean webKitWebSrcGetSize(GstBaseSrc* baseSrc, guint64* size)
     728    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     729
     730    GST_DEBUG_OBJECT(src, "haveSize: %s, size: %" G_GUINT64_FORMAT, boolForPrinting(members->haveSize), members->size);
     731    if (members->haveSize) {
     732        *size = members->size;
     733        return TRUE;
     734    }
     735
     736    return FALSE;
     737}
     738
     739static gboolean webKitWebSrcIsSeekable(GstBaseSrc* baseSrc)
    758740{
    759741    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
    760     WebKitWebSrcPrivate* priv = src->priv;
    761 
    762     GST_DEBUG_OBJECT(src, "haveSize: %s, size: %" G_GUINT64_FORMAT, boolForPrinting(priv->haveSize), priv->size);
    763     if (priv->haveSize) {
    764         *size = priv->size;
    765         return TRUE;
    766     }
    767 
    768     return FALSE;
    769 }
    770 
    771 static gboolean webKitWebSrcIsSeekable(GstBaseSrc* baseSrc)
    772 {
     742    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     743    GST_DEBUG_OBJECT(src, "isSeekable: %s", boolForPrinting(members->isSeekable));
     744    return members->isSeekable;
     745}
     746
     747static gboolean webKitWebSrcDoSeek(GstBaseSrc* baseSrc, GstSegment* segment)
     748{
     749    // This function is mutually exclusive with create(). It's only called when we're transitioning to >=PAUSED and
     750    // between flushes. In any case, basesrc holds the STREAM_LOCK, so we know create() is not running.
     751    // Also, both webKitWebSrcUnLock() and webKitWebSrcUnLockStop() are guaranteed to be called *before* this function.
     752    // [See gst_base_src_perform_seek()].
     753    ASSERT(GST_ELEMENT(baseSrc)->current_state < GST_STATE_PAUSED || GST_PAD_IS_FLUSHING(baseSrc->srcpad));
     754
     755    // Except for the initial seek, this function is only called if isSeekable() returns true.
     756    ASSERT(GST_ELEMENT(baseSrc)->current_state < GST_STATE_PAUSED || webKitWebSrcIsSeekable(baseSrc));
     757
    773758    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
    774 
    775     GST_DEBUG_OBJECT(src, "isSeekable: %s", boolForPrinting(src->priv->isSeekable));
    776     return src->priv->isSeekable;
    777 }
    778 
    779 static gboolean webKitWebSrcDoSeek(GstBaseSrc* baseSrc, GstSegment* segment)
    780 {
    781     WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
    782     WebKitWebSrcPrivate* priv = src->priv;
    783     LockHolder locker(priv->responseLock);
    784 
    785     GST_DEBUG_OBJECT(src, "Seek segment: (%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT ")", segment->start, segment->stop);
    786     if (priv->readPosition == segment->start && priv->requestedPosition == priv->readPosition && priv->stopPosition == segment->stop) {
    787         GST_DEBUG_OBJECT(src, "Seek to current read/end position and no seek pending");
    788         return TRUE;
    789     }
    790 
    791     if (priv->wereHeadersReceived && !priv->isSeekable) {
    792         GST_WARNING_OBJECT(src, "Not seekable");
     759    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     760
     761    GST_DEBUG_OBJECT(src, "Seek segment: (%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT ") Position previous to seek: %" G_GUINT64_FORMAT, segment->start, segment->stop, members->readPosition);
     762
     763    // Before attempting to seek, basesrc will call isSeekable(). If isSeekable() is true, a flush will be made and
     764    // this function will be called. basesrc still gives us the chance here to return FALSE and cancel the seek.
     765    // We cannot afford to return FALSE in this function though unless we're going to fail on purpose, since at this
     766    // point we have already been flushed and cancelled the HTTP request that was feeding us data.
     767
     768    if (segment->rate < 0.0 || segment->format != GST_FORMAT_BYTES) {
     769        GST_ERROR_OBJECT(src, "Invalid seek segment");
    793770        return FALSE;
    794771    }
    795772
    796     if (segment->rate < 0.0 || segment->format != GST_FORMAT_BYTES) {
    797         GST_WARNING_OBJECT(src, "Invalid seek segment");
    798         return FALSE;
    799     }
    800 
    801     if (priv->haveSize && segment->start >= priv->size)
     773    if (members->haveSize && segment->start >= members->size)
    802774        GST_WARNING_OBJECT(src, "Potentially seeking behind end of file, might EOS immediately");
    803775
    804     priv->isSeeking = true;
    805     priv->requestedPosition = segment->start;
    806     priv->stopPosition = segment->stop;
    807     priv->adapterCondition.notifyOne();
     776    members->requestedPosition = members->readPosition = segment->start;
     777    members->stopPosition = segment->stop;
    808778    return TRUE;
    809779}
     
    817787    if (GST_QUERY_TYPE(query) == GST_QUERY_URI) {
    818788        gst_query_set_uri(query, priv->originalURI.data());
    819         if (!priv->redirectedURI.isNull())
    820             gst_query_set_uri_redirection(query, priv->redirectedURI.data());
     789        DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     790        if (!members->redirectedURI.isNull())
     791            gst_query_set_uri_redirection(query, members->redirectedURI.data());
    821792        result = TRUE;
    822793    }
     
    839810{
    840811    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
    841     LockHolder locker(src->priv->responseLock);
     812    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
    842813
    843814    GST_DEBUG_OBJECT(src, "Unlock");
    844     src->priv->isFlushing = true;
    845     src->priv->responseCondition.notifyOne();
    846     src->priv->adapterCondition.notifyOne();
     815    members->isFlushing = true;
     816
     817    // If we have a network resource request open, we ask the main thread to close it.
     818    if (members->resource) {
     819        GST_DEBUG_OBJECT(src, "Resource request R%u will be stopped", members->requestNumber);
     820        RunLoop::main().dispatch([protector = WTF::ensureGRef(src), resource = WTFMove(members->resource), requestNumber = members->requestNumber] {
     821            GST_DEBUG_OBJECT(protector.get(), "Stopping resource request R%u", requestNumber);
     822            resource->stop();
     823            resource->setClient(nullptr);
     824        });
     825    }
     826    ASSERT(!members->resource);
     827
     828    if (!src->priv->keepAlive)
     829        members->loader = nullptr;
     830
     831    // Ensure all network callbacks from the old request don't feed data to WebKitWebSrc anymore.
     832    members->requestNumber++;
     833
     834    // Wake up streaming thread.
     835    members->responseCondition.notifyOne();
     836
    847837    return TRUE;
    848838}
     
    851841{
    852842    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
    853     LockHolder locker(src->priv->responseLock);
     843    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
    854844    GST_DEBUG_OBJECT(src, "Unlock stop");
    855     src->priv->isFlushing = false;
     845    members->isFlushing = false;
     846    webkitWebSrcReset(src, members, ResetType::Soft);
    856847
    857848    return TRUE;
    858 }
    859 
    860 static GstStateChangeReturn webKitWebSrcChangeState(GstElement* element, GstStateChange transition)
    861 {
    862     WebKitWebSrc* src = WEBKIT_WEB_SRC(element);
    863 
    864 #if GST_CHECK_VERSION(1, 14, 0)
    865     GST_DEBUG_OBJECT(src, "%s", gst_state_change_get_name(transition));
    866 #endif
    867     switch (transition) {
    868     case GST_STATE_CHANGE_READY_TO_NULL:
    869         webKitWebSrcCloseSession(src);
    870         break;
    871     case GST_STATE_CHANGE_PAUSED_TO_READY: {
    872         LockHolder locker(src->priv->responseLock);
    873         GST_DEBUG_OBJECT(src, "PAUSED->READY cancelling network requests");
    874         src->priv->isFlushing = true;
    875         src->priv->responseCondition.notifyOne();
    876         src->priv->adapterCondition.notifyOne();
    877         break;
    878     }
    879     default:
    880         break;
    881     }
    882 
    883     return GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
    884849}
    885850
     
    939904    }
    940905
    941     priv->redirectedURI = CString();
    942906    priv->originalURI = CString();
    943907    if (!uri)
     
    973937{
    974938    ASSERT(player);
    975     src->priv->player = player;
     939    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     940    members->player = player;
    976941}
    977942
    978943bool webKitSrcPassedCORSAccessCheck(WebKitWebSrc* src)
    979944{
    980     return src->priv->didPassAccessControlCheck;
    981 }
    982 
    983 CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src, ResourceRequest&& request)
    984     : m_src(GST_ELEMENT(src))
     945    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     946    return members->didPassAccessControlCheck;
     947}
     948
     949CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src, ResourceRequest&& request, unsigned requestNumber)
     950    : m_requestNumber(requestNumber)
     951    , m_src(GST_ELEMENT(src))
    985952    , m_request(WTFMove(request))
    986953{
     
    991958void CachedResourceStreamingClient::checkUpdateBlocksize(unsigned bytesRead)
    992959{
     960    ASSERT(isMainThread());
    993961    WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
    994962    GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(src);
     
    1027995void CachedResourceStreamingClient::responseReceived(PlatformMediaResource&, const ResourceResponse& response, CompletionHandler<void(PolicyChecker::ShouldContinue)>&& completionHandler)
    1028996{
     997    ASSERT(isMainThread());
    1029998    WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
    1030999    WebKitWebSrcPrivate* priv = src->priv;
    1031     priv->didPassAccessControlCheck = priv->resource->didPassAccessControlCheck();
    1032 
    1033     GST_DEBUG_OBJECT(src, "Received response: %d", response.httpStatusCode());
    1034 
     1000    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
     1001    if (members->requestNumber != m_requestNumber) {
     1002        completionHandler(PolicyChecker::ShouldContinue::No);
     1003        return;
     1004    }
     1005
     1006    GST_DEBUG_OBJECT(src, "R%u: Received response: %d", m_requestNumber, response.httpStatusCode());
     1007
     1008    members->didPassAccessControlCheck = members->resource->didPassAccessControlCheck();
    10351009    m_origins.add(SecurityOrigin::create(response.url()));
    10361010
    10371011    auto responseURI = response.url().string().utf8();
    10381012    if (priv->originalURI != responseURI)
    1039         priv->redirectedURI = WTFMove(responseURI);
    1040 
    1041     uint64_t length = response.expectedContentLength();
    1042     if (length > 0 && priv->requestedPosition && response.httpStatusCode() == 206)
    1043         length += priv->requestedPosition;
    1044 
    1045     priv->httpHeaders.reset(gst_structure_new_empty("http-headers"));
    1046     gst_structure_set(priv->httpHeaders.get(), "uri", G_TYPE_STRING, priv->originalURI.data(),
     1013        members->redirectedURI = WTFMove(responseURI);
     1014
     1015    // length will be zero (unknown) if no Content-Length is provided or the response is compressed with Content-Encoding.
     1016    uint64_t length = !response.httpHeaderFields().contains(HTTPHeaderName::ContentEncoding) ? response.expectedContentLength() : 0;
     1017    if (length > 0 && members->requestedPosition && response.httpStatusCode() == 206)
     1018        length += members->requestedPosition;
     1019
     1020    GUniquePtr<GstStructure> httpHeaders(gst_structure_new_empty("http-headers"));
     1021
     1022    gst_structure_set(httpHeaders.get(), "uri", G_TYPE_STRING, priv->originalURI.data(),
    10471023        "http-status-code", G_TYPE_UINT, response.httpStatusCode(), nullptr);
    1048     if (!priv->redirectedURI.isNull())
    1049         gst_structure_set(priv->httpHeaders.get(), "redirection-uri", G_TYPE_STRING, priv->redirectedURI.data(), nullptr);
     1024    if (!members->redirectedURI.isNull())
     1025        gst_structure_set(httpHeaders.get(), "redirection-uri", G_TYPE_STRING, members->redirectedURI.data(), nullptr);
     1026
     1027    // Pack request headers in the http-headers structure.
    10501028    GUniquePtr<GstStructure> headers(gst_structure_new_empty("request-headers"));
    10511029    for (const auto& header : m_request.httpHeaderFields())
    10521030        gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
    1053     GST_DEBUG_OBJECT(src, "Request headers going downstream: %" GST_PTR_FORMAT, headers.get());
    1054     gst_structure_set(priv->httpHeaders.get(), "request-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
     1031    GST_DEBUG_OBJECT(src, "R%u: Request headers going downstream: %" GST_PTR_FORMAT, m_requestNumber, headers.get());
     1032    gst_structure_set(httpHeaders.get(), "request-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
     1033
     1034    // Pack response headers in the http-headers structure.
    10551035    headers.reset(gst_structure_new_empty("response-headers"));
    10561036    for (const auto& header : response.httpHeaderFields()) {
     
    10621042            gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
    10631043    }
    1064     auto contentLengthFieldName(httpHeaderNameString(HTTPHeaderName::ContentLength).toString());
    1065     if (!gst_structure_has_field(headers.get(), contentLengthFieldName.utf8().data()))
    1066         gst_structure_set(headers.get(), contentLengthFieldName.utf8().data(), G_TYPE_UINT64, static_cast<uint64_t>(length), nullptr);
    1067     gst_structure_set(priv->httpHeaders.get(), "response-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
    1068     GST_DEBUG_OBJECT(src, "Response headers going downstream: %" GST_PTR_FORMAT, headers.get());
    1069 
    1070     priv->httpHeadersEvent = adoptGRef(gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, gst_structure_copy(priv->httpHeaders.get())));
    1071 
    1072     auto scopeExit = makeScopeExit([&] {
    1073         GstStructure* structure = gst_structure_copy(src->priv->httpHeaders.get());
    1074         gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_element(GST_OBJECT_CAST(src), structure));
    1075     });
     1044    GST_DEBUG_OBJECT(src, "R%u: Response headers going downstream: %" GST_PTR_FORMAT, m_requestNumber, headers.get());
     1045    gst_structure_set(httpHeaders.get(), "response-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
     1046
     1047    members->pendingHttpHeadersMessage = adoptGRef(gst_message_new_element(GST_OBJECT_CAST(src), gst_structure_copy(httpHeaders.get())));
     1048    members->pendingHttpHeadersEvent = adoptGRef(gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, httpHeaders.release()));
    10761049
    10771050    if (response.httpStatusCode() >= 400) {
    1078         GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received %d HTTP error code", response.httpStatusCode()), (nullptr));
    1079         priv->doesHaveEOS = true;
    1080         webKitWebSrcStop(GST_BASE_SRC_CAST(src));
     1051        GST_ELEMENT_ERROR(src, RESOURCE, READ, ("R%u: Received %d HTTP error code", m_requestNumber, response.httpStatusCode()), (nullptr));
     1052        members->doesHaveEOS = true;
     1053        members->responseCondition.notifyOne();
    10811054        completionHandler(PolicyChecker::ShouldContinue::No);
    10821055        return;
    10831056    }
    10841057
    1085     if (priv->requestedPosition) {
     1058    if (members->requestedPosition) {
    10861059        // Seeking ... we expect a 206 == PARTIAL_CONTENT
    1087         if (response.httpStatusCode() == 200) {
    1088             // Range request didn't have a ranged response; resetting offset.
    1089             priv->readPosition = 0;
    1090         } else if (response.httpStatusCode() != 206) {
     1060        if (response.httpStatusCode() != 206) {
    10911061            // Range request completely failed.
    1092             GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received unexpected %d HTTP status code", response.httpStatusCode()), (nullptr));
    1093             priv->doesHaveEOS = true;
    1094             webKitWebSrcStop(GST_BASE_SRC_CAST(src));
     1062            GST_ELEMENT_ERROR(src, RESOURCE, READ, ("R%u: Received unexpected %d HTTP status code for range request", m_requestNumber, response.httpStatusCode()), (nullptr));
     1063            members->doesHaveEOS = true;
     1064            members->responseCondition.notifyOne();
    10951065            completionHandler(PolicyChecker::ShouldContinue::No);
    10961066            return;
    1097         } else {
    1098             GST_DEBUG_OBJECT(src, "Range request succeeded");
    1099             priv->isSeeking = false;
    1100             priv->wasSeeking = true;
    11011067        }
    1102     }
    1103 
    1104     priv->isSeekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
    1105 
    1106     GST_DEBUG_OBJECT(src, "Size: %" G_GUINT64_FORMAT ", isSeekable: %s", length, boolForPrinting(priv->isSeekable));
     1068        GST_DEBUG_OBJECT(src, "R%u: Range request succeeded", m_requestNumber);
     1069    }
     1070
     1071    members->isSeekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
     1072
     1073    GST_DEBUG_OBJECT(src, "R%u: Size: %" G_GUINT64_FORMAT ", isSeekable: %s", m_requestNumber, length, boolForPrinting(members->isSeekable));
    11071074    if (length > 0) {
    1108         if (!priv->haveSize || priv->size != length) {
    1109             priv->haveSize = true;
    1110             priv->size = length;
    1111             priv->isDurationSet = false;
     1075        if (!members->haveSize || members->size != length) {
     1076            members->haveSize = true;
     1077            members->size = length;
    11121078        }
    11131079    } else
    1114         priv->haveSize = false;
     1080        members->haveSize = false;
    11151081
    11161082    // Signal to downstream if this is an Icecast stream.
     
    11241090
    11251091            String contentType = response.httpHeaderField(HTTPHeaderName::ContentType);
    1126             GST_DEBUG_OBJECT(src, "Response ContentType: %s", contentType.utf8().data());
     1092            GST_DEBUG_OBJECT(src, "R%u: Response ContentType: %s", m_requestNumber, contentType.utf8().data());
    11271093            gst_caps_set_simple(caps.get(), "content-type", G_TYPE_STRING, contentType.utf8().data(), nullptr);
    11281094        }
    11291095    }
    1130 
    11311096    if (caps) {
    1132         GST_DEBUG_OBJECT(src, "Set caps to %" GST_PTR_FORMAT, caps.get());
    1133         gst_base_src_set_caps(GST_BASE_SRC_CAST(src), caps.get());
    1134     }
    1135 
    1136     {
    1137         LockHolder locker(priv->responseLock);
    1138         priv->wereHeadersReceived = true;
    1139         priv->headersCondition.notifyOne();
    1140     }
     1097        GST_DEBUG_OBJECT(src, "R%u: Set caps to %" GST_PTR_FORMAT, m_requestNumber, caps.get());
     1098        members->pendingCaps = WTFMove(caps);
     1099    }
     1100
     1101    members->wasResponseReceived = true;
     1102    members->responseCondition.notifyOne();
     1103
    11411104    completionHandler(PolicyChecker::ShouldContinue::Yes);
    11421105}
     
    11441107void CachedResourceStreamingClient::dataReceived(PlatformMediaResource&, const char* data, int length)
    11451108{
     1109    ASSERT(isMainThread());
    11461110    WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
    1147     GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(src);
    11481111    WebKitWebSrcPrivate* priv = src->priv;
    11491112
    1150     GST_LOG_OBJECT(src, "Have %d bytes of data", length);
    1151     LockHolder locker(priv->responseLock);
     1113    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(priv->dataMutex);
     1114    if (members->requestNumber != m_requestNumber)
     1115        return;
     1116
     1117    GST_LOG_OBJECT(src, "R%u: Have %d bytes of data", m_requestNumber, length);
     1118
    11521119    // Rough bandwidth calculation. We ignore here the first data package because we would have to reset the counters when we issue the request and
    11531120    // that first package delivery would include the time of sending out the request and getting the data back. Since we can't distinguish the
    11541121    // sending time from the receiving time, it is better to ignore it.
    1155     if (!std::isnan(priv->downloadStartTime)) {
    1156         priv->totalDownloadedBytes += length;
    1157         double timeSinceStart = (WallTime::now() - priv->downloadStartTime).seconds();
    1158         GST_TRACE_OBJECT(src, "downloaded %" G_GUINT64_FORMAT " bytes in %f seconds =~ %1.0f bytes/second", priv->totalDownloadedBytes, timeSinceStart
    1159             , timeSinceStart ? priv->totalDownloadedBytes / timeSinceStart : 0);
     1122    if (!std::isnan(members->downloadStartTime)) {
     1123        members->totalDownloadedBytes += length;
     1124        double timeSinceStart = (WallTime::now() - members->downloadStartTime).seconds();
     1125        GST_TRACE_OBJECT(src, "R%u: downloaded %" G_GUINT64_FORMAT " bytes in %f seconds =~ %1.0f bytes/second", m_requestNumber, members->totalDownloadedBytes, timeSinceStart
     1126            , timeSinceStart ? members->totalDownloadedBytes / timeSinceStart : 0);
    11601127    } else {
    1161         priv->downloadStartTime = WallTime::now();
    1162         priv->totalDownloadedBytes = 0;
    1163     }
    1164 
    1165     uint64_t newPosition = priv->readPosition + length;
    1166     if (LIKELY (priv->requestedPosition == priv->readPosition))
    1167         priv->requestedPosition = newPosition;
    1168     priv->readPosition = newPosition;
    1169 
    1170     uint64_t newSize = 0;
    1171     if (priv->haveSize && (newPosition > priv->size)) {
    1172         GST_DEBUG_OBJECT(src, "Got position previous estimated content size (%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT ")", newPosition, priv->size);
    1173         newSize = newPosition;
    1174     }
    1175 
    1176     if (newSize) {
    1177         priv->size = newSize;
    1178         baseSrc->segment.duration = priv->size;
    1179         gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_duration_changed(GST_OBJECT_CAST(src)));
    1180     }
     1128        members->downloadStartTime = WallTime::now();
     1129    }
     1130
     1131    members->readPosition += length;
     1132    ASSERT(!members->haveSize || members->readPosition <= members->size);
    11811133
    11821134    gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_element(GST_OBJECT_CAST(src),
    1183         gst_structure_new("webkit-network-statistics", "read-position", G_TYPE_UINT64, priv->readPosition, "size", G_TYPE_UINT64, priv->size, nullptr)));
     1135        gst_structure_new("webkit-network-statistics", "read-position", G_TYPE_UINT64, members->readPosition, "size", G_TYPE_UINT64, members->size, nullptr)));
    11841136
    11851137    checkUpdateBlocksize(length);
    11861138
    1187     if (!priv->wasResponseReceived)
    1188         priv->wasResponseReceived = true;
    1189     priv->responseCondition.notifyOne();
    1190 
    1191     {
    1192         LockHolder adapterLocker(priv->adapterLock);
    1193         GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length);
    1194         gst_adapter_push(priv->adapter.get(), buffer);
    1195         stopLoaderIfNeeded(src);
    1196         priv->adapterCondition.notifyOne();
    1197     }
     1139    GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length);
     1140    gst_adapter_push(members->adapter.get(), buffer);
     1141    stopLoaderIfNeeded(src, members);
     1142    members->responseCondition.notifyOne();
    11981143}
    11991144
    12001145void CachedResourceStreamingClient::accessControlCheckFailed(PlatformMediaResource&, const ResourceError& error)
    12011146{
     1147    ASSERT(isMainThread());
    12021148    WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
    1203     GST_ELEMENT_ERROR(src, RESOURCE, READ, ("%s", error.localizedDescription().utf8().data()), (nullptr));
    1204     src->priv->doesHaveEOS = true;
    1205     webKitWebSrcStop(GST_BASE_SRC_CAST(src));
     1149    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     1150    if (members->requestNumber != m_requestNumber)
     1151        return;
     1152
     1153    GST_ELEMENT_ERROR(src, RESOURCE, READ, ("R%u: %s", m_requestNumber, error.localizedDescription().utf8().data()), (nullptr));
     1154    members->doesHaveEOS = true;
     1155    members->responseCondition.notifyOne();
    12061156}
    12071157
    12081158void CachedResourceStreamingClient::loadFailed(PlatformMediaResource&, const ResourceError& error)
    12091159{
     1160    ASSERT(isMainThread());
    12101161    WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
     1162    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     1163    if (members->requestNumber != m_requestNumber)
     1164        return;
    12111165
    12121166    if (!error.isCancellation()) {
    1213         GST_ERROR_OBJECT(src, "Have failure: %s", error.localizedDescription().utf8().data());
    1214         GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (nullptr));
    1215     }
    1216 
    1217     src->priv->doesHaveEOS = true;
     1167        GST_ERROR_OBJECT(src, "R%u: Have failure: %s", m_requestNumber, error.localizedDescription().utf8().data());
     1168        GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("R%u: %s", m_requestNumber, error.localizedDescription().utf8().data()), (nullptr));
     1169    }
     1170
     1171    members->doesHaveEOS = true;
     1172    members->responseCondition.notifyOne();
    12181173}
    12191174
    12201175void CachedResourceStreamingClient::loadFinished(PlatformMediaResource&)
    12211176{
     1177    ASSERT(isMainThread());
    12221178    WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
    1223     WebKitWebSrcPrivate* priv = src->priv;
    1224 
    1225     if (priv->isSeeking && !priv->isFlushing)
    1226         priv->isSeeking = false;
     1179    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     1180    if (members->requestNumber != m_requestNumber)
     1181        return;
     1182
     1183    members->doesHaveEOS = true;
     1184    members->responseCondition.notifyOne();
    12271185}
    12281186
    12291187bool webKitSrcWouldTaintOrigin(WebKitWebSrc* src, const SecurityOrigin& origin)
    12301188{
    1231     WebKitWebSrcPrivate* priv = src->priv;
    1232 
    1233     auto* cachedResourceStreamingClient = reinterpret_cast<CachedResourceStreamingClient*>(priv->resource->client());
     1189    DataMutex<WebKitWebSrcPrivate::StreamingMembers>::LockedWrapper members(src->priv->dataMutex);
     1190
     1191    auto* cachedResourceStreamingClient = reinterpret_cast<CachedResourceStreamingClient*>(members->resource->client());
    12341192    for (auto& responseOrigin : cachedResourceStreamingClient->securityOrigins()) {
    12351193        if (!origin.canAccess(*responseOrigin))
  • trunk/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.h

    r252398 r260755  
    4040#define WEBKIT_WEB_SRC_PLAYER_CONTEXT_TYPE_NAME  "webkit.media-player"
    4141
    42 typedef struct _WebKitWebSrc        WebKitWebSrc;
    43 typedef struct _WebKitWebSrcClass   WebKitWebSrcClass;
    44 typedef struct _WebKitWebSrcPrivate WebKitWebSrcPrivate;
     42struct WebKitWebSrcPrivate;
    4543
    46 struct _WebKitWebSrc {
     44struct WebKitWebSrc {
    4745    GstPushSrc parent;
    4846
     
    5048};
    5149
    52 struct _WebKitWebSrcClass {
     50struct WebKitWebSrcClass {
    5351    GstPushSrcClass parentClass;
    5452};
Note: See TracChangeset for help on using the changeset viewer.