Changeset 254503 in webkit


Timestamp:
Jan 14, 2020 2:16:19 AM (4 years ago)
Author:
calvaris@igalia.com
Message:

[GStreamer] Rework WebKitWebSrc to improve robustness
https://bugs.webkit.org/show_bug.cgi?id=206003

Reviewed by Philippe Normand.

Reworked how the web source deals with data. It is now more eager
in pushing data downstream. We no longer use the GstAdapter methods
marked as fast because they were sometimes slower: we could end up
waiting for more "fast" buffers (those retrievable with the _fast
API) to become available even when the queue was not empty. The
data already queued can be retrieved with the non-_fast API.

The streaming thread now blocks when it has no data to push
downstream and restarts the download if needed.
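
The stop/restart behaviour amounts to a hysteresis between a high and a low watermark on the queued bytes. A minimal sketch follows, not the patch's code: the factor values, the `DownloadState` struct and both function names are hypothetical, since the real `HIGH_QUEUE_FACTOR_THRESHOLD` and `LOW_QUEUE_FACTOR_THRESHOLD` values are not shown in this changeset.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical factors for illustration only; the real threshold macros
// are defined outside the lines touched by this changeset.
constexpr double highQueueFactor = 0.66;
constexpr double lowQueueFactor = 0.2;

// Stand-in for the relevant fields of the element's private data.
struct DownloadState {
    uint64_t resourceSize { 0 };
    uint64_t queuedBytes { 0 };
    bool isDownloadSuspended { false };
};

// Suspend the download once the queue grows past the high watermark.
void suspendIfQueueIsFull(DownloadState& state)
{
    assert(state.resourceSize > 0); // The sketch assumes a known size.
    if (state.isDownloadSuspended)
        return;
    if (state.queuedBytes > state.resourceSize * highQueueFactor)
        state.isDownloadSuspended = true;
}

// Resume the download once the queue drains below the low watermark,
// which is expressed as a fraction of the high watermark.
void resumeIfQueueIsLow(DownloadState& state)
{
    assert(state.resourceSize > 0);
    if (!state.isDownloadSuspended)
        return;
    if (state.queuedBytes < state.resourceSize * highQueueFactor * lowQueueFactor)
        state.isDownloadSuspended = false;
}
```

Keeping the two watermarks apart avoids toggling the loader on every buffer when the queue hovers around a single threshold.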

This patch also fixes a possible race condition when a flush is
received while the streaming thread is waiting.
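
The idea behind the flush fix can be sketched with standard C++ primitives: the waiting side includes the flushing flag in its wake-up predicate, and the flushing side notifies the same condition the waiter sleeps on. This is an illustration only, assuming `std::mutex` and `std::condition_variable` in place of WTF's `Lock` and `Condition`; `AdapterState`, `WaitResult` and the three functions are hypothetical names, and setting the flag under the lock is a choice of this sketch.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

struct AdapterState {
    std::mutex lock;
    std::condition_variable condition;
    std::size_t availableBytes { 0 };
    bool isFlushing { false };
};

enum class WaitResult { HaveData, Flushing, TimedOut };

// Streaming-thread side: block until data arrives, a flush is requested,
// or the timeout expires.
WaitResult waitForData(AdapterState& state, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> locker(state.lock);
    state.condition.wait_for(locker, timeout, [&] {
        return state.isFlushing || state.availableBytes > 0;
    });
    if (state.isFlushing)
        return WaitResult::Flushing;
    return state.availableBytes ? WaitResult::HaveData : WaitResult::TimedOut;
}

// Flush side: set the flag, then wake the waiter so that it does not
// sleep through the flush.
void requestFlush(AdapterState& state)
{
    {
        std::lock_guard<std::mutex> locker(state.lock);
        state.isFlushing = true;
    }
    state.condition.notify_one();
}

// Producer side: account for newly received bytes and wake the waiter.
void pushData(AdapterState& state, std::size_t length)
{
    assert(length > 0); // An empty chunk would wake the waiter for nothing.
    {
        std::lock_guard<std::mutex> locker(state.lock);
        state.availableBytes += length;
    }
    state.condition.notify_one();
}
```

Without the notification on the flush path, the waiter would only notice the flush when its timeout expired.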

No new tests, just a rework.

  • platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp:

(WebCore::MediaPlayerPrivateGStreamer::updateStates): Added FALLTHROUGH.

  • platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp:

(restartLoaderIfNeeded):
(stopLoaderIfNeeded): Refactored.
(webKitWebSrcCreate): Avoid adapter methods marked as fast,
otherwise we might be waiting for data we already have. Streaming
thread is now going to lock waiting for data and is more eager in
pushing data downstream.
(webKitWebSrcStop): No more queueSize.
(webKitWebSrcDoSeek):
(webKitWebSrcUnLock):
(webKitWebSrcChangeState): Notify streaming thread.
(CachedResourceStreamingClient::checkUpdateBlocksize): Blocksize
adjustment improved. With the former values the blocksize grew too
fast and could not be reduced easily. I think it now adjusts more
quickly to the real network values.
(CachedResourceStreamingClient::dataReceived): Added rudimentary
bandwidth calculation and use stopLoaderIfNeeded.
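
To illustrate the checkUpdateBlocksize entry above, here is a minimal sketch of a counter-based blocksize adjustment. The six constants and the strict `>` comparison come from this changeset; the rest of the logic is an assumption, because the diff shows only the first lines of the function. The `BlocksizeTuner` class and its minimum blocksize are hypothetical.

```cpp
#include <algorithm>
#include <cassert>

// Values as set by this changeset.
constexpr int growBlocksizeLimit { 1 };
constexpr int growBlocksizeCount { 2 };
constexpr int growBlocksizeFactor { 2 };
constexpr float reduceBlocksizeLimit { 0.5f };
constexpr int reduceBlocksizeCount { 2 };
constexpr float reduceBlocksizeFactor { 0.5f };

// Hypothetical helper; the real logic lives in CachedResourceStreamingClient.
class BlocksizeTuner {
public:
    BlocksizeTuner(unsigned initialBlocksize, unsigned minimumBlocksize)
        : m_blocksize(initialBlocksize)
        , m_minimumBlocksize(minimumBlocksize)
    {
        assert(initialBlocksize >= minimumBlocksize);
    }

    unsigned blocksize() const { return m_blocksize; }

    void checkUpdateBlocksize(unsigned bytesRead)
    {
        if (bytesRead > m_blocksize * growBlocksizeLimit) {
            // Larger read than the blocksize: grow after enough reads in a row.
            m_reduceCount = 0;
            if (++m_increaseCount >= growBlocksizeCount) {
                m_blocksize *= growBlocksizeFactor;
                m_increaseCount = 0;
            }
        } else if (bytesRead < m_blocksize * reduceBlocksizeLimit) {
            // Much smaller read: shrink after enough reads in a row (assumed).
            m_increaseCount = 0;
            if (++m_reduceCount >= reduceBlocksizeCount) {
                unsigned reduced = static_cast<unsigned>(m_blocksize * reduceBlocksizeFactor);
                m_blocksize = std::max(reduced, m_minimumBlocksize);
                m_reduceCount = 0;
            }
        } else {
            // Read size close to the blocksize: start counting again (assumed).
            m_increaseCount = 0;
            m_reduceCount = 0;
        }
    }

private:
    unsigned m_blocksize;
    unsigned m_minimumBlocksize;
    int m_increaseCount { 0 };
    int m_reduceCount { 0 };
};
```

With these values, `BlocksizeTuner tuner(4096, 1024)` doubles to 8192 only after two consecutive reads larger than 4096 bytes, and halves again after two consecutive reads smaller than half the current blocksize. A read of exactly the blocksize no longer counts towards growth.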

Location:
trunk/Source/WebCore
Files:
3 edited

  • trunk/Source/WebCore/ChangeLog

    r254502 r254503  
@@ -1,2 +1,45 @@
+2020-01-14  Xabier Rodriguez Calvar  <calvaris@igalia.com>
+
+        [GStreamer] Rework WebKitWebSrc to improve robustness
+        https://bugs.webkit.org/show_bug.cgi?id=206003
+
+        Reviewed by Philippe Normand.
+
+        Reworked how the web source deals with data. It's more eager now
+        in pushing data downstream.  We don't use the GstAdapter methods
+        marked as fast anymore because sometimes it was slower. The reason
+        why this was slower is that we can be waiting for more "fast"
+        (that could be retrieved with the _fast API) buffers to be
+        available even in cases where the queue is not empty. These other
+        buffers can be retrieved with the "non _fast" API.
+
+        The streaming thread locks now when it has no data to push
+        downstream and restarts the download if needed.
+
+        In this patch we also fixed the possible race condition of
+        receiving a flush during the streaming thread wait.
+
+        No new tests, just a rework.
+
+        * platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp:
+        (WebCore::MediaPlayerPrivateGStreamer::updateStates): Added FALLTHROUGH.
+        * platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp:
+        (restartLoaderIfNeeded):
+        (stopLoaderIfNeeded): Refactored.
+        (webKitWebSrcCreate): Avoid adapter methods marked as fast,
+        otherwise we might be waiting for data we already have. Streaming
+        thread is now going to lock waiting for data and is more eager in
+        pushing data downstream.
+        (webKitWebSrcStop): No more queueSize.
+        (webKitWebSrcDoSeek):
+        (webKitWebSrcUnLock):
+        (webKitWebSrcChangeState): Notify streaming thread.
+        (CachedResourceStreamingClient::checkUpdateBlocksize): Blocksize
+        adjustment improved. With former values blocksize grew too fast
+        and couldn't be reduced so easily. I think now it adjusts more
+        quickly to the real network values.
+        (CachedResourceStreamingClient::dataReceived): Added rudimentary
+        bandwith calculation and use stopLoaderIfNeeded.
+
 2020-01-14  Tomoki Imai  <Tomoki.Imai@sony.com>
 
  • trunk/Source/WebCore/platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp

    r254214 r254503  
@@ -2575,4 +2575,5 @@
             break;
         case GST_STATE_PAUSED:
+            FALLTHROUGH;
         case GST_STATE_PLAYING:
             if (m_isBuffering) {
     
@@ -2615,5 +2616,5 @@
             m_isPaused = false;
 
-            if ((m_isBuffering && m_isLiveStream) || !m_playbackRate) {
+            if ((m_isBuffering && !m_isLiveStream) || !m_playbackRate) {
                 GST_DEBUG_OBJECT(pipeline(), "[Buffering] Pausing stream for buffering.");
                 changePipelineState(GST_STATE_PAUSED);
  • trunk/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp

    r253964 r254503  
@@ -74,7 +74,7 @@
 
     static constexpr int s_growBlocksizeLimit { 1 };
-    static constexpr int s_growBlocksizeCount { 1 };
+    static constexpr int s_growBlocksizeCount { 2 };
     static constexpr int s_growBlocksizeFactor { 2 };
-    static constexpr float s_reduceBlocksizeLimit { 0.20 };
+    static constexpr float s_reduceBlocksizeLimit { 0.5 };
     static constexpr int s_reduceBlocksizeCount { 2 };
     static constexpr float s_reduceBlocksizeFactor { 0.5 };
     
@@ -142,9 +142,10 @@
     Lock adapterLock;
     Condition adapterCondition;
-    uint64_t queueSize { 0 };
     bool isDownloadSuspended { false };
     GRefPtr<GstAdapter> adapter;
     GRefPtr<GstEvent> httpHeadersEvent;
     GUniquePtr<GstStructure> httpHeaders;
+    WallTime downloadStartTime { WallTime::nan() };
+    uint64_t totalDownloadedBytes { 0 };
 };
 
     
@@ -181,4 +182,6 @@
 static gboolean webKitWebSrcUnLockStop(GstBaseSrc*);
 static void webKitWebSrcSetContext(GstElement*, GstContext*);
+static void restartLoaderIfNeeded(WebKitWebSrc*);
+static void stopLoaderIfNeeded(WebKitWebSrc*);
 
 #define webkit_web_src_parent_class parent_class
     
@@ -346,4 +349,70 @@
 }
 
+static void restartLoaderIfNeeded(WebKitWebSrc* src)
+{
+    WebKitWebSrcPrivate* priv = src->priv;
+
+    if (!priv->isDownloadSuspended) {
+        GST_TRACE_OBJECT(src, "download already active");
+        return;
+    }
+
+    GST_TRACE_OBJECT(src, "is download suspended %s, does have EOS %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT
+        " (min %u)", boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->haveSize)
+        , boolForPrinting(priv->isSeekable), priv->size, SMALL_MEDIA_RESOURCE_MAX_SIZE);
+    if (priv->doesHaveEOS || !priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
+        GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
+        return;
+    }
+    GST_TRACE_OBJECT(src, "read position %" G_GUINT64_FORMAT ", state %s", priv->readPosition, gst_element_state_get_name(GST_STATE(src)));
+    if (!priv->readPosition || priv->readPosition == priv->size || GST_STATE(src) < GST_STATE_PAUSED) {
+        GST_TRACE_OBJECT(src, "can't restart download");
+        return;
+    }
+
+    size_t queueSize = gst_adapter_available(priv->adapter.get());
+    GST_TRACE_OBJECT(src, "queue size %" G_GUINT64_FORMAT " (min %1.0f)", queueSize
+        , priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD);
+
+    if (queueSize >= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD) {
+        GST_TRACE_OBJECT(src, "queue size above low watermark, not restarting download");
+        return;
+    }
+
+    GST_DEBUG_OBJECT(src, "restarting download");
+    priv->isDownloadSuspended = false;
+    webKitWebSrcMakeRequest(GST_BASE_SRC_CAST(src), false);
+}
+
+
+static void stopLoaderIfNeeded(WebKitWebSrc* src)
+{
+    WebKitWebSrcPrivate* priv = src->priv;
+
+    if (priv->isDownloadSuspended) {
+        GST_TRACE_OBJECT(src, "download already suspended");
+        return;
+    }
+
+    GST_TRACE_OBJECT(src, "is download suspended %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT " (min %u)"
+        , boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->haveSize), boolForPrinting(priv->isSeekable), priv->size
+        , SMALL_MEDIA_RESOURCE_MAX_SIZE);
+    if (!priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
+        GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
+        return;
+    }
+
+    size_t queueSize = gst_adapter_available(priv->adapter.get());
+    GST_TRACE_OBJECT(src, "queue size %" G_GUINT64_FORMAT " (max %1.0f)", queueSize, priv->size * HIGH_QUEUE_FACTOR_THRESHOLD);
+    if (queueSize <= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD) {
+        GST_TRACE_OBJECT(src, "queue size under high watermark, not stopping download");
+        return;
+    }
+
+    GST_DEBUG_OBJECT(src, "stopping download");
+    priv->isDownloadSuspended = true;
+    priv->resource->stop();
+}
+
 static GstFlowReturn webKitWebSrcCreate(GstPushSrc* pushSrc, GstBuffer** buffer)
 {
     
@@ -358,7 +427,5 @@
             LockHolder adapterLocker(priv->adapterLock);
             GST_DEBUG_OBJECT(src, "Seeking, flushing adapter");
-            // Discard all the buffers coming before the requested seek position.
-            gst_adapter_flush(priv->adapter.get(), priv->queueSize);
-            priv->queueSize = 0;
+            gst_adapter_clear(priv->adapter.get());
         }
         uint64_t requestedPosition = priv->requestedPosition;
     
@@ -380,41 +447,53 @@
     }
 
-    GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, queueSize: %" G_GUINT64_FORMAT ", isDownloadSuspended: %s",
-        boolForPrinting(priv->isFlushing), boolForPrinting(priv->doesHaveEOS), priv->queueSize,
-        boolForPrinting(priv->isDownloadSuspended));
+    // We don't use the GstAdapter methods marked as fast anymore because sometimes it was slower. The reason why this was slower is that we can be
+    // waiting for more "fast" (that could be retrieved with the _fast API) buffers to be available even in cases where the queue is not empty. These
+    // other buffers can be retrieved with the "non _fast" API.
+    GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, isDownloadSuspended: %s", boolForPrinting(priv->isFlushing)
+        , boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->isDownloadSuspended));
+
+    if (priv->doesHaveEOS) {
+        GST_DEBUG_OBJECT(src, "EOS");
+        return GST_FLOW_EOS;
+    }
+
+    unsigned size = gst_base_src_get_blocksize(baseSrc);
+    size_t queueSize;
+    {
+        LockHolder adapterLocker(priv->adapterLock);
+        unsigned retries = 0;
+        queueSize = gst_adapter_available(priv->adapter.get());
+        GST_TRACE_OBJECT(src, "available bytes %" G_GSIZE_FORMAT ", block size %u", queueSize, size);
+        while (!queueSize && !priv->isFlushing) {
+            GST_TRACE_OBJECT(src, "let's try to restart the download if possible and wait a bit if no data");
+            priv->adapterCondition.waitFor(priv->adapterLock, 1_s, [&] {
+                restartLoaderIfNeeded(src);
+                return priv->isFlushing || (!priv->isDownloadSuspended && gst_adapter_available(priv->adapter.get()));
+            });
+            queueSize = gst_adapter_available(priv->adapter.get());
+            GST_TRACE_OBJECT(src, "available %" G_GSIZE_FORMAT, queueSize);
+            if (queueSize || priv->isFlushing) {
+                // We have data or we're flushing. We can break the loop here.
+                break;
+            }
+
+            // We should keep waiting but we could be in EOS. Let's check the two possibilities:
+            // 1. We are at the end of the file with a known size.
+            // 2. The download is not suspended and no more data are arriving. We cannot wait forever, 10x1s seems safe and sensible.
+            if (priv->haveSize && priv->readPosition >= priv->size) {
+                GST_DEBUG_OBJECT(src, "Waiting for data beyond the end, signalling EOS");
+                return GST_FLOW_EOS;
+            }
+            GST_TRACE_OBJECT(src, "is download suspended? %s, num retries %u", boolForPrinting(priv->isDownloadSuspended), retries + 1);
+            if (!priv->isDownloadSuspended && ++retries >= 10) {
+                GST_DEBUG_OBJECT(src, "Adapter still empty after 10s of waiting, assuming EOS");
+                return GST_FLOW_EOS;
+            }
+        }
+    }
 
     if (priv->isFlushing) {
         GST_DEBUG_OBJECT(src, "Flushing");
         return GST_FLOW_FLUSHING;
-    }
-
-    if (priv->doesHaveEOS) {
-        GST_DEBUG_OBJECT(src, "EOS");
-        return GST_FLOW_EOS;
-    }
-
-    unsigned size = gst_base_src_get_blocksize(baseSrc);
-    bool isAdapterDrained = false;
-    {
-        LockHolder adapterLocker(priv->adapterLock);
-        unsigned retries = 0;
-        size_t available = gst_adapter_available_fast(priv->adapter.get());
-        while (available < size && !isAdapterDrained) {
-            priv->adapterCondition.waitFor(priv->adapterLock, 100_ms, [&] {
-                return gst_adapter_available_fast(priv->adapter.get()) >= size;
-            });
-            retries++;
-            available = gst_adapter_available_fast(priv->adapter.get());
-            if (available && available < size) {
-                GST_TRACE_OBJECT(src, "did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, available);
-                size = available;
-            } else if (retries > 3)
-                isAdapterDrained = true;
-        }
-    }
-
-    if (isAdapterDrained) {
-        GST_DEBUG_OBJECT(src, "Adapter still empty after 400 milli-seconds of waiting, assuming EOS");
-        return GST_FLOW_EOS;
     }
 
     
@@ -430,11 +509,14 @@
 
     {
-        GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);
         LockHolder adapterLocker(priv->adapterLock);
+        queueSize = gst_adapter_available(priv->adapter.get());
+        if (queueSize < size) {
+            GST_TRACE_OBJECT(src, "Did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, queueSize);
+            size = queueSize;
+        } else
+            GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);
         if (size) {
-            *buffer = gst_adapter_take_buffer_fast(priv->adapter.get(), size);
+            *buffer = gst_adapter_take_buffer(priv->adapter.get(), size);
             RELEASE_ASSERT(*buffer);
-
-            priv->queueSize -= size;
 
             GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position;
     
@@ -450,16 +532,9 @@
                 priv->wasSeeking = false;
 
-            if (!priv->doesHaveEOS && priv->haveSize && priv->isSeekable
-                && (priv->size > SMALL_MEDIA_RESOURCE_MAX_SIZE) && priv->readPosition
-                && (priv->readPosition != priv->size)
-                && (priv->queueSize < (priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD))
-                && GST_STATE(src) >= GST_STATE_PAUSED && priv->isDownloadSuspended) {
-                GST_DEBUG_OBJECT(src, "[Buffering] Adapter running out of data, restarting download");
-                priv->isDownloadSuspended = false;
-                webKitWebSrcMakeRequest(baseSrc, false);
-            }
-
-        } else
-            GST_ERROR_OBJECT(src, "Empty adapter?");
+            restartLoaderIfNeeded(src);
+        } else {
+            GST_ERROR_OBJECT(src, "Empty adapter!");
+            ASSERT_NOT_REACHED();
+        }
     }
 
     
@@ -549,4 +624,5 @@
     priv->doesHaveEOS = false;
     priv->isFlushing = false;
+    priv->downloadStartTime = WallTime::nan();
 
     priv->didPassAccessControlCheck = false;
     
@@ -672,5 +748,4 @@
         LockHolder adapterLocker(priv->adapterLock);
         gst_adapter_clear(priv->adapter.get());
-        priv->queueSize = 0;
     }
 
     
@@ -730,4 +805,5 @@
     priv->requestedPosition = segment->start;
     priv->stopPosition = segment->stop;
+    priv->adapterCondition.notifyOne();
     return TRUE;
 }
     
@@ -768,4 +844,5 @@
     src->priv->isFlushing = true;
     src->priv->responseCondition.notifyOne();
+    src->priv->adapterCondition.notifyOne();
     return TRUE;
 }
     
@@ -797,6 +874,8 @@
         src->priv->isFlushing = true;
         src->priv->responseCondition.notifyOne();
-        break;
-    } default:
+        src->priv->adapterCondition.notifyOne();
+        break;
+    }
+    default:
         break;
     }
     
@@ -919,5 +998,5 @@
     GST_LOG_OBJECT(src, "Checking to update blocksize. Read: %u, current blocksize: %u", bytesRead, blocksize);
 
-    if (bytesRead >= blocksize * s_growBlocksizeLimit) {
+    if (bytesRead > blocksize * s_growBlocksizeLimit) {
         m_reduceBlocksizeCount = 0;
         m_increaseBlocksizeCount++;
     
@@ -1071,4 +1150,16 @@
     GST_LOG_OBJECT(src, "Have %d bytes of data", length);
     LockHolder locker(priv->responseLock);
+    // Rough bandwidth calculation. We ignore here the first data package because we would have to reset the counters when we issue the request and
+    // that first package delivery would include the time of sending out the request and getting the data back. Since we can't distinguish the
+    // sending time from the receiving time, it is better to ignore it.
+    if (!isnan(priv->downloadStartTime)) {
+        priv->totalDownloadedBytes += length;
+        double timeSinceStart = (WallTime::now() - priv->downloadStartTime).seconds();
+        GST_TRACE_OBJECT(src, "downloaded %u bytes in %f seconds =~ %1.0f bytes/second", priv->totalDownloadedBytes, timeSinceStart
+            , timeSinceStart ? priv->totalDownloadedBytes / timeSinceStart : 0);
+    } else {
+        priv->downloadStartTime = WallTime::now();
+        priv->totalDownloadedBytes = 0;
+    }
 
     uint64_t newPosition = priv->readPosition + length;
     
@@ -1105,16 +1196,6 @@
         LockHolder adapterLocker(priv->adapterLock);
         GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length);
-        priv->queueSize += length;
         gst_adapter_push(priv->adapter.get(), buffer);
-        GST_TRACE_OBJECT(src, "[Buffering] isDownloadSuspended: %s", boolForPrinting(priv->isDownloadSuspended));
-        if (priv->haveSize && (priv->size > SMALL_MEDIA_RESOURCE_MAX_SIZE) && (priv->queueSize > (priv->size * HIGH_QUEUE_FACTOR_THRESHOLD))
-            && !priv->isDownloadSuspended && priv->isSeekable) {
-            GST_TRACE_OBJECT(src, "[Buffering] queueSize: %" G_GUINT64_FORMAT ", threshold: %f", priv->queueSize,
-                priv->size * HIGH_QUEUE_FACTOR_THRESHOLD);
-            GST_DEBUG_OBJECT(src, "[Buffering] Stopping resource loader");
-            priv->isDownloadSuspended = true;
-            priv->resource->stop();
-            return;
-        }
+        stopLoaderIfNeeded(src);
         priv->adapterCondition.notifyOne();
     }
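
The rough bandwidth calculation added to dataReceived can be tried out in isolation. The sketch below is an assumption-laden stand-in rather than the patch's code: timestamps are passed in as plain seconds instead of being read from `WallTime`, and the `BandwidthEstimator` struct is hypothetical. As in the patch, the first chunk only starts the clock and is not counted.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <limits>

// Hypothetical helper; timestamps are plain seconds supplied by the caller.
struct BandwidthEstimator {
    // NaN means that no chunk has been seen since the last reset.
    double downloadStartTime { std::numeric_limits<double>::quiet_NaN() };
    uint64_t totalDownloadedBytes { 0 };

    // Returns the average rate in bytes/second, or 0 for the first chunk.
    double dataReceived(double now, uint64_t length)
    {
        if (std::isnan(downloadStartTime)) {
            // The first chunk includes the request round trip, so it is ignored.
            downloadStartTime = now;
            totalDownloadedBytes = 0;
            return 0;
        }
        assert(now >= downloadStartTime); // Timestamps must not go backwards.
        totalDownloadedBytes += length;
        double elapsed = now - downloadStartTime;
        return elapsed > 0 ? totalDownloadedBytes / elapsed : 0;
    }

    // Forget the current measurement, for example when a new request starts.
    void reset() { downloadStartTime = std::numeric_limits<double>::quiet_NaN(); }
};
```

For example, a first chunk at t = 10 s followed by 4000 bytes at t = 12 s yields 2000 bytes/second.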