Changeset 256728 in webkit
- Timestamp:
- Feb 17, 2020 4:07:08 AM (4 years ago)
- Location:
- trunk/Source/WebCore
- Files:
-
- 4 added
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Source/WebCore/ChangeLog
r256726 r256728 1 2020-02-17 Takashi Komori <Takashi.Komori@sony.com> 2 3 [Curl] Use shared single thread for WebSocket connections 4 https://bugs.webkit.org/show_bug.cgi?id=187984 5 6 Reviewed by Fujii Hironori. 7 8 This patch suppresses invoking worker threads for websocket connections. 9 CurlStreamScheduler starts up to one worker thread. 10 11 No new tests. Covered by existing WebSocket tests. 12 13 * platform/Curl.cmake: 14 * platform/network/curl/CurlContext.cpp: 15 (WebCore::CurlContext::streamScheduler): 16 (WebCore::CurlHandle::getActiveSocket): 17 (WebCore::CurlHandle::send): 18 (WebCore::CurlHandle::receive): 19 (WebCore::CurlSocketHandle::CurlSocketHandle): Deleted. 20 (WebCore::CurlSocketHandle::connect): Deleted. 21 (WebCore::CurlSocketHandle::send): Deleted. 22 (WebCore::CurlSocketHandle::receive): Deleted. 23 (WebCore::CurlSocketHandle::wait): Deleted. 24 * platform/network/curl/CurlContext.h: 25 * platform/network/curl/CurlStream.cpp: Added. 26 (WebCore::CurlStream::CurlStream): 27 (WebCore::CurlStream::~CurlStream): 28 (WebCore::CurlStream::destroyHandle): 29 (WebCore::CurlStream::send): 30 (WebCore::CurlStream::appendMonitoringFd): 31 (WebCore::CurlStream::tryToTransfer): 32 (WebCore::CurlStream::tryToReceive): 33 (WebCore::CurlStream::tryToSend): 34 (WebCore::CurlStream::notifyFailure): 35 * platform/network/curl/CurlStream.h: Added. 36 (WebCore::CurlStream::create): 37 * platform/network/curl/CurlStreamScheduler.cpp: Added. 
38 (WebCore::CurlStreamScheduler::CurlStreamScheduler): 39 (WebCore::CurlStreamScheduler::~CurlStreamScheduler): 40 (WebCore::CurlStreamScheduler::createStream): 41 (WebCore::CurlStreamScheduler::destroyStream): 42 (WebCore::CurlStreamScheduler::send): 43 (WebCore::CurlStreamScheduler::callOnWorkerThread): 44 (WebCore::CurlStreamScheduler::callClientOnMainThread): 45 (WebCore::CurlStreamScheduler::startThreadIfNeeded): 46 (WebCore::CurlStreamScheduler::stopThreadIfNoMoreJobRunning): 47 (WebCore::CurlStreamScheduler::executeTasks): 48 (WebCore::CurlStreamScheduler::workerThread): 49 * platform/network/curl/CurlStreamScheduler.h: Added. 50 * platform/network/curl/SocketStreamHandleImpl.h: 51 (WebCore::SocketStreamHandleImpl::isStreamInvalidated): 52 * platform/network/curl/SocketStreamHandleImplCurl.cpp: 53 (WebCore::SocketStreamHandleImpl::SocketStreamHandleImpl): 54 (WebCore::SocketStreamHandleImpl::~SocketStreamHandleImpl): 55 (WebCore::SocketStreamHandleImpl::platformSendInternal): 56 (WebCore::SocketStreamHandleImpl::platformClose): 57 (WebCore::SocketStreamHandleImpl::didOpen): 58 (WebCore::SocketStreamHandleImpl::didSendData): 59 (WebCore::SocketStreamHandleImpl::didReceiveData): 60 (WebCore::SocketStreamHandleImpl::didFail): 61 (WebCore::SocketStreamHandleImpl::destructStream): 62 (WebCore::SocketStreamHandleImpl::threadEntryPoint): Deleted. 63 (WebCore::SocketStreamHandleImpl::handleError): Deleted. 64 (WebCore::SocketStreamHandleImpl::stopThread): Deleted. 65 (WebCore::SocketStreamHandleImpl::callOnWorkerThread): Deleted. 66 (WebCore::SocketStreamHandleImpl::executeTasks): Deleted. 67 1 68 2020-02-17 Carlos Garcia Campos <cgarcia@igalia.com> 2 69 -
trunk/Source/WebCore/platform/Curl.cmake
r253430 r256728 23 23 platform/network/curl/CurlSSLHandle.cpp 24 24 platform/network/curl/CurlSSLVerifier.cpp 25 platform/network/curl/CurlStream.cpp 26 platform/network/curl/CurlStreamScheduler.cpp 25 27 platform/network/curl/DNSResolveQueueCurl.cpp 26 28 platform/network/curl/NetworkStorageSessionCurl.cpp … … 59 61 platform/network/curl/CurlSSLHandle.h 60 62 platform/network/curl/CurlSSLVerifier.h 63 platform/network/curl/CurlStream.h 64 platform/network/curl/CurlStreamScheduler.h 61 65 platform/network/curl/DNSResolveQueueCurl.h 62 66 platform/network/curl/DownloadBundle.h -
trunk/Source/WebCore/platform/network/curl/CurlContext.cpp
r256487 r256728 33 33 #include "CurlSSLHandle.h" 34 34 #include "CurlSSLVerifier.h" 35 #include "CurlStreamScheduler.h" 35 36 #include "HTTPHeaderMap.h" 36 37 #include <NetworkLoadMetrics.h> … … 145 146 146 147 curl_easy_cleanup(curl); 148 } 149 150 CurlStreamScheduler& CurlContext::streamScheduler() 151 { 152 static NeverDestroyed<CurlStreamScheduler> sharedInstance; 153 return sharedInstance; 147 154 } 148 155 … … 889 896 #endif 890 897 891 // CurlSocketHandle 892 893 CurlSocketHandle::CurlSocketHandle(const URL& url, Function<void(CURLcode)>&& errorHandler) 894 : m_errorHandler(WTFMove(errorHandler)) 895 { 896 // Libcurl is not responsible for the protocol handling. It just handles connection. 897 // Only scheme, host and port is required. 898 URL urlForConnection; 899 urlForConnection.setProtocol(url.protocolIs("wss") ? "https" : "http"); 900 urlForConnection.setHostAndPort(url.hostAndPort()); 901 setUrl(urlForConnection); 902 903 enableConnectionOnly(); 904 } 905 906 bool CurlSocketHandle::connect() 907 { 908 CURLcode errorCode = perform(); 909 if (errorCode != CURLE_OK) { 910 m_errorHandler(errorCode); 911 return false; 912 } 913 914 return true; 915 } 916 917 size_t CurlSocketHandle::send(const uint8_t* buffer, size_t size) 918 { 919 size_t totalBytesSent = 0; 920 921 while (totalBytesSent < size) { 922 size_t bytesSent = 0; 923 CURLcode errorCode = curl_easy_send(handle(), buffer + totalBytesSent, size - totalBytesSent, &bytesSent); 924 if (errorCode != CURLE_OK) { 925 if (errorCode != CURLE_AGAIN) 926 m_errorHandler(errorCode); 927 break; 928 } 929 930 totalBytesSent += bytesSent; 931 } 932 933 return totalBytesSent; 934 } 935 936 Optional<size_t> CurlSocketHandle::receive(uint8_t* buffer, size_t bufferSize) 937 { 938 size_t bytesRead = 0; 939 940 CURLcode errorCode = curl_easy_recv(handle(), buffer, bufferSize, &bytesRead); 941 if (errorCode != CURLE_OK) { 942 if (errorCode != CURLE_AGAIN) 943 m_errorHandler(errorCode); 944 945 return WTF::nullopt; 946 } 
947 948 return bytesRead; 949 } 950 951 Optional<CurlSocketHandle::WaitResult> CurlSocketHandle::wait(const Seconds& timeout, bool alsoWaitForWrite) 898 Expected<curl_socket_t, CURLcode> CurlHandle::getActiveSocket() 952 899 { 953 900 curl_socket_t socket; 954 CURLcode errorCode = curl_easy_getinfo(handle(), CURLINFO_ACTIVESOCKET, &socket); 955 if (errorCode != CURLE_OK) { 956 m_errorHandler(errorCode); 957 return WTF::nullopt; 958 } 959 960 int64_t usec = timeout.microsecondsAs<int64_t>(); 961 962 struct timeval selectTimeout; 963 if (usec <= 0) { 964 selectTimeout.tv_sec = 0; 965 selectTimeout.tv_usec = 0; 966 } else { 967 selectTimeout.tv_sec = usec / 1000000; 968 selectTimeout.tv_usec = usec % 1000000; 969 } 970 971 int rc = 0; 972 int maxfd = static_cast<int>(socket) + 1; 973 fd_set fdread; 974 fd_set fdwrite; 975 fd_set fderr; 976 977 // Retry 'select' if it was interrupted by a process signal. 978 do { 979 FD_ZERO(&fdread); 980 FD_SET(socket, &fdread); 981 982 FD_ZERO(&fdwrite); 983 if (alsoWaitForWrite) 984 FD_SET(socket, &fdwrite); 985 986 FD_ZERO(&fderr); 987 FD_SET(socket, &fderr); 988 989 rc = ::select(maxfd, &fdread, &fdwrite, &fderr, &selectTimeout); 990 } while (rc == -1 && errno == EINTR); 991 992 if (rc <= 0) 993 return WTF::nullopt; 994 995 WaitResult result; 996 result.readable = FD_ISSET(socket, &fdread) || FD_ISSET(socket, &fderr); 997 result.writable = FD_ISSET(socket, &fdwrite); 998 return result; 901 902 CURLcode errorCode = curl_easy_getinfo(m_handle, CURLINFO_ACTIVESOCKET, &socket); 903 if (errorCode != CURLE_OK) 904 return makeUnexpected(errorCode); 905 906 return socket; 907 } 908 909 CURLcode CurlHandle::send(const uint8_t* buffer, size_t bufferSize, size_t& bytesSent) 910 { 911 return curl_easy_send(m_handle, buffer, bufferSize, &bytesSent); 912 } 913 914 CURLcode CurlHandle::receive(uint8_t* buffer, size_t bufferSize, size_t& bytesRead) 915 { 916 return curl_easy_recv(m_handle, buffer, bufferSize, &bytesRead); 999 917 } 1000 918 -
trunk/Source/WebCore/platform/network/curl/CurlContext.h
r248762 r256728 90 90 91 91 class CurlRequestScheduler; 92 class CurlStreamScheduler; 92 93 93 94 class CurlContext : public CurlGlobal { … … 102 103 103 104 CurlRequestScheduler& scheduler() { return *m_scheduler; } 105 CurlStreamScheduler& streamScheduler(); 104 106 105 107 // Proxy … … 294 296 static long long maxCurlOffT(); 295 297 298 // socket 299 Expected<curl_socket_t, CURLcode> getActiveSocket(); 300 CURLcode send(const uint8_t*, size_t, size_t&); 301 CURLcode receive(uint8_t*, size_t, size_t&); 302 296 303 #ifndef NDEBUG 297 304 void enableVerboseIfUsed(); … … 314 321 }; 315 322 316 class CurlSocketHandle : public CurlHandle { 317 WTF_MAKE_NONCOPYABLE(CurlSocketHandle); 318 319 public: 320 struct WaitResult { 321 bool readable { false }; 322 bool writable { false }; 323 }; 324 325 CurlSocketHandle(const URL&, Function<void(CURLcode)>&& errorHandler); 326 327 bool connect(); 328 size_t send(const uint8_t*, size_t); 329 Optional<size_t> receive(uint8_t*, size_t); 330 Optional<WaitResult> wait(const Seconds& timeout, bool alsoWaitForWrite); 331 332 private: 333 Function<void(CURLcode)> m_errorHandler; 334 }; 335 336 323 } // namespace WebCore -
trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImpl.h
r248182 r256728 2 2 * Copyright (C) 2009-2018 Apple Inc. All rights reserved. 3 3 * Copyright (C) 2009 Google Inc. All rights reserved. 4 * Copyright (C) 2020 Sony Interactive Entertainment Inc. 4 5 * 5 6 * Redistribution and use in source and binary forms, with or without … … 32 33 #pragma once 33 34 34 #include "CurlContext.h" 35 #include "CurlStream.h" 35 36 #include "SocketStreamHandle.h" 36 37 #include <pal/SessionID.h> 37 #include <wtf/Function.h> 38 #include <wtf/Lock.h> 39 #include <wtf/MessageQueue.h> 40 #include <wtf/RefCounted.h> 41 38 #include <wtf/StreamBuffer.h> 42 #include <wtf/Threading.h> 43 #include <wtf/UniqueArray.h> 44 39 45 40 namespace WebCore { … … 48 43 class StorageSessionProvider; 49 44 50 class SocketStreamHandleImpl : public SocketStreamHandle { 45 class SocketStreamHandleImpl : public SocketStreamHandle, public CurlStream::Client { 51 46 public: 52 47 static Ref<SocketStreamHandleImpl> create(const URL& url, SocketStreamHandleClient& client, PAL::SessionID, const String&, SourceApplicationAuditToken&&, const StorageSessionProvider* provider) { return adoptRef(*new SocketStreamHandleImpl(url, client, provider)); } … … 65 60 bool sendPendingData(); 66 61 67 void threadEntryPoint(const URL&); 68 void handleError(CURLcode); 69 void stopThread(); 62 void didOpen(CurlStreamID) final; 63 void didSendData(CurlStreamID, size_t) final; 64 void didReceiveData(CurlStreamID, const char*, size_t) final; 65 void didFail(CurlStreamID, CURLcode) final; 70 66 71 void callOnWorkerThread(Function<void()>&&); 72 void executeTasks(); 73 74 static const size_t kReadBufferSize = 4 * 1024; 67 bool isStreamInvalidated() { return m_streamID == invalidCurlStreamID; } 68 void destructStream(); 75 69 76 70 RefPtr<const StorageSessionProvider> m_storageSessionProvider; 77 RefPtr<Thread> m_workerThread; 78 std::atomic<bool> m_running { true }; 79 80 MessageQueue<Function<void()>> m_taskQueue; 81 82 bool m_hasPendingWriteData { false }; 83 size_t m_writeBufferSize { 0 }; 84 
size_t m_writeBufferOffset { 0 }; 85 UniqueArray<uint8_t> m_writeBuffer; 86 71 87 72 StreamBuffer<uint8_t, 1024 * 1024> m_buffer; 88 73 static const unsigned maxBufferSize = 100 * 1024 * 1024; 74 75 CurlStreamScheduler& m_scheduler; 76 CurlStreamID m_streamID { invalidCurlStreamID }; 77 unsigned m_totalSendDataSize { 0 }; 89 78 }; 90 79 -
trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImplCurl.cpp
r248846 r256728 2 2 * Copyright (C) 2009 Brent Fulgham. All rights reserved. 3 3 * Copyright (C) 2009 Google Inc. All rights reserved. 4 * Copyright (C) 2018 Sony Interactive Entertainment Inc. 4 * Copyright (C) 2020 Sony Interactive Entertainment Inc. 5 5 * 6 6 * Redistribution and use in source and binary forms, with or without … … 36 36 #if USE(CURL) 37 37 38 #include "CurlStreamScheduler.h" 38 39 #include "DeprecatedGlobalSettings.h" 39 #include "Logging.h" 40 40 #include "SocketStreamError.h" 41 41 #include "SocketStreamHandleClient.h" 42 42 #include "StorageSessionProvider.h" 43 #include <wtf/MainThread.h> 44 #include <wtf/URL.h> 45 #include <wtf/text/CString.h> 46 43 47 44 namespace WebCore { … … 50 47 : SocketStreamHandle(url, client) 51 48 , m_storageSessionProvider(provider) 49 , m_scheduler(CurlContext::singleton().streamScheduler()) 52 50 { 53 LOG(Network, "SocketStreamHandle %p new client %p", this, &m_client); 54 ASSERT(isMainThread()); 55 56 51 // FIXME: Using DeprecatedGlobalSettings from here is a layering violation. 
57 52 if (m_url.protocolIs("wss") && DeprecatedGlobalSettings::allowsAnySSLCertificate()) 58 53 CurlContext::singleton().sslHandle().setIgnoreSSLErrors(true); 59 54 60 m_workerThread = Thread::create("WebSocket thread", [this, protectedThis = makeRef(*this), url = url.isolatedCopy()] { 61 threadEntryPoint(url); 62 }); 55 m_streamID = m_scheduler.createStream(m_url, *this); 63 56 } 64 57 65 58 SocketStreamHandleImpl::~SocketStreamHandleImpl() 66 59 { 67 LOG(Network, "SocketStreamHandle %p delete", this); 68 stopThread(); 60 destructStream(); 69 61 } 70 62 71 63 Optional<size_t> SocketStreamHandleImpl::platformSendInternal(const uint8_t* data, size_t length) 72 64 { 73 LOG(Network, "SocketStreamHandle %p platformSend", this); 74 ASSERT(isMainThread()); 65 if (isStreamInvalidated()) 66 return WTF::nullopt; 75 67 76 if (m_hasPendingWriteData) 68 if (m_totalSendDataSize + length > maxBufferSize) 77 69 return 0; 70 m_totalSendDataSize += length; 78 71 79 m_hasPendingWriteData = true; 72 auto buffer = makeUniqueArray<uint8_t>(length); 73 memcpy(buffer.get(), data, length); 80 74 81 auto writeBuffer = makeUniqueArray<uint8_t>(length); 82 memcpy(writeBuffer.get(), data, length); 83 84 callOnWorkerThread([this, writeBuffer = WTFMove(writeBuffer), writeBufferSize = length]() mutable { 85 ASSERT(!isMainThread()); 86 m_writeBuffer = WTFMove(writeBuffer); 87 m_writeBufferSize = writeBufferSize; 88 m_writeBufferOffset = 0; 89 }); 90 75 m_scheduler.send(m_streamID, WTFMove(buffer), length); 91 76 return length; 92 77 } … … 94 79 void SocketStreamHandleImpl::platformClose() 95 80 { 96 LOG(Network, "SocketStreamHandle %p platformClose", this); 97 ASSERT(isMainThread()); 81 destructStream(); 98 82 99 83 if (m_state == Closed) … … 101 85 m_state = Closed; 102 86 103 stopThread(); 104 87 m_client.didCloseSocketStream(*this); 105 88 } 106 89 107 void SocketStreamHandleImpl::threadEntryPoint(const URL& url) 90 void SocketStreamHandleImpl::didOpen(CurlStreamID) 108 91 { 109 
ASSERT(!isMainThread()); 92 if (m_state != Connecting) 93 return; 94 m_state = Open; 110 95 111 CurlSocketHandle socket { url, [this](CURLcode errorCode) { 112 handleError(errorCode); 113 }}; 96 m_client.didOpenSocketStream(*this); 97 } 114 98 115 // Connect to host 116 if (!socket.connect()) 99 void SocketStreamHandleImpl::didSendData(CurlStreamID, size_t length) 100 { 101 ASSERT(m_totalSendDataSize - length >= 0); 102 103 m_totalSendDataSize -= length; 104 sendPendingData(); 105 } 106 107 void SocketStreamHandleImpl::didReceiveData(CurlStreamID, const char* data, size_t length) 108 { 109 if (m_state != Open) 117 110 return; 118 111 119 callOnMainThread([this, protectedThis = makeRef(*this)] { 120 if (m_state == Connecting) { 121 m_state = Open; 122 m_client.didOpenSocketStream(*this); 123 } 124 }); 125 126 while (m_running) { 127 executeTasks(); 128 129 auto result = socket.wait(20_ms, m_writeBuffer.get()); 130 if (!result) 131 continue; 132 133 // These logic only run when there's data waiting. 134 if (result->writable && m_running) { 135 auto bytesSent = socket.send(m_writeBuffer.get() + m_writeBufferOffset, m_writeBufferSize - m_writeBufferOffset); 136 m_writeBufferOffset += bytesSent; 137 138 if (m_writeBufferSize <= m_writeBufferOffset) { 139 m_writeBuffer = nullptr; 140 m_writeBufferSize = 0; 141 m_writeBufferOffset = 0; 142 143 callOnMainThread([this, protectedThis = makeRef(*this)] { 144 m_hasPendingWriteData = false; 145 sendPendingData(); 146 }); 147 } 148 } 149 150 if (result->readable && m_running) { 151 auto readBuffer = makeUniqueArray<uint8_t>(kReadBufferSize); 152 auto bytesRead = socket.receive(readBuffer.get(), kReadBufferSize); 153 // `nullopt` result means nothing to handle at this moment. 154 if (!bytesRead) 155 continue; 156 157 // 0 bytes indicates a closed connection. 
158 if (!*bytesRead) { 159 m_running = false; 160 callOnMainThread([this, protectedThis = makeRef(*this)] { 161 close(); 162 }); 163 break; 164 } 165 166 callOnMainThread([this, protectedThis = makeRef(*this), buffer = WTFMove(readBuffer), size = *bytesRead ] { 167 if (m_state == Open) 168 m_client.didReceiveSocketStreamData(*this, reinterpret_cast<const char*>(buffer.get()), size); 169 }); 170 } 171 } 172 173 m_writeBuffer = nullptr; 112 m_client.didReceiveSocketStreamData(*this, data, length); 174 113 } 175 114 176 void SocketStreamHandleImpl:: handleError(CURLcode errorCode)115 void SocketStreamHandleImpl::didFail(CurlStreamID, CURLcode errorCode) 177 116 { 178 m_running = false; 179 callOnMainThread([this, protectedThis = makeRef(*this), errorCode, localizedDescription = CurlHandle::errorDescription(errorCode).isolatedCopy()] { 180 if (m_state == Closed) 181 return; 117 destructStream(); 182 118 183 if (errorCode == CURLE_RECV_ERROR) 184 m_client.didFailToReceiveSocketStreamData(*this); 185 else 186 m_client.didFailSocketStream(*this, SocketStreamError(static_cast<int>(errorCode), { }, localizedDescription)); 187 }); 119 if (m_state == Closed) 120 return; 121 122 if (errorCode == CURLE_RECV_ERROR) 123 m_client.didFailToReceiveSocketStreamData(*this); 124 else 125 m_client.didFailSocketStream(*this, SocketStreamError(errorCode, m_url, CurlHandle::errorDescription(errorCode))); 188 126 } 189 127 190 void SocketStreamHandleImpl:: stopThread()128 void SocketStreamHandleImpl::destructStream() 191 129 { 192 ASSERT(isMainThread()); 130 if (isStreamInvalidated()) 131 return; 193 132 194 m_running = false; 195 196 if (m_workerThread) { 197 m_workerThread->waitForCompletion(); 198 m_workerThread = nullptr; 199 } 200 } 201 202 void SocketStreamHandleImpl::callOnWorkerThread(Function<void()>&& task) 203 { 204 ASSERT(isMainThread()); 205 m_taskQueue.append(makeUnique<Function<void()>>(WTFMove(task))); 206 } 207 208 void SocketStreamHandleImpl::executeTasks() 209 { 210 
ASSERT(!isMainThread()); 211 212 auto tasks = m_taskQueue.takeAllMessages(); 213 for (auto& task : tasks) 214 (*task)(); 133 m_scheduler.destroyStream(m_streamID); 134 m_streamID = invalidCurlStreamID; 215 135 } 216 136
Note: See TracChangeset
for help on using the changeset viewer.