Changeset 202370 in webkit


Ignore:
Timestamp:
Jun 23, 2016 12:23:19 AM (8 years ago)
Author:
Carlos Garcia Campos
Message:

[Soup] Clean up SocketStreamHandle soup implementation
https://bugs.webkit.org/show_bug.cgi?id=159024

Reviewed by Žan Doberšek.

Stop using a global HashMap to "activate"/"deactivate" handles, and just take a reference to the handle and
pass the ownership to the callbacks, using a GCancellable to cancel all async operations.

  • platform/network/soup/SocketStreamHandle.h:

(WebCore::SocketStreamHandle::create):
(WebCore::SocketStreamHandle::id): Deleted.

  • platform/network/soup/SocketStreamHandleSoup.cpp:

(WebCore::SocketStreamHandle::SocketStreamHandle):
(WebCore::SocketStreamHandle::connected):
(WebCore::SocketStreamHandle::connectedCallback):
(WebCore::SocketStreamHandle::readBytes):
(WebCore::SocketStreamHandle::readReadyCallback):
(WebCore::SocketStreamHandle::didFail):
(WebCore::SocketStreamHandle::platformSend):
(WebCore::SocketStreamHandle::platformClose):
(WebCore::SocketStreamHandle::beginWaitingForSocketWritability):
(WebCore::SocketStreamHandle::writeReadyCallback):
(WebCore::getHandleFromId): Deleted.
(WebCore::deactivateHandle): Deleted.
(WebCore::activateHandle): Deleted.
(WebCore::SocketStreamHandle::~SocketStreamHandle): Deleted.
(WebCore::connectedCallback): Deleted.
(WebCore::readReadyCallback): Deleted.
(WebCore::writeReadyCallback): Deleted.

Location:
trunk/Source/WebCore
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/Source/WebCore/ChangeLog

    r202359 r202370  
     12016-06-23  Carlos Garcia Campos  <cgarcia@igalia.com>
     2
     3        [Soup] Clean up SocketStreamHandle soup implementation
     4        https://bugs.webkit.org/show_bug.cgi?id=159024
     5
     6        Reviewed by Žan Doberšek.
     7
     8        Stop using a global HashMap to "activate"/"deactivate" handles, and just take a reference to the handle and
     9        pass the ownership to the callbacks, using a GCancellable to cancel all async operations.
     10
     11        * platform/network/soup/SocketStreamHandle.h:
     12        (WebCore::SocketStreamHandle::create):
     13        (WebCore::SocketStreamHandle::id): Deleted.
     14        * platform/network/soup/SocketStreamHandleSoup.cpp:
     15        (WebCore::SocketStreamHandle::SocketStreamHandle):
     16        (WebCore::SocketStreamHandle::connected):
     17        (WebCore::SocketStreamHandle::connectedCallback):
     18        (WebCore::SocketStreamHandle::readBytes):
     19        (WebCore::SocketStreamHandle::readReadyCallback):
     20        (WebCore::SocketStreamHandle::didFail):
     21        (WebCore::SocketStreamHandle::platformSend):
     22        (WebCore::SocketStreamHandle::platformClose):
     23        (WebCore::SocketStreamHandle::beginWaitingForSocketWritability):
     24        (WebCore::SocketStreamHandle::writeReadyCallback):
     25        (WebCore::getHandleFromId): Deleted.
     26        (WebCore::deactivateHandle): Deleted.
     27        (WebCore::activateHandle): Deleted.
     28        (WebCore::SocketStreamHandle::~SocketStreamHandle): Deleted.
     29        (WebCore::connectedCallback): Deleted.
     30        (WebCore::readReadyCallback): Deleted.
     31        (WebCore::writeReadyCallback): Deleted.
     32
    1332016-06-22  Brady Eidson  <beidson@apple.com>
    234
  • trunk/Source/WebCore/platform/network/soup/SocketStreamHandle.h

    r201880 r202370  
    3838#if USE(SOUP)
    3939
    40 #include <wtf/PassRefPtr.h>
    4140#include <wtf/RefCounted.h>
    4241#include <wtf/glib/GRefPtr.h>
     
    4443namespace WebCore {
    4544
    46     class NetworkingContext;
    47     class SocketStreamHandleClient;
     45class NetworkingContext;
     46class SocketStreamError;
     47class SocketStreamHandleClient;
    4848
    49     class SocketStreamHandle : public RefCounted<SocketStreamHandle>, public SocketStreamHandleBase {
    50     public:
    51         static Ref<SocketStreamHandle> create(const URL& url, SocketStreamHandleClient* client, NetworkingContext&, bool) { return adoptRef(*new SocketStreamHandle(url, client)); }
    52         static Ref<SocketStreamHandle> create(GSocketConnection* socketConnection, SocketStreamHandleClient* client) { return adoptRef(*new SocketStreamHandle(socketConnection, client)); }
     49class SocketStreamHandle final : public RefCounted<SocketStreamHandle>, public SocketStreamHandleBase {
     50public:
     51    static Ref<SocketStreamHandle> create(const URL& url, SocketStreamHandleClient* client, NetworkingContext&, bool) { return adoptRef(*new SocketStreamHandle(url, client)); }
     52    static Ref<SocketStreamHandle> create(GSocketConnection* socketConnection, SocketStreamHandleClient* client) { return adoptRef(*new SocketStreamHandle(socketConnection, client)); }
    5353
    54         virtual ~SocketStreamHandle();
    55         void connected(GSocketConnection*, GError*);
    56         void readBytes(signed long, GError*);
    57         void writeReady();
    58         void* id() { return m_id; }
     54    virtual ~SocketStreamHandle();
    5955
    60     protected:
    61         virtual int platformSend(const char* data, int length);
    62         virtual void platformClose();
     56private:
     57    SocketStreamHandle(const URL&, SocketStreamHandleClient*);
     58    SocketStreamHandle(GSocketConnection*, SocketStreamHandleClient*);
    6359
    64     private:
    65         GRefPtr<GSocketConnection> m_socketConnection;
    66         GRefPtr<GInputStream> m_inputStream;
    67         GRefPtr<GPollableOutputStream> m_outputStream;
    68         GRefPtr<GSource> m_writeReadySource;
    69         std::unique_ptr<char[]> m_readBuffer;
    70         void* m_id;
     60    int platformSend(const char* data, int length) override;
     61    void platformClose() override;
    7162
    72         SocketStreamHandle(const URL&, SocketStreamHandleClient*);
    73         SocketStreamHandle(GSocketConnection*, SocketStreamHandleClient*);
     63    void beginWaitingForSocketWritability();
     64    void stopWaitingForSocketWritability();
    7465
    75         void beginWaitingForSocketWritability();
    76         void stopWaitingForSocketWritability();
    77     };
     66    static void connectedCallback(GSocketClient*, GAsyncResult*, SocketStreamHandle*);
     67    static void readReadyCallback(GInputStream*, GAsyncResult*, SocketStreamHandle*);
     68    static gboolean writeReadyCallback(GPollableOutputStream*, SocketStreamHandle*);
     69
     70    void connected(GRefPtr<GSocketConnection>&&);
     71    void readBytes(gssize);
     72    void didFail(SocketStreamError&&);
     73    void writeReady();
     74
     75    GRefPtr<GSocketConnection> m_socketConnection;
     76    GRefPtr<GInputStream> m_inputStream;
     77    GRefPtr<GPollableOutputStream> m_outputStream;
     78    GRefPtr<GSource> m_writeReadySource;
     79    GRefPtr<GCancellable> m_cancellable;
     80    std::unique_ptr<char[]> m_readBuffer;
     81};
    7882
    7983}  // namespace WebCore
  • trunk/Source/WebCore/platform/network/soup/SocketStreamHandleSoup.cpp

    r201880 r202370  
    3939#include "SocketStreamError.h"
    4040#include "SocketStreamHandleClient.h"
    41 
    4241#include <gio/gio.h>
    4342#include <glib.h>
    44 
    4543#include <wtf/Vector.h>
    4644#include <wtf/glib/GUniquePtr.h>
     
    5149namespace WebCore {
    5250
    53 // These functions immediately call the similarly named SocketStreamHandle methods.
    54 static void connectedCallback(GSocketClient*, GAsyncResult*, void*);
    55 static void readReadyCallback(GInputStream*, GAsyncResult*, void*);
    56 static gboolean writeReadyCallback(GPollableOutputStream*, void*);
    57 
    58 // Having a list of active handles means that we do not have to worry about WebCore
    59 // reference counting in GLib callbacks. Once the handle is off the active handles list
    60 // we just ignore it in the callback. We avoid a lot of extra checks and tricky
    61 // situations this way.
    62 static HashMap<void*, SocketStreamHandle*> gActiveHandles;
    63 COMPILE_ASSERT(HashTraits<SocketStreamHandle*>::emptyValueIsZero, emptyMapValue_is_0);
    64 
    65 static SocketStreamHandle* getHandleFromId(void* id)
    66 {
    67     return gActiveHandles.get(id);
    68 }
    69 
    70 static void deactivateHandle(SocketStreamHandle& handle)
    71 {
    72     gActiveHandles.remove(handle.id());
    73 }
    74 
    75 static void* activateHandle(SocketStreamHandle& handle)
    76 {
    77     // The first id cannot be 0, because it conflicts with the HashMap emptyValue.
    78     static gint currentHandleId = 1;
    79     void* id = GINT_TO_POINTER(currentHandleId++);
    80     gActiveHandles.set(id, &handle);
    81     return id;
    82 }
    83 
    8451SocketStreamHandle::SocketStreamHandle(const URL& url, SocketStreamHandleClient* client)
    8552    : SocketStreamHandleBase(url, client)
     53    , m_cancellable(adoptGRef(g_cancellable_new()))
    8654{
    8755    LOG(Network, "SocketStreamHandle %p new client %p", this, m_client);
    88     unsigned int port = url.hasPort() ? url.port() : (url.protocolIs("wss") ? 443 : 80);
    89 
    90     m_id = activateHandle(*this);
     56    unsigned port = url.hasPort() ? url.port() : (url.protocolIs("wss") ? 443 : 80);
     57
    9158    GRefPtr<GSocketClient> socketClient = adoptGRef(g_socket_client_new());
    9259    if (url.protocolIs("wss"))
    9360        g_socket_client_set_tls(socketClient.get(), TRUE);
    94     g_socket_client_connect_to_host_async(socketClient.get(), url.host().utf8().data(), port, 0,
    95         reinterpret_cast<GAsyncReadyCallback>(connectedCallback), m_id);
     61    RefPtr<SocketStreamHandle> protectedThis(this);
     62    g_socket_client_connect_to_host_async(socketClient.get(), url.host().utf8().data(), port, m_cancellable.get(),
     63        reinterpret_cast<GAsyncReadyCallback>(connectedCallback), protectedThis.leakRef());
    9664}
    9765
    9866SocketStreamHandle::SocketStreamHandle(GSocketConnection* socketConnection, SocketStreamHandleClient* client)
    9967    : SocketStreamHandleBase(URL(), client)
     68    , m_cancellable(adoptGRef(g_cancellable_new()))
    10069{
    10170    LOG(Network, "SocketStreamHandle %p new client %p", this, m_client);
    102     m_id = activateHandle(*this);
    103     connected(socketConnection, 0);
     71    GRefPtr<GSocketConnection> connection = socketConnection;
     72    connected(WTFMove(connection));
    10473}
    10574
     
    10776{
    10877    LOG(Network, "SocketStreamHandle %p delete", this);
    109     // If for some reason we were destroyed without closing, ensure that we are deactivated.
    110     deactivateHandle(*this);
    11178    setClient(nullptr);
    11279}
    11380
    114 void SocketStreamHandle::connected(GSocketConnection* socketConnection, GError* error)
    115 {
    116     if (error) {
    117         m_client->didFailSocketStream(*this, SocketStreamError(error->code, error->message));
    118         return;
    119     }
    120 
    121     m_socketConnection = socketConnection;
     81void SocketStreamHandle::connected(GRefPtr<GSocketConnection>&& socketConnection)
     82{
     83    m_socketConnection = WTFMove(socketConnection);
    12284    m_outputStream = G_POLLABLE_OUTPUT_STREAM(g_io_stream_get_output_stream(G_IO_STREAM(m_socketConnection.get())));
    12385    m_inputStream = g_io_stream_get_input_stream(G_IO_STREAM(m_socketConnection.get()));
    124 
    12586    m_readBuffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
    126     g_input_stream_read_async(m_inputStream.get(), m_readBuffer.get(), READ_BUFFER_SIZE, G_PRIORITY_DEFAULT, 0,
    127         reinterpret_cast<GAsyncReadyCallback>(readReadyCallback), m_id);
     87
     88    RefPtr<SocketStreamHandle> protectedThis(this);
     89    g_input_stream_read_async(m_inputStream.get(), m_readBuffer.get(), READ_BUFFER_SIZE, G_PRIORITY_DEFAULT, m_cancellable.get(),
     90        reinterpret_cast<GAsyncReadyCallback>(readReadyCallback), protectedThis.leakRef());
    12891
    12992    m_state = Open;
     
    13194}
    13295
    133 void SocketStreamHandle::readBytes(signed long bytesRead, GError* error)
    134 {
    135     if (error) {
    136         m_client->didFailSocketStream(*this, SocketStreamError(error->code, error->message));
    137         return;
    138     }
    139 
     96void SocketStreamHandle::connectedCallback(GSocketClient* client, GAsyncResult* result, SocketStreamHandle* handle)
     97{
     98    RefPtr<SocketStreamHandle> protectedThis = adoptRef(handle);
     99
     100    // Always finish the connection, even if this SocketStreamHandle was cancelled earlier.
     101    GUniqueOutPtr<GError> error;
     102    GRefPtr<GSocketConnection> socketConnection = adoptGRef(g_socket_client_connect_to_host_finish(client, result, &error.outPtr()));
     103
     104    // The SocketStreamHandle has been cancelled, so just close the connection, ignoring errors.
     105    if (g_cancellable_is_cancelled(handle->m_cancellable.get())) {
     106        if (socketConnection)
     107            g_io_stream_close(G_IO_STREAM(socketConnection.get()), nullptr, nullptr);
     108        return;
     109    }
     110
     111    if (error)
     112        handle->didFail(SocketStreamError(error->code, error->message));
     113    else
     114        handle->connected(WTFMove(socketConnection));
     115}
     116
     117void SocketStreamHandle::readBytes(gssize bytesRead)
     118{
    140119    if (!bytesRead) {
    141120        close();
     
    144123
    145124    // The client can close the handle, potentially removing the last reference.
    146     Ref<SocketStreamHandle> protectedThis(*this);
     125    RefPtr<SocketStreamHandle> protectedThis(this);
    147126    m_client->didReceiveSocketStreamData(*this, m_readBuffer.get(), bytesRead);
    148     if (m_inputStream) // The client may have closed the connection.
    149         g_input_stream_read_async(m_inputStream.get(), m_readBuffer.get(), READ_BUFFER_SIZE, G_PRIORITY_DEFAULT, 0,
    150             reinterpret_cast<GAsyncReadyCallback>(readReadyCallback), m_id);
     127    if (m_inputStream) {
     128        g_input_stream_read_async(m_inputStream.get(), m_readBuffer.get(), READ_BUFFER_SIZE, G_PRIORITY_DEFAULT, m_cancellable.get(),
     129            reinterpret_cast<GAsyncReadyCallback>(readReadyCallback), protectedThis.leakRef());
     130    }
     131}
     132
     133void SocketStreamHandle::readReadyCallback(GInputStream* stream, GAsyncResult* result, SocketStreamHandle* handle)
     134{
     135    RefPtr<SocketStreamHandle> protectedThis = adoptRef(handle);
     136
     137    // Always finish the read, even if this SocketStreamHandle was cancelled earlier.
     138    GUniqueOutPtr<GError> error;
     139    gssize bytesRead = g_input_stream_read_finish(stream, result, &error.outPtr());
     140
     141    if (g_cancellable_is_cancelled(handle->m_cancellable.get()))
     142        return;
     143
     144    if (error)
     145        handle->didFail(SocketStreamError(error->code, error->message));
     146    else
     147        handle->readBytes(bytesRead);
     148}
     149
     150void SocketStreamHandle::didFail(SocketStreamError&& error)
     151{
     152    m_client->didFailSocketStream(*this, WTFMove(error));
    151153}
    152154
     
    169171
    170172    GUniqueOutPtr<GError> error;
    171     gssize written = g_pollable_output_stream_write_nonblocking(m_outputStream.get(), data, length, 0, &error.outPtr());
     173    gssize written = g_pollable_output_stream_write_nonblocking(m_outputStream.get(), data, length, m_cancellable.get(), &error.outPtr());
    172174    if (error) {
    173175        if (g_error_matches(error.get(), G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
    174176            beginWaitingForSocketWritability();
    175177        else
    176             m_client->didFailSocketStream(*this, SocketStreamError(error->code, error->message));
     178            didFail(SocketStreamError(error->code, error->message));
    177179        return 0;
    178180    }
     
    189191{
    190192    LOG(Network, "SocketStreamHandle %p platformClose", this);
    191     // We remove this handle from the active handles list first, to disable all callbacks.
    192     deactivateHandle(*this);
     193    // We cancel this handle first to disable all callbacks.
     194    g_cancellable_cancel(m_cancellable.get());
    193195    stopWaitingForSocketWritability();
    194196
    195197    if (m_socketConnection) {
    196198        GUniqueOutPtr<GError> error;
    197         g_io_stream_close(G_IO_STREAM(m_socketConnection.get()), 0, &error.outPtr());
     199        g_io_stream_close(G_IO_STREAM(m_socketConnection.get()), nullptr, &error.outPtr());
    198200        if (error)
    199             m_client->didFailSocketStream(*this, SocketStreamError(error->code, error->message));
    200         m_socketConnection = 0;
    201     }
    202 
    203     m_outputStream = 0;
    204     m_inputStream = 0;
     201            didFail(SocketStreamError(error->code, error->message));
     202        m_socketConnection = nullptr;
     203    }
     204
     205    m_outputStream = nullptr;
     206    m_inputStream = nullptr;
    205207    m_readBuffer = nullptr;
    206208
     
    213215        return;
    214216
    215     m_writeReadySource = adoptGRef(g_pollable_output_stream_create_source(m_outputStream.get(), 0));
    216     g_source_set_callback(m_writeReadySource.get(), reinterpret_cast<GSourceFunc>(writeReadyCallback), m_id, 0);
     217    m_writeReadySource = adoptGRef(g_pollable_output_stream_create_source(m_outputStream.get(), m_cancellable.get()));
     218    ref();
     219    g_source_set_callback(m_writeReadySource.get(), reinterpret_cast<GSourceFunc>(writeReadyCallback), this,
     220        [](gpointer handle) { static_cast<SocketStreamHandle*>(handle)->deref(); });
    217221    g_source_attach(m_writeReadySource.get(), g_main_context_get_thread_default());
    218222}
     
    227231}
    228232
    229 static void connectedCallback(GSocketClient* client, GAsyncResult* result, void* id)
    230 {
    231     // Always finish the connection, even if this SocketStreamHandle was deactivated earlier.
    232     GUniqueOutPtr<GError> error;
    233     GSocketConnection* socketConnection = g_socket_client_connect_to_host_finish(client, result, &error.outPtr());
    234 
    235     // The SocketStreamHandle has been deactivated, so just close the connection, ignoring errors.
    236     SocketStreamHandle* handle = getHandleFromId(id);
    237     if (!handle) {
    238         if (socketConnection)
    239             g_io_stream_close(G_IO_STREAM(socketConnection), 0, 0);
    240         return;
    241     }
    242 
    243     handle->connected(socketConnection, error.get());
    244 }
    245 
    246 static void readReadyCallback(GInputStream* stream, GAsyncResult* result, void* id)
    247 {
    248     // Always finish the read, even if this SocketStreamHandle was deactivated earlier.
    249     GUniqueOutPtr<GError> error;
    250     gssize bytesRead = g_input_stream_read_finish(stream, result, &error.outPtr());
    251 
    252     SocketStreamHandle* handle = getHandleFromId(id);
    253     if (!handle)
    254         return;
    255 
    256     handle->readBytes(bytesRead, error.get());
    257 }
    258 
    259 static gboolean writeReadyCallback(GPollableOutputStream*, void* id)
    260 {
    261     SocketStreamHandle* handle = getHandleFromId(id);
    262     if (!handle)
    263         return FALSE;
     233gboolean SocketStreamHandle::writeReadyCallback(GPollableOutputStream*, SocketStreamHandle* handle)
     234{
     235    if (g_cancellable_is_cancelled(handle->m_cancellable.get()))
     236        return G_SOURCE_REMOVE;
    264237
    265238    handle->writeReady();
    266     return TRUE;
     239    return G_SOURCE_CONTINUE;
    267240}
    268241
Note: See TracChangeset for help on using the changeset viewer.