Changeset 254503 in webkit
- Timestamp:
- Jan 14, 2020 2:16:19 AM (4 years ago)
- Location:
- trunk/Source/WebCore
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Source/WebCore/ChangeLog
r254502 r254503 1 2020-01-14 Xabier Rodriguez Calvar <calvaris@igalia.com> 2 3 [GStreamer] Rework WebKitWebSrc to improve robustness 4 https://bugs.webkit.org/show_bug.cgi?id=206003 5 6 Reviewed by Philippe Normand. 7 8 Reworked how the web source deals with data. It's more eager now 9 in pushing data downstream. We don't use the GstAdapter methods 10 marked as fast anymore because sometimes it was slower. The reason 11 why this was slower is that we can be waiting for more "fast" 12 (that could be retrieved with the _fast API) buffers to be 13 available even in cases where the queue is not empty. These other 14 buffers can be retrieved with the "non _fast" API. 15 16 The streaming thread locks now when it has no data to push 17 downstream and restarts the download if needed. 18 19 In this patch we also fixed the possible race condition of 20 receiving a flush during the streaming thread wait. 21 22 No new tests, just a rework. 23 24 * platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp: 25 (WebCore::MediaPlayerPrivateGStreamer::updateStates): Added FALLTHROUGH. 26 * platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp: 27 (restartLoaderIfNeeded): 28 (stopLoaderIfNeeded): Refactored. 29 (webKitWebSrcCreate): Avoid adapter methods marked as fast, 30 otherwise we might be waiting for data we already have. Streaming 31 thread is now going to lock waiting for data and is more eager in 32 pushing data downstream. 33 (webKitWebSrcStop): No more queueSize. 34 (webKitWebSrcDoSeek): 35 (webKitWebSrcUnLock): 36 (webKitWebSrcChangeState): Notify streaming thread. 37 (CachedResourceStreamingClient::checkUpdateBlocksize): Blocksize 38 adjustment improved. With former values blocksize grew too fast 39 and couldn't be reduced so easily. I think now it adjusts more 40 quickly to the real network values. 41 (CachedResourceStreamingClient::dataReceived): Added rudimentary 42 bandwidth calculation and use stopLoaderIfNeeded. 
43 1 44 2020-01-14 Tomoki Imai <Tomoki.Imai@sony.com> 2 45 -
trunk/Source/WebCore/platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp
r254214 r254503 2575 2575 break; 2576 2576 case GST_STATE_PAUSED: 2577 FALLTHROUGH; 2577 2578 case GST_STATE_PLAYING: 2578 2579 if (m_isBuffering) { … … 2615 2616 m_isPaused = false; 2616 2617 2617 if ((m_isBuffering && m_isLiveStream) || !m_playbackRate) {2618 if ((m_isBuffering && !m_isLiveStream) || !m_playbackRate) { 2618 2619 GST_DEBUG_OBJECT(pipeline(), "[Buffering] Pausing stream for buffering."); 2619 2620 changePipelineState(GST_STATE_PAUSED); -
trunk/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp
r253964 r254503 74 74 75 75 static constexpr int s_growBlocksizeLimit { 1 }; 76 static constexpr int s_growBlocksizeCount { 1};76 static constexpr int s_growBlocksizeCount { 2 }; 77 77 static constexpr int s_growBlocksizeFactor { 2 }; 78 static constexpr float s_reduceBlocksizeLimit { 0. 20};78 static constexpr float s_reduceBlocksizeLimit { 0.5 }; 79 79 static constexpr int s_reduceBlocksizeCount { 2 }; 80 80 static constexpr float s_reduceBlocksizeFactor { 0.5 }; … … 142 142 Lock adapterLock; 143 143 Condition adapterCondition; 144 uint64_t queueSize { 0 };145 144 bool isDownloadSuspended { false }; 146 145 GRefPtr<GstAdapter> adapter; 147 146 GRefPtr<GstEvent> httpHeadersEvent; 148 147 GUniquePtr<GstStructure> httpHeaders; 148 WallTime downloadStartTime { WallTime::nan() }; 149 uint64_t totalDownloadedBytes { 0 }; 149 150 }; 150 151 … … 181 182 static gboolean webKitWebSrcUnLockStop(GstBaseSrc*); 182 183 static void webKitWebSrcSetContext(GstElement*, GstContext*); 184 static void restartLoaderIfNeeded(WebKitWebSrc*); 185 static void stopLoaderIfNeeded(WebKitWebSrc*); 183 186 184 187 #define webkit_web_src_parent_class parent_class … … 346 349 } 347 350 351 static void restartLoaderIfNeeded(WebKitWebSrc* src) 352 { 353 WebKitWebSrcPrivate* priv = src->priv; 354 355 if (!priv->isDownloadSuspended) { 356 GST_TRACE_OBJECT(src, "download already active"); 357 return; 358 } 359 360 GST_TRACE_OBJECT(src, "is download suspended %s, does have EOS %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT 361 " (min %u)", boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->haveSize) 362 , boolForPrinting(priv->isSeekable), priv->size, SMALL_MEDIA_RESOURCE_MAX_SIZE); 363 if (priv->doesHaveEOS || !priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) { 364 GST_TRACE_OBJECT(src, "download cannot be stopped/restarted"); 365 return; 366 } 367 GST_TRACE_OBJECT(src, "read position %" 
G_GUINT64_FORMAT ", state %s", priv->readPosition, gst_element_state_get_name(GST_STATE(src))); 368 if (!priv->readPosition || priv->readPosition == priv->size || GST_STATE(src) < GST_STATE_PAUSED) { 369 GST_TRACE_OBJECT(src, "can't restart download"); 370 return; 371 } 372 373 size_t queueSize = gst_adapter_available(priv->adapter.get()); 374 GST_TRACE_OBJECT(src, "queue size %" G_GUINT64_FORMAT " (min %1.0f)", queueSize 375 , priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD); 376 377 if (queueSize >= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD) { 378 GST_TRACE_OBJECT(src, "queue size above low watermark, not restarting download"); 379 return; 380 } 381 382 GST_DEBUG_OBJECT(src, "restarting download"); 383 priv->isDownloadSuspended = false; 384 webKitWebSrcMakeRequest(GST_BASE_SRC_CAST(src), false); 385 } 386 387 388 static void stopLoaderIfNeeded(WebKitWebSrc* src) 389 { 390 WebKitWebSrcPrivate* priv = src->priv; 391 392 if (priv->isDownloadSuspended) { 393 GST_TRACE_OBJECT(src, "download already suspended"); 394 return; 395 } 396 397 GST_TRACE_OBJECT(src, "is download suspended %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT " (min %u)" 398 , boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->haveSize), boolForPrinting(priv->isSeekable), priv->size 399 , SMALL_MEDIA_RESOURCE_MAX_SIZE); 400 if (!priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) { 401 GST_TRACE_OBJECT(src, "download cannot be stopped/restarted"); 402 return; 403 } 404 405 size_t queueSize = gst_adapter_available(priv->adapter.get()); 406 GST_TRACE_OBJECT(src, "queue size %" G_GUINT64_FORMAT " (max %1.0f)", queueSize, priv->size * HIGH_QUEUE_FACTOR_THRESHOLD); 407 if (queueSize <= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD) { 408 GST_TRACE_OBJECT(src, "queue size under high watermark, not stopping download"); 409 return; 410 } 411 412 GST_DEBUG_OBJECT(src, "stopping download"); 413 
priv->isDownloadSuspended = true; 414 priv->resource->stop(); 415 } 416 348 417 static GstFlowReturn webKitWebSrcCreate(GstPushSrc* pushSrc, GstBuffer** buffer) 349 418 { … … 358 427 LockHolder adapterLocker(priv->adapterLock); 359 428 GST_DEBUG_OBJECT(src, "Seeking, flushing adapter"); 360 // Discard all the buffers coming before the requested seek position. 361 gst_adapter_flush(priv->adapter.get(), priv->queueSize); 362 priv->queueSize = 0; 429 gst_adapter_clear(priv->adapter.get()); 363 430 } 364 431 uint64_t requestedPosition = priv->requestedPosition; … … 380 447 } 381 448 382 GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, queueSize: %" G_GUINT64_FORMAT ", isDownloadSuspended: %s", 383 boolForPrinting(priv->isFlushing), boolForPrinting(priv->doesHaveEOS), priv->queueSize, 384 boolForPrinting(priv->isDownloadSuspended)); 449 // We don't use the GstAdapter methods marked as fast anymore because sometimes it was slower. The reason why this was slower is that we can be 450 // waiting for more "fast" (that could be retrieved with the _fast API) buffers to be available even in cases where the queue is not empty. These 451 // other buffers can be retrieved with the "non _fast" API. 
452 GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, isDownloadSuspended: %s", boolForPrinting(priv->isFlushing) 453 , boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->isDownloadSuspended)); 454 455 if (priv->doesHaveEOS) { 456 GST_DEBUG_OBJECT(src, "EOS"); 457 return GST_FLOW_EOS; 458 } 459 460 unsigned size = gst_base_src_get_blocksize(baseSrc); 461 size_t queueSize; 462 { 463 LockHolder adapterLocker(priv->adapterLock); 464 unsigned retries = 0; 465 queueSize = gst_adapter_available(priv->adapter.get()); 466 GST_TRACE_OBJECT(src, "available bytes %" G_GSIZE_FORMAT ", block size %u", queueSize, size); 467 while (!queueSize && !priv->isFlushing) { 468 GST_TRACE_OBJECT(src, "let's try to restart the download if possible and wait a bit if no data"); 469 priv->adapterCondition.waitFor(priv->adapterLock, 1_s, [&] { 470 restartLoaderIfNeeded(src); 471 return priv->isFlushing || (!priv->isDownloadSuspended && gst_adapter_available(priv->adapter.get())); 472 }); 473 queueSize = gst_adapter_available(priv->adapter.get()); 474 GST_TRACE_OBJECT(src, "available %" G_GSIZE_FORMAT, queueSize); 475 if (queueSize || priv->isFlushing) { 476 // We have data or we're flushing. We can break the loop here. 477 break; 478 } 479 480 // We should keep waiting but we could be in EOS. Let's check the two possibilities: 481 // 1. We are at the end of the file with a known size. 482 // 2. The download is not suspended and no more data are arriving. We cannot wait forever, 10x1s seems safe and sensible. 483 if (priv->haveSize && priv->readPosition >= priv->size) { 484 GST_DEBUG_OBJECT(src, "Waiting for data beyond the end, signalling EOS"); 485 return GST_FLOW_EOS; 486 } 487 GST_TRACE_OBJECT(src, "is download suspended? 
%s, num retries %u", boolForPrinting(priv->isDownloadSuspended), retries + 1); 488 if (!priv->isDownloadSuspended && ++retries >= 10) { 489 GST_DEBUG_OBJECT(src, "Adapter still empty after 10s of waiting, assuming EOS"); 490 return GST_FLOW_EOS; 491 } 492 } 493 } 385 494 386 495 if (priv->isFlushing) { 387 496 GST_DEBUG_OBJECT(src, "Flushing"); 388 497 return GST_FLOW_FLUSHING; 389 }390 391 if (priv->doesHaveEOS) {392 GST_DEBUG_OBJECT(src, "EOS");393 return GST_FLOW_EOS;394 }395 396 unsigned size = gst_base_src_get_blocksize(baseSrc);397 bool isAdapterDrained = false;398 {399 LockHolder adapterLocker(priv->adapterLock);400 unsigned retries = 0;401 size_t available = gst_adapter_available_fast(priv->adapter.get());402 while (available < size && !isAdapterDrained) {403 priv->adapterCondition.waitFor(priv->adapterLock, 100_ms, [&] {404 return gst_adapter_available_fast(priv->adapter.get()) >= size;405 });406 retries++;407 available = gst_adapter_available_fast(priv->adapter.get());408 if (available && available < size) {409 GST_TRACE_OBJECT(src, "did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, available);410 size = available;411 } else if (retries > 3)412 isAdapterDrained = true;413 }414 }415 416 if (isAdapterDrained) {417 GST_DEBUG_OBJECT(src, "Adapter still empty after 400 milli-seconds of waiting, assuming EOS");418 return GST_FLOW_EOS;419 498 } 420 499 … … 430 509 431 510 { 432 GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);433 511 LockHolder adapterLocker(priv->adapterLock); 512 queueSize = gst_adapter_available(priv->adapter.get()); 513 if (queueSize < size) { 514 GST_TRACE_OBJECT(src, "Did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, queueSize); 515 size = queueSize; 516 } else 517 GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size); 434 518 if (size) { 435 *buffer = gst_adapter_take_buffer _fast(priv->adapter.get(), size);519 *buffer = 
gst_adapter_take_buffer(priv->adapter.get(), size); 436 520 RELEASE_ASSERT(*buffer); 437 438 priv->queueSize -= size;439 521 440 522 GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position; … … 450 532 priv->wasSeeking = false; 451 533 452 if (!priv->doesHaveEOS && priv->haveSize && priv->isSeekable 453 && (priv->size > SMALL_MEDIA_RESOURCE_MAX_SIZE) && priv->readPosition 454 && (priv->readPosition != priv->size) 455 && (priv->queueSize < (priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD)) 456 && GST_STATE(src) >= GST_STATE_PAUSED && priv->isDownloadSuspended) { 457 GST_DEBUG_OBJECT(src, "[Buffering] Adapter running out of data, restarting download"); 458 priv->isDownloadSuspended = false; 459 webKitWebSrcMakeRequest(baseSrc, false); 460 } 461 462 } else 463 GST_ERROR_OBJECT(src, "Empty adapter?"); 534 restartLoaderIfNeeded(src); 535 } else { 536 GST_ERROR_OBJECT(src, "Empty adapter!"); 537 ASSERT_NOT_REACHED(); 538 } 464 539 } 465 540 … … 549 624 priv->doesHaveEOS = false; 550 625 priv->isFlushing = false; 626 priv->downloadStartTime = WallTime::nan(); 551 627 552 628 priv->didPassAccessControlCheck = false; … … 672 748 LockHolder adapterLocker(priv->adapterLock); 673 749 gst_adapter_clear(priv->adapter.get()); 674 priv->queueSize = 0;675 750 } 676 751 … … 730 805 priv->requestedPosition = segment->start; 731 806 priv->stopPosition = segment->stop; 807 priv->adapterCondition.notifyOne(); 732 808 return TRUE; 733 809 } … … 768 844 src->priv->isFlushing = true; 769 845 src->priv->responseCondition.notifyOne(); 846 src->priv->adapterCondition.notifyOne(); 770 847 return TRUE; 771 848 } … … 797 874 src->priv->isFlushing = true; 798 875 src->priv->responseCondition.notifyOne(); 799 break; 800 } default: 876 src->priv->adapterCondition.notifyOne(); 877 break; 878 } 879 default: 801 880 break; 802 881 } … … 919 998 GST_LOG_OBJECT(src, "Checking to update blocksize. 
Read: %u, current blocksize: %u", bytesRead, blocksize); 920 999 921 if (bytesRead > =blocksize * s_growBlocksizeLimit) {1000 if (bytesRead > blocksize * s_growBlocksizeLimit) { 922 1001 m_reduceBlocksizeCount = 0; 923 1002 m_increaseBlocksizeCount++; … … 1071 1150 GST_LOG_OBJECT(src, "Have %d bytes of data", length); 1072 1151 LockHolder locker(priv->responseLock); 1152 // Rough bandwidth calculation. We ignore here the first data package because we would have to reset the counters when we issue the request and 1153 // that first package delivery would include the time of sending out the request and getting the data back. Since we can't distinguish the 1154 // sending time from the receiving time, it is better to ignore it. 1155 if (!isnan(priv->downloadStartTime)) { 1156 priv->totalDownloadedBytes += length; 1157 double timeSinceStart = (WallTime::now() - priv->downloadStartTime).seconds(); 1158 GST_TRACE_OBJECT(src, "downloaded %u bytes in %f seconds =~ %1.0f bytes/second", priv->totalDownloadedBytes, timeSinceStart 1159 , timeSinceStart ? 
priv->totalDownloadedBytes / timeSinceStart : 0); 1160 } else { 1161 priv->downloadStartTime = WallTime::now(); 1162 priv->totalDownloadedBytes = 0; 1163 } 1073 1164 1074 1165 uint64_t newPosition = priv->readPosition + length; … … 1105 1196 LockHolder adapterLocker(priv->adapterLock); 1106 1197 GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length); 1107 priv->queueSize += length;1108 1198 gst_adapter_push(priv->adapter.get(), buffer); 1109 GST_TRACE_OBJECT(src, "[Buffering] isDownloadSuspended: %s", boolForPrinting(priv->isDownloadSuspended)); 1110 if (priv->haveSize && (priv->size > SMALL_MEDIA_RESOURCE_MAX_SIZE) && (priv->queueSize > (priv->size * HIGH_QUEUE_FACTOR_THRESHOLD)) 1111 && !priv->isDownloadSuspended && priv->isSeekable) { 1112 GST_TRACE_OBJECT(src, "[Buffering] queueSize: %" G_GUINT64_FORMAT ", threshold: %f", priv->queueSize, 1113 priv->size * HIGH_QUEUE_FACTOR_THRESHOLD); 1114 GST_DEBUG_OBJECT(src, "[Buffering] Stopping resource loader"); 1115 priv->isDownloadSuspended = true; 1116 priv->resource->stop(); 1117 return; 1118 } 1199 stopLoaderIfNeeded(src); 1119 1200 priv->adapterCondition.notifyOne(); 1120 1201 }
Note: See TracChangeset
for help on using the changeset viewer.