Changeset 256728 in webkit


Ignore:
Timestamp:
Feb 17, 2020 4:07:08 AM (4 years ago)
Author:
commit-queue@webkit.org
Message:

[Curl] Use shared single thread for WebSocket connections
https://bugs.webkit.org/show_bug.cgi?id=187984

Patch by Takashi Komori <Takashi.Komori@sony.com> on 2020-02-17
Reviewed by Fujii Hironori.

This patch suppresses invoking worker threads for websocket connections.
CurlStreamScheduler starts up to one worker thread.

No new tests. Covered by existing WebSocket tests.

  • platform/Curl.cmake:
  • platform/network/curl/CurlContext.cpp:

(WebCore::CurlContext::streamScheduler):
(WebCore::CurlHandle::getActiveSocket):
(WebCore::CurlHandle::send):
(WebCore::CurlHandle::receive):
(WebCore::CurlSocketHandle::CurlSocketHandle): Deleted.
(WebCore::CurlSocketHandle::connect): Deleted.
(WebCore::CurlSocketHandle::send): Deleted.
(WebCore::CurlSocketHandle::receive): Deleted.
(WebCore::CurlSocketHandle::wait): Deleted.

  • platform/network/curl/CurlContext.h:
  • platform/network/curl/CurlStream.cpp: Added.

(WebCore::CurlStream::CurlStream):
(WebCore::CurlStream::~CurlStream):
(WebCore::CurlStream::destroyHandle):
(WebCore::CurlStream::send):
(WebCore::CurlStream::appendMonitoringFd):
(WebCore::CurlStream::tryToTransfer):
(WebCore::CurlStream::tryToReceive):
(WebCore::CurlStream::tryToSend):
(WebCore::CurlStream::notifyFailure):

  • platform/network/curl/CurlStream.h: Added.

(WebCore::CurlStream::create):

  • platform/network/curl/CurlStreamScheduler.cpp: Added.

(WebCore::CurlStreamScheduler::CurlStreamScheduler):
(WebCore::CurlStreamScheduler::~CurlStreamScheduler):
(WebCore::CurlStreamScheduler::createStream):
(WebCore::CurlStreamScheduler::destroyStream):
(WebCore::CurlStreamScheduler::send):
(WebCore::CurlStreamScheduler::callOnWorkerThread):
(WebCore::CurlStreamScheduler::callClientOnMainThread):
(WebCore::CurlStreamScheduler::startThreadIfNeeded):
(WebCore::CurlStreamScheduler::stopThreadIfNoMoreJobRunning):
(WebCore::CurlStreamScheduler::executeTasks):
(WebCore::CurlStreamScheduler::workerThread):

  • platform/network/curl/CurlStreamScheduler.h: Added.
  • platform/network/curl/SocketStreamHandleImpl.h:

(WebCore::SocketStreamHandleImpl::isStreamInvalidated):

  • platform/network/curl/SocketStreamHandleImplCurl.cpp:

(WebCore::SocketStreamHandleImpl::SocketStreamHandleImpl):
(WebCore::SocketStreamHandleImpl::~SocketStreamHandleImpl):
(WebCore::SocketStreamHandleImpl::platformSendInternal):
(WebCore::SocketStreamHandleImpl::platformClose):
(WebCore::SocketStreamHandleImpl::didOpen):
(WebCore::SocketStreamHandleImpl::didSendData):
(WebCore::SocketStreamHandleImpl::didReceiveData):
(WebCore::SocketStreamHandleImpl::didFail):
(WebCore::SocketStreamHandleImpl::destructStream):
(WebCore::SocketStreamHandleImpl::threadEntryPoint): Deleted.
(WebCore::SocketStreamHandleImpl::handleError): Deleted.
(WebCore::SocketStreamHandleImpl::stopThread): Deleted.
(WebCore::SocketStreamHandleImpl::callOnWorkerThread): Deleted.
(WebCore::SocketStreamHandleImpl::executeTasks): Deleted.

Location:
trunk/Source/WebCore
Files:
4 added
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/Source/WebCore/ChangeLog

    r256726 r256728  
     12020-02-17  Takashi Komori  <Takashi.Komori@sony.com>
     2
     3        [Curl] Use shared single thread for WebSocket connections
     4        https://bugs.webkit.org/show_bug.cgi?id=187984
     5
     6        Reviewed by Fujii Hironori.
     7
     8        This patch suppresses invoking worker threads for websocket connections.
     9        CurlStreamScheduler starts up to one worker thread.
     10
     11        No new tests. Covered by existing WebSocket tests.
     12
     13        * platform/Curl.cmake:
     14        * platform/network/curl/CurlContext.cpp:
     15        (WebCore::CurlContext::streamScheduler):
     16        (WebCore::CurlHandle::getActiveSocket):
     17        (WebCore::CurlHandle::send):
     18        (WebCore::CurlHandle::receive):
     19        (WebCore::CurlSocketHandle::CurlSocketHandle): Deleted.
     20        (WebCore::CurlSocketHandle::connect): Deleted.
     21        (WebCore::CurlSocketHandle::send): Deleted.
     22        (WebCore::CurlSocketHandle::receive): Deleted.
     23        (WebCore::CurlSocketHandle::wait): Deleted.
     24        * platform/network/curl/CurlContext.h:
     25        * platform/network/curl/CurlStream.cpp: Added.
     26        (WebCore::CurlStream::CurlStream):
     27        (WebCore::CurlStream::~CurlStream):
     28        (WebCore::CurlStream::destroyHandle):
     29        (WebCore::CurlStream::send):
     30        (WebCore::CurlStream::appendMonitoringFd):
     31        (WebCore::CurlStream::tryToTransfer):
     32        (WebCore::CurlStream::tryToReceive):
     33        (WebCore::CurlStream::tryToSend):
     34        (WebCore::CurlStream::notifyFailure):
     35        * platform/network/curl/CurlStream.h: Added.
     36        (WebCore::CurlStream::create):
     37        * platform/network/curl/CurlStreamScheduler.cpp: Added.
     38        (WebCore::CurlStreamScheduler::CurlStreamScheduler):
     39        (WebCore::CurlStreamScheduler::~CurlStreamScheduler):
     40        (WebCore::CurlStreamScheduler::createStream):
     41        (WebCore::CurlStreamScheduler::destroyStream):
     42        (WebCore::CurlStreamScheduler::send):
     43        (WebCore::CurlStreamScheduler::callOnWorkerThread):
     44        (WebCore::CurlStreamScheduler::callClientOnMainThread):
     45        (WebCore::CurlStreamScheduler::startThreadIfNeeded):
     46        (WebCore::CurlStreamScheduler::stopThreadIfNoMoreJobRunning):
     47        (WebCore::CurlStreamScheduler::executeTasks):
     48        (WebCore::CurlStreamScheduler::workerThread):
     49        * platform/network/curl/CurlStreamScheduler.h: Added.
     50        * platform/network/curl/SocketStreamHandleImpl.h:
     51        (WebCore::SocketStreamHandleImpl::isStreamInvalidated):
     52        * platform/network/curl/SocketStreamHandleImplCurl.cpp:
     53        (WebCore::SocketStreamHandleImpl::SocketStreamHandleImpl):
     54        (WebCore::SocketStreamHandleImpl::~SocketStreamHandleImpl):
     55        (WebCore::SocketStreamHandleImpl::platformSendInternal):
     56        (WebCore::SocketStreamHandleImpl::platformClose):
     57        (WebCore::SocketStreamHandleImpl::didOpen):
     58        (WebCore::SocketStreamHandleImpl::didSendData):
     59        (WebCore::SocketStreamHandleImpl::didReceiveData):
     60        (WebCore::SocketStreamHandleImpl::didFail):
     61        (WebCore::SocketStreamHandleImpl::destructStream):
     62        (WebCore::SocketStreamHandleImpl::threadEntryPoint): Deleted.
     63        (WebCore::SocketStreamHandleImpl::handleError): Deleted.
     64        (WebCore::SocketStreamHandleImpl::stopThread): Deleted.
     65        (WebCore::SocketStreamHandleImpl::callOnWorkerThread): Deleted.
     66        (WebCore::SocketStreamHandleImpl::executeTasks): Deleted.
     67
    1682020-02-17  Carlos Garcia Campos  <cgarcia@igalia.com>
    269
  • trunk/Source/WebCore/platform/Curl.cmake

    r253430 r256728  
    2323    platform/network/curl/CurlSSLHandle.cpp
    2424    platform/network/curl/CurlSSLVerifier.cpp
     25    platform/network/curl/CurlStream.cpp
     26    platform/network/curl/CurlStreamScheduler.cpp
    2527    platform/network/curl/DNSResolveQueueCurl.cpp
    2628    platform/network/curl/NetworkStorageSessionCurl.cpp
     
    5961    platform/network/curl/CurlSSLHandle.h
    6062    platform/network/curl/CurlSSLVerifier.h
     63    platform/network/curl/CurlStream.h
     64    platform/network/curl/CurlStreamScheduler.h
    6165    platform/network/curl/DNSResolveQueueCurl.h
    6266    platform/network/curl/DownloadBundle.h
  • trunk/Source/WebCore/platform/network/curl/CurlContext.cpp

    r256487 r256728  
    3333#include "CurlSSLHandle.h"
    3434#include "CurlSSLVerifier.h"
     35#include "CurlStreamScheduler.h"
    3536#include "HTTPHeaderMap.h"
    3637#include <NetworkLoadMetrics.h>
     
    145146
    146147    curl_easy_cleanup(curl);
     148}
     149
     150CurlStreamScheduler& CurlContext::streamScheduler()
     151{
     152    static NeverDestroyed<CurlStreamScheduler> sharedInstance;
     153    return sharedInstance;
    147154}
    148155
     
    889896#endif
    890897
    891 // CurlSocketHandle
    892 
    893 CurlSocketHandle::CurlSocketHandle(const URL& url, Function<void(CURLcode)>&& errorHandler)
    894     : m_errorHandler(WTFMove(errorHandler))
    895 {
    896     // Libcurl is not responsible for the protocol handling. It just handles connection.
    897     // Only scheme, host and port is required.
    898     URL urlForConnection;
    899     urlForConnection.setProtocol(url.protocolIs("wss") ? "https" : "http");
    900     urlForConnection.setHostAndPort(url.hostAndPort());
    901     setUrl(urlForConnection);
    902 
    903     enableConnectionOnly();
    904 }
    905 
    906 bool CurlSocketHandle::connect()
    907 {
    908     CURLcode errorCode = perform();
    909     if (errorCode != CURLE_OK) {
    910         m_errorHandler(errorCode);
    911         return false;
    912     }
    913 
    914     return true;
    915 }
    916 
    917 size_t CurlSocketHandle::send(const uint8_t* buffer, size_t size)
    918 {
    919     size_t totalBytesSent = 0;
    920 
    921     while (totalBytesSent < size) {
    922         size_t bytesSent = 0;
    923         CURLcode errorCode = curl_easy_send(handle(), buffer + totalBytesSent, size - totalBytesSent, &bytesSent);
    924         if (errorCode != CURLE_OK) {
    925             if (errorCode != CURLE_AGAIN)
    926                 m_errorHandler(errorCode);
    927             break;
    928         }
    929 
    930         totalBytesSent += bytesSent;
    931     }
    932 
    933     return totalBytesSent;
    934 }
    935 
    936 Optional<size_t> CurlSocketHandle::receive(uint8_t* buffer, size_t bufferSize)
    937 {
    938     size_t bytesRead = 0;
    939 
    940     CURLcode errorCode = curl_easy_recv(handle(), buffer, bufferSize, &bytesRead);
    941     if (errorCode != CURLE_OK) {
    942         if (errorCode != CURLE_AGAIN)
    943             m_errorHandler(errorCode);
    944 
    945         return WTF::nullopt;
    946     }
    947 
    948     return bytesRead;
    949 }
    950 
    951 Optional<CurlSocketHandle::WaitResult> CurlSocketHandle::wait(const Seconds& timeout, bool alsoWaitForWrite)
     898Expected<curl_socket_t, CURLcode> CurlHandle::getActiveSocket()
    952899{
    953900    curl_socket_t socket;
    954     CURLcode errorCode = curl_easy_getinfo(handle(), CURLINFO_ACTIVESOCKET, &socket);
    955     if (errorCode != CURLE_OK) {
    956         m_errorHandler(errorCode);
    957         return WTF::nullopt;
    958     }
    959 
    960     int64_t usec = timeout.microsecondsAs<int64_t>();
    961 
    962     struct timeval selectTimeout;
    963     if (usec <= 0) {
    964         selectTimeout.tv_sec = 0;
    965         selectTimeout.tv_usec = 0;
    966     } else {
    967         selectTimeout.tv_sec = usec / 1000000;
    968         selectTimeout.tv_usec = usec % 1000000;
    969     }
    970 
    971     int rc = 0;
    972     int maxfd = static_cast<int>(socket) + 1;
    973     fd_set fdread;
    974     fd_set fdwrite;
    975     fd_set fderr;
    976 
    977     // Retry 'select' if it was interrupted by a process signal.
    978     do {
    979         FD_ZERO(&fdread);
    980         FD_SET(socket, &fdread);
    981 
    982         FD_ZERO(&fdwrite);
    983         if (alsoWaitForWrite)
    984             FD_SET(socket, &fdwrite);
    985 
    986         FD_ZERO(&fderr);
    987         FD_SET(socket, &fderr);
    988 
    989         rc = ::select(maxfd, &fdread, &fdwrite, &fderr, &selectTimeout);
    990     } while (rc == -1 && errno == EINTR);
    991 
    992     if (rc <= 0)
    993         return WTF::nullopt;
    994 
    995     WaitResult result;
    996     result.readable = FD_ISSET(socket, &fdread) || FD_ISSET(socket, &fderr);
    997     result.writable = FD_ISSET(socket, &fdwrite);
    998     return result;
     901
     902    CURLcode errorCode = curl_easy_getinfo(m_handle, CURLINFO_ACTIVESOCKET, &socket);
     903    if (errorCode != CURLE_OK)
     904        return makeUnexpected(errorCode);
     905
     906    return socket;
     907}
     908
     909CURLcode CurlHandle::send(const uint8_t* buffer, size_t bufferSize, size_t& bytesSent)
     910{
     911    return curl_easy_send(m_handle, buffer, bufferSize, &bytesSent);
     912}
     913
     914CURLcode CurlHandle::receive(uint8_t* buffer, size_t bufferSize, size_t& bytesRead)
     915{
     916    return curl_easy_recv(m_handle, buffer, bufferSize, &bytesRead);
    999917}
    1000918
  • trunk/Source/WebCore/platform/network/curl/CurlContext.h

    r248762 r256728  
    9090
    9191class CurlRequestScheduler;
     92class CurlStreamScheduler;
    9293
    9394class CurlContext : public CurlGlobal {
     
    102103
    103104    CurlRequestScheduler& scheduler() { return *m_scheduler; }
     105    CurlStreamScheduler& streamScheduler();
    104106
    105107    // Proxy
     
    294296    static long long maxCurlOffT();
    295297
     298    // socket
     299    Expected<curl_socket_t, CURLcode> getActiveSocket();
     300    CURLcode send(const uint8_t*, size_t, size_t&);
     301    CURLcode receive(uint8_t*, size_t, size_t&);
     302
    296303#ifndef NDEBUG
    297304    void enableVerboseIfUsed();
     
    314321};
    315322
    316 class CurlSocketHandle : public CurlHandle {
    317     WTF_MAKE_NONCOPYABLE(CurlSocketHandle);
    318 
    319 public:
    320     struct WaitResult {
    321         bool readable { false };
    322         bool writable { false };
    323     };
    324 
    325     CurlSocketHandle(const URL&, Function<void(CURLcode)>&& errorHandler);
    326 
    327     bool connect();
    328     size_t send(const uint8_t*, size_t);
    329     Optional<size_t> receive(uint8_t*, size_t);
    330     Optional<WaitResult> wait(const Seconds& timeout, bool alsoWaitForWrite);
    331 
    332 private:
    333     Function<void(CURLcode)> m_errorHandler;
    334 };
    335 
    336323} // namespace WebCore
  • trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImpl.h

    r248182 r256728  
    22 * Copyright (C) 2009-2018 Apple Inc. All rights reserved.
    33 * Copyright (C) 2009 Google Inc.  All rights reserved.
     4 * Copyright (C) 2020 Sony Interactive Entertainment Inc.
    45 *
    56 * Redistribution and use in source and binary forms, with or without
     
    3233#pragma once
    3334
    34 #include "CurlContext.h"
     35#include "CurlStream.h"
    3536#include "SocketStreamHandle.h"
    3637#include <pal/SessionID.h>
    37 #include <wtf/Function.h>
    38 #include <wtf/Lock.h>
    39 #include <wtf/MessageQueue.h>
    40 #include <wtf/RefCounted.h>
    4138#include <wtf/StreamBuffer.h>
    42 #include <wtf/Threading.h>
    43 #include <wtf/UniqueArray.h>
    4439
    4540namespace WebCore {
     
    4843class StorageSessionProvider;
    4944
    50 class SocketStreamHandleImpl : public SocketStreamHandle {
     45class SocketStreamHandleImpl : public SocketStreamHandle, public CurlStream::Client {
    5146public:
    5247    static Ref<SocketStreamHandleImpl> create(const URL& url, SocketStreamHandleClient& client, PAL::SessionID, const String&, SourceApplicationAuditToken&&, const StorageSessionProvider* provider) { return adoptRef(*new SocketStreamHandleImpl(url, client, provider)); }
     
    6560    bool sendPendingData();
    6661
    67     void threadEntryPoint(const URL&);
    68     void handleError(CURLcode);
    69     void stopThread();
     62    void didOpen(CurlStreamID) final;
     63    void didSendData(CurlStreamID, size_t) final;
     64    void didReceiveData(CurlStreamID, const char*, size_t) final;
     65    void didFail(CurlStreamID, CURLcode) final;
    7066
    71     void callOnWorkerThread(Function<void()>&&);
    72     void executeTasks();
    73 
    74     static const size_t kReadBufferSize = 4 * 1024;
     67    bool isStreamInvalidated() { return m_streamID == invalidCurlStreamID; }
     68    void destructStream();
    7569
    7670    RefPtr<const StorageSessionProvider> m_storageSessionProvider;
    77     RefPtr<Thread> m_workerThread;
    78     std::atomic<bool> m_running { true };
    79 
    80     MessageQueue<Function<void()>> m_taskQueue;
    81 
    82     bool m_hasPendingWriteData { false };
    83     size_t m_writeBufferSize { 0 };
    84     size_t m_writeBufferOffset { 0 };
    85     UniqueArray<uint8_t> m_writeBuffer;
    8671
    8772    StreamBuffer<uint8_t, 1024 * 1024> m_buffer;
    8873    static const unsigned maxBufferSize = 100 * 1024 * 1024;
     74
     75    CurlStreamScheduler& m_scheduler;
     76    CurlStreamID m_streamID { invalidCurlStreamID };
     77    unsigned m_totalSendDataSize { 0 };
    8978};
    9079
  • trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImplCurl.cpp

    r248846 r256728  
    22 * Copyright (C) 2009 Brent Fulgham.  All rights reserved.
    33 * Copyright (C) 2009 Google Inc.  All rights reserved.
    4  * Copyright (C) 2018 Sony Interactive Entertainment Inc.
     4 * Copyright (C) 2020 Sony Interactive Entertainment Inc.
    55 *
    66 * Redistribution and use in source and binary forms, with or without
     
    3636#if USE(CURL)
    3737
     38#include "CurlStreamScheduler.h"
    3839#include "DeprecatedGlobalSettings.h"
    39 #include "Logging.h"
    4040#include "SocketStreamError.h"
    4141#include "SocketStreamHandleClient.h"
    4242#include "StorageSessionProvider.h"
    43 #include <wtf/MainThread.h>
    44 #include <wtf/URL.h>
    45 #include <wtf/text/CString.h>
    4643
    4744namespace WebCore {
     
    5047    : SocketStreamHandle(url, client)
    5148    , m_storageSessionProvider(provider)
     49    , m_scheduler(CurlContext::singleton().streamScheduler())
    5250{
    53     LOG(Network, "SocketStreamHandle %p new client %p", this, &m_client);
    54     ASSERT(isMainThread());
    55 
    5651    // FIXME: Using DeprecatedGlobalSettings from here is a layering violation.
    5752    if (m_url.protocolIs("wss") && DeprecatedGlobalSettings::allowsAnySSLCertificate())
    5853        CurlContext::singleton().sslHandle().setIgnoreSSLErrors(true);
    5954
    60     m_workerThread = Thread::create("WebSocket thread", [this, protectedThis = makeRef(*this), url = url.isolatedCopy()] {
    61         threadEntryPoint(url);
    62     });
     55    m_streamID = m_scheduler.createStream(m_url, *this);
    6356}
    6457
    6558SocketStreamHandleImpl::~SocketStreamHandleImpl()
    6659{
    67     LOG(Network, "SocketStreamHandle %p delete", this);
    68     stopThread();
     60    destructStream();
    6961}
    7062
    7163Optional<size_t> SocketStreamHandleImpl::platformSendInternal(const uint8_t* data, size_t length)
    7264{
    73     LOG(Network, "SocketStreamHandle %p platformSend", this);
    74     ASSERT(isMainThread());
     65    if (isStreamInvalidated())
     66        return WTF::nullopt;
    7567
    76     if (m_hasPendingWriteData)
     68    if (m_totalSendDataSize + length > maxBufferSize)
    7769        return 0;
     70    m_totalSendDataSize += length;
    7871
    79     m_hasPendingWriteData = true;
     72    auto buffer = makeUniqueArray<uint8_t>(length);
     73    memcpy(buffer.get(), data, length);
    8074
    81     auto writeBuffer = makeUniqueArray<uint8_t>(length);
    82     memcpy(writeBuffer.get(), data, length);
    83 
    84     callOnWorkerThread([this, writeBuffer = WTFMove(writeBuffer), writeBufferSize = length]() mutable {
    85         ASSERT(!isMainThread());
    86         m_writeBuffer = WTFMove(writeBuffer);
    87         m_writeBufferSize = writeBufferSize;
    88         m_writeBufferOffset = 0;
    89     });
    90 
     75    m_scheduler.send(m_streamID, WTFMove(buffer), length);
    9176    return length;
    9277}
     
    9479void SocketStreamHandleImpl::platformClose()
    9580{
    96     LOG(Network, "SocketStreamHandle %p platformClose", this);
    97     ASSERT(isMainThread());
     81    destructStream();
    9882
    9983    if (m_state == Closed)
     
    10185    m_state = Closed;
    10286
    103     stopThread();
    10487    m_client.didCloseSocketStream(*this);
    10588}
    10689
    107 void SocketStreamHandleImpl::threadEntryPoint(const URL& url)
     90void SocketStreamHandleImpl::didOpen(CurlStreamID)
    10891{
    109     ASSERT(!isMainThread());
     92    if (m_state != Connecting)
     93        return;
     94    m_state = Open;
    11095
    111     CurlSocketHandle socket { url, [this](CURLcode errorCode) {
    112         handleError(errorCode);
    113     }};
     96    m_client.didOpenSocketStream(*this);
     97}
    11498
    115     // Connect to host
    116     if (!socket.connect())
     99void SocketStreamHandleImpl::didSendData(CurlStreamID, size_t length)
     100{
     101    ASSERT(m_totalSendDataSize - length >= 0);
     102
     103    m_totalSendDataSize -= length;
     104    sendPendingData();
     105}
     106
     107void SocketStreamHandleImpl::didReceiveData(CurlStreamID, const char* data, size_t length)
     108{
     109    if (m_state != Open)
    117110        return;
    118111
    119     callOnMainThread([this, protectedThis = makeRef(*this)] {
    120         if (m_state == Connecting) {
    121             m_state = Open;
    122             m_client.didOpenSocketStream(*this);
    123         }
    124     });
    125 
    126     while (m_running) {
    127         executeTasks();
    128 
    129         auto result = socket.wait(20_ms, m_writeBuffer.get());
    130         if (!result)
    131             continue;
    132 
    133         // These logic only run when there's data waiting.
    134         if (result->writable && m_running) {
    135             auto bytesSent = socket.send(m_writeBuffer.get() + m_writeBufferOffset, m_writeBufferSize - m_writeBufferOffset);
    136             m_writeBufferOffset += bytesSent;
    137 
    138             if (m_writeBufferSize <= m_writeBufferOffset) {
    139                 m_writeBuffer = nullptr;
    140                 m_writeBufferSize = 0;
    141                 m_writeBufferOffset = 0;
    142 
    143                 callOnMainThread([this, protectedThis = makeRef(*this)] {
    144                     m_hasPendingWriteData = false;
    145                     sendPendingData();
    146                 });
    147             }
    148         }
    149 
    150         if (result->readable && m_running) {
    151             auto readBuffer = makeUniqueArray<uint8_t>(kReadBufferSize);
    152             auto bytesRead = socket.receive(readBuffer.get(), kReadBufferSize);
    153             // `nullopt` result means nothing to handle at this moment.
    154             if (!bytesRead)
    155                 continue;
    156 
    157             // 0 bytes indicates a closed connection.
    158             if (!*bytesRead) {
    159                 m_running = false;
    160                 callOnMainThread([this, protectedThis = makeRef(*this)] {
    161                     close();
    162                 });
    163                 break;
    164             }
    165 
    166             callOnMainThread([this, protectedThis = makeRef(*this), buffer = WTFMove(readBuffer), size = *bytesRead ] {
    167                 if (m_state == Open)
    168                     m_client.didReceiveSocketStreamData(*this, reinterpret_cast<const char*>(buffer.get()), size);
    169             });
    170         }
    171     }
    172 
    173     m_writeBuffer = nullptr;
     112    m_client.didReceiveSocketStreamData(*this, data, length);
    174113}
    175114
    176 void SocketStreamHandleImpl::handleError(CURLcode errorCode)
     115void SocketStreamHandleImpl::didFail(CurlStreamID, CURLcode errorCode)
    177116{
    178     m_running = false;
    179     callOnMainThread([this, protectedThis = makeRef(*this), errorCode, localizedDescription = CurlHandle::errorDescription(errorCode).isolatedCopy()] {
    180         if (m_state == Closed)
    181             return;
     117    destructStream();
    182118
    183         if (errorCode == CURLE_RECV_ERROR)
    184             m_client.didFailToReceiveSocketStreamData(*this);
    185         else
    186             m_client.didFailSocketStream(*this, SocketStreamError(static_cast<int>(errorCode), { }, localizedDescription));
    187     });
     119    if (m_state == Closed)
     120        return;
     121
     122    if (errorCode == CURLE_RECV_ERROR)
     123        m_client.didFailToReceiveSocketStreamData(*this);
     124    else
     125        m_client.didFailSocketStream(*this, SocketStreamError(errorCode, m_url, CurlHandle::errorDescription(errorCode)));
    188126}
    189127
    190 void SocketStreamHandleImpl::stopThread()
     128void SocketStreamHandleImpl::destructStream()
    191129{
    192     ASSERT(isMainThread());
     130    if (isStreamInvalidated())
     131        return;
    193132
    194     m_running = false;
    195 
    196     if (m_workerThread) {
    197         m_workerThread->waitForCompletion();
    198         m_workerThread = nullptr;
    199     }
    200 }
    201 
    202 void SocketStreamHandleImpl::callOnWorkerThread(Function<void()>&& task)
    203 {
    204     ASSERT(isMainThread());
    205     m_taskQueue.append(makeUnique<Function<void()>>(WTFMove(task)));
    206 }
    207 
    208 void SocketStreamHandleImpl::executeTasks()
    209 {
    210     ASSERT(!isMainThread());
    211 
    212     auto tasks = m_taskQueue.takeAllMessages();
    213     for (auto& task : tasks)
    214         (*task)();
     133    m_scheduler.destroyStream(m_streamID);
     134    m_streamID = invalidCurlStreamID;
    215135}
    216136
Note: See TracChangeset for help on using the changeset viewer.