Changeset 179690 in webkit


Ignore:
Timestamp:
Feb 5, 2015 6:31:07 AM (9 years ago)
Author:
Antti Koivisto
Message:

Avoid copying std::functions across threads in NetworkCacheStorage
https://bugs.webkit.org/show_bug.cgi?id=141273

Reviewed by Andreas Kling.

The current approach is risky. There is possiblity that captured variables are
deleted in an unexpected thread.

  • NetworkProcess/cache/NetworkCache.cpp:

(WebKit::NetworkCache::retrieve):

The capture trick here is no longer needed.

  • NetworkProcess/cache/NetworkCacheStorage.h:

For each cache operation we create Retrive/Store/UpdateOperation object kept alive by the active operation map.
This object captures all parameters of the operation including the lambda. When the operation completes
the object is removed from the map in the main thread, ensuring safe destruction.

  • NetworkProcess/cache/NetworkCacheStorageCocoa.mm:

(WebKit::NetworkCacheStorage::dispatchRetrieveOperation):
(WebKit::NetworkCacheStorage::dispatchPendingRetrieveOperations):
(WebKit::retrieveActive):

Instead of maintaining a separate write cache we just look through the active write and update maps.

(WebKit::NetworkCacheStorage::retrieve):

Use fixed sized priority array rather than a dynamic one. Vector<Deque<std::unique_ptr>> doesn't quite work.

(WebKit::NetworkCacheStorage::store):
(WebKit::NetworkCacheStorage::update):
(WebKit::NetworkCacheStorage::clear):

Location:
trunk/Source/WebKit2
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/Source/WebKit2/ChangeLog

    r179687 r179690  
     12015-02-04  Antti Koivisto  <antti@apple.com>
     2
     3        Avoid copying std::functions across threads in NetworkCacheStorage
     4        https://bugs.webkit.org/show_bug.cgi?id=141273
     5
     6        Reviewed by Andreas Kling.
     7
     8        The current approach is risky. There is possiblity that captured variables are
     9        deleted in an unexpected thread.
     10
     11        * NetworkProcess/cache/NetworkCache.cpp:
     12        (WebKit::NetworkCache::retrieve):
     13
     14            The capture trick here is no longer needed.
     15
     16        * NetworkProcess/cache/NetworkCacheStorage.h:
     17
     18            For each cache operation we create Retrive/Store/UpdateOperation object kept alive by the active operation map.
     19            This object captures all parameters of the operation including the lambda. When the operation completes
     20            the object is removed from the map in the main thread, ensuring safe destruction.
     21
     22        * NetworkProcess/cache/NetworkCacheStorageCocoa.mm:
     23        (WebKit::NetworkCacheStorage::dispatchRetrieveOperation):
     24        (WebKit::NetworkCacheStorage::dispatchPendingRetrieveOperations):
     25        (WebKit::retrieveActive):
     26
     27            Instead of maintaining a separate write cache we just look through the active write and update maps.
     28
     29        (WebKit::NetworkCacheStorage::retrieve):
     30
     31            Use fixed sized priority array rather than a dynamic one. Vector<Deque<std::unique_ptr>> doesn't quite work.
     32
     33        (WebKit::NetworkCacheStorage::store):
     34        (WebKit::NetworkCacheStorage::update):
     35        (WebKit::NetworkCacheStorage::clear):
     36
    1372015-02-05  Youenn Fablet  <youenn.fablet@crf.canon.fr> and Xabier Rodriguez Calvar <calvaris@igalia.com>
    238
  • trunk/Source/WebKit2/NetworkProcess/cache/NetworkCache.cpp

    r179584 r179690  
    227227
    228228    auto startTime = std::chrono::system_clock::now();
    229 
    230229    NetworkCacheKey storageKey = makeCacheKey(originalRequest);
    231230    unsigned priority = originalRequest.priority();
    232231
    233     // Captured data is going to be shuffled around threads. Avoid unsafe copying.
    234     struct Capture {
    235         WebCore::ResourceRequest originalRequest;
    236         std::function<void (std::unique_ptr<Entry>)> completionHandler;
    237     };
    238     // FIXME: With C++14 this could use unique_ptr and initialized lambda capture
    239     auto capture = std::make_shared<Capture>(Capture { originalRequest, completionHandler });
    240 
    241     m_storage->retrieve(storageKey, priority, [this, capture, startTime](std::unique_ptr<NetworkCacheStorage::Entry> entry) {
     232    m_storage->retrieve(storageKey, priority, [this, originalRequest, completionHandler, startTime](std::unique_ptr<NetworkCacheStorage::Entry> entry) {
    242233        if (!entry) {
    243234            LOG(NetworkCache, "(NetworkProcess) not found in storage");
    244             capture->completionHandler(nullptr);
     235            completionHandler(nullptr);
    245236            return false;
    246237        }
    247         auto decodedEntry = decodeStorageEntry(*entry, capture->originalRequest);
     238        auto decodedEntry = decodeStorageEntry(*entry, originalRequest);
    248239        bool success = !!decodedEntry;
    249240#if !LOG_DISABLED
    250241        auto elapsedMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime).count();
    251242#endif
    252         LOG(NetworkCache, "(NetworkProcess) retrieve complete success=%d priority=%u time=%lldms", success, capture->originalRequest.priority(), elapsedMS);
    253         capture->completionHandler(WTF::move(decodedEntry));
     243        LOG(NetworkCache, "(NetworkProcess) retrieve complete success=%d priority=%u time=%lldms", success, originalRequest.priority(), elapsedMS);
     244        completionHandler(WTF::move(decodedEntry));
    254245        return success;
    255246    });
  • trunk/Source/WebKit2/NetworkProcess/cache/NetworkCacheStorage.h

    r179547 r179690  
    3333#include <wtf/BloomFilter.h>
    3434#include <wtf/Deque.h>
     35#include <wtf/HashSet.h>
    3536#include <wtf/text/WTFString.h>
    3637
     
    158159        std::function<bool (std::unique_ptr<Entry>)> completionHandler;
    159160    };
    160     void dispatchRetrieveOperation(const RetrieveOperation&);
     161    void dispatchRetrieveOperation(std::unique_ptr<const RetrieveOperation>);
    161162    void dispatchPendingRetrieveOperations();
     163
     164    struct StoreOperation {
     165        NetworkCacheKey key;
     166        Entry entry;
     167        std::function<void (bool success)> completionHandler;
     168    };
     169
     170    struct UpdateOperation {
     171        NetworkCacheKey key;
     172        Entry entry;
     173        Entry existingEntry;
     174        std::function<void (bool success)> completionHandler;
     175    };
    162176
    163177    const String m_directoryPath;
     
    169183    std::atomic<bool> m_shrinkInProgress { false };
    170184
    171     Vector<Deque<RetrieveOperation>> m_pendingRetrieveOperationsByPriority;
    172     unsigned m_activeRetrieveOperationCount { 0 };
    173 
    174     typedef std::pair<NetworkCacheKey, Entry> KeyEntryPair;
    175     HashMap<NetworkCacheKey::HashType, std::shared_ptr<KeyEntryPair>, AlreadyHashed> m_writeCache;
     185    static const int maximumRetrievePriority = 4;
     186    Deque<std::unique_ptr<const RetrieveOperation>> m_pendingRetrieveOperationsByPriority[maximumRetrievePriority + 1];
     187
     188    HashSet<std::unique_ptr<const RetrieveOperation>> m_activeRetrieveOperations;
     189    HashSet<std::unique_ptr<const StoreOperation>> m_activeStoreOperations;
     190    HashSet<std::unique_ptr<const UpdateOperation>> m_activeUpdateOperations;
    176191
    177192#if PLATFORM(COCOA)
  • trunk/Source/WebKit2/NetworkProcess/cache/NetworkCacheStorageCocoa.mm

    r179547 r179690  
    353353}
    354354
    355 void NetworkCacheStorage::dispatchRetrieveOperation(const RetrieveOperation& retrieve)
    356 {
    357     ASSERT(RunLoop::isMain());
    358 
    359     ++m_activeRetrieveOperationCount;
     355void NetworkCacheStorage::dispatchRetrieveOperation(std::unique_ptr<const RetrieveOperation> retrieveOperation)
     356{
     357    ASSERT(RunLoop::isMain());
     358
     359    auto& retrieve = *retrieveOperation;
     360    m_activeRetrieveOperations.add(WTF::move(retrieveOperation));
    360361
    361362    StringCapture cachePathCapture(m_directoryPath);
    362     dispatch_async(m_ioQueue.get(), [this, retrieve, cachePathCapture] {
     363    dispatch_async(m_ioQueue.get(), [this, &retrieve, cachePathCapture] {
    363364        int fd;
    364365        auto channel = openFileForKey(retrieve.key, FileOpenType::Read, cachePathCapture.string(), fd);
    365366
    366367        bool didCallCompletionHandler = false;
    367         dispatch_io_read(channel.get(), 0, std::numeric_limits<size_t>::max(), dispatch_get_main_queue(), [this, fd, retrieve, didCallCompletionHandler](bool done, dispatch_data_t fileData, int error) mutable {
     368        dispatch_io_read(channel.get(), 0, std::numeric_limits<size_t>::max(), dispatch_get_main_queue(), [this, fd, &retrieve, didCallCompletionHandler](bool done, dispatch_data_t fileData, int error) mutable {
    368369            if (done) {
    369                 ASSERT(m_activeRetrieveOperationCount);
    370                 --m_activeRetrieveOperationCount;
    371                 dispatchPendingRetrieveOperations();
    372             }
    373             if (done) {
     370                if (error)
     371                    removeEntry(retrieve.key);
     372
    374373                if (!didCallCompletionHandler)
    375374                    retrieve.completionHandler(nullptr);
    376                 if (error)
    377                     removeEntry(retrieve.key);
     375
     376                ASSERT(m_activeRetrieveOperations.contains(&retrieve));
     377                m_activeRetrieveOperations.remove(&retrieve);
     378                dispatchPendingRetrieveOperations();
    378379                return;
    379380            }
     
    393394    ASSERT(RunLoop::isMain());
    394395
    395     const unsigned maximumActiveRetrieveOperationCount = 5;
    396 
    397     for (int priority = m_pendingRetrieveOperationsByPriority.size() - 1; priority >= 0; --priority) {
    398         if (m_activeRetrieveOperationCount > maximumActiveRetrieveOperationCount) {
     396    const int maximumActiveRetrieveOperationCount = 5;
     397
     398    for (int priority = maximumRetrievePriority; priority >= 0; --priority) {
     399        if (m_activeRetrieveOperations.size() > maximumActiveRetrieveOperationCount) {
    399400            LOG(NetworkCacheStorage, "(NetworkProcess) limiting parallel retrieves");
    400401            return;
     
    407408}
    408409
     410template <class T> bool retrieveActive(const T& operations, const NetworkCacheKey& key, std::function<bool (std::unique_ptr<NetworkCacheStorage::Entry>)>& completionHandler)
     411{
     412    for (auto& operation : operations) {
     413        if (operation->key == key) {
     414            LOG(NetworkCacheStorage, "(NetworkProcess) found store operation in progress");
     415            auto entry = operation->entry;
     416            dispatch_async(dispatch_get_main_queue(), [entry, completionHandler] {
     417                completionHandler(std::make_unique<NetworkCacheStorage::Entry>(entry));
     418            });
     419            return true;
     420        }
     421    }
     422    return false;
     423}
     424
    409425void NetworkCacheStorage::retrieve(const NetworkCacheKey& key, unsigned priority, std::function<bool (std::unique_ptr<Entry>)> completionHandler)
    410426{
    411427    ASSERT(RunLoop::isMain());
     428    ASSERT(priority <= maximumRetrievePriority);
    412429
    413430    if (!m_contentsFilter.mayContain(key.hash())) {
     
    415432        return;
    416433    }
    417 
    418     // Write cache is a temporary memory cache used to respond to requests while a write is pending.
    419     if (auto keyEntryPair = m_writeCache.get(key.hash())) {
    420         if (keyEntryPair->first == key) {
    421             LOG(NetworkCacheStorage, "(NetworkProcess) found from the write cache");
    422             dispatch_async(dispatch_get_main_queue(), [keyEntryPair, completionHandler] {
    423                 completionHandler(std::make_unique<Entry>(keyEntryPair->second));
    424             });
    425             return;
    426         }
    427     }
    428 
    429     if (m_pendingRetrieveOperationsByPriority.size() < priority + 1)
    430         m_pendingRetrieveOperationsByPriority.grow(priority + 1);
    431     m_pendingRetrieveOperationsByPriority[priority].append(RetrieveOperation { key, completionHandler });
    432 
     434    // See if we have the resource in memory.
     435    if (retrieveActive(m_activeStoreOperations, key, completionHandler))
     436        return;
     437    if (retrieveActive(m_activeUpdateOperations, key, completionHandler))
     438        return;
     439
     440    // Fetch from disk.
     441    m_pendingRetrieveOperationsByPriority[priority].append(std::make_unique<RetrieveOperation>(RetrieveOperation { key, completionHandler }));
    433442    dispatchPendingRetrieveOperations();
    434443}
     
    441450    ++m_approximateEntryCount;
    442451
    443     m_writeCache.set(key.hash(), std::make_shared<KeyEntryPair>(key, entry));
     452    auto storeOperation = std::make_unique<StoreOperation>(StoreOperation { key, entry, completionHandler });
     453    auto& store = *storeOperation;
     454    m_activeStoreOperations.add(WTF::move(storeOperation));
    444455
    445456    StringCapture cachePathCapture(m_directoryPath);
    446     dispatch_async(m_backgroundIOQueue.get(), [this, key, entry, cachePathCapture, completionHandler] {
    447         auto data = encodeEntry(key, entry);
     457    dispatch_async(m_backgroundIOQueue.get(), [this, &store, cachePathCapture] {
     458        auto data = encodeEntry(store.key, store.entry);
    448459
    449460        int fd;
    450         auto channel = openFileForKey(key, FileOpenType::Create, cachePathCapture.string(), fd);
    451         dispatch_io_write(channel.get(), 0, data.get(), dispatch_get_main_queue(), [this, key, completionHandler](bool done, dispatch_data_t, int error) {
     461        auto channel = openFileForKey(store.key, FileOpenType::Create, cachePathCapture.string(), fd);
     462        dispatch_io_write(channel.get(), 0, data.get(), dispatch_get_main_queue(), [this, &store](bool done, dispatch_data_t, int error) {
    452463            ASSERT_UNUSED(done, done);
    453464            LOG(NetworkCacheStorage, "(NetworkProcess) write complete error=%d", error);
    454465            if (error) {
    455                 if (m_contentsFilter.mayContain(key.hash()))
    456                     m_contentsFilter.remove(key.hash());
     466                if (m_contentsFilter.mayContain(store.key.hash()))
     467                    m_contentsFilter.remove(store.key.hash());
    457468                if (m_approximateEntryCount)
    458469                    --m_approximateEntryCount;
    459470            }
    460             m_writeCache.remove(key.hash());
    461 
    462             completionHandler(!error);
     471
     472            store.completionHandler(!error);
     473
     474            m_activeStoreOperations.remove(&store);
    463475        });
    464476    });
     
    477489    }
    478490
    479     m_writeCache.set(key.hash(), std::make_shared<KeyEntryPair>(key, updateEntry));
     491    auto updateOperation = std::make_unique<UpdateOperation>(UpdateOperation { key, updateEntry, existingEntry, completionHandler });
     492    auto& update = *updateOperation;
     493    m_activeUpdateOperations.add(WTF::move(updateOperation));
    480494
    481495    // Try to update the header of an existing entry.
    482496    StringCapture cachePathCapture(m_directoryPath);
    483     dispatch_async(m_backgroundIOQueue.get(), [this, key, updateEntry, existingEntry, cachePathCapture, completionHandler] {
    484         auto headerData = encodeEntryHeader(key, updateEntry);
    485         auto existingHeaderData = encodeEntryHeader(key, existingEntry);
     497    dispatch_async(m_backgroundIOQueue.get(), [this, &update, cachePathCapture] {
     498        auto headerData = encodeEntryHeader(update.key, update.entry);
     499        auto existingHeaderData = encodeEntryHeader(update.key, update.existingEntry);
    486500
    487501        bool pageRoundedHeaderSizeChanged = dispatch_data_get_size(headerData.get()) != dispatch_data_get_size(existingHeaderData.get());
    488502        if (pageRoundedHeaderSizeChanged) {
    489503            LOG(NetworkCacheStorage, "(NetworkProcess) page-rounded header size changed, storing full entry");
    490             dispatch_async(dispatch_get_main_queue(), [this, key, updateEntry, completionHandler] {
    491                 store(key, updateEntry, completionHandler);
     504            dispatch_async(dispatch_get_main_queue(), [this, &update] {
     505                store(update.key, update.entry, update.completionHandler);
     506
     507                ASSERT(m_activeUpdateOperations.contains(&update));
     508                m_activeUpdateOperations.remove(&update);
    492509            });
    493510            return;
     
    495512
    496513        int fd;
    497         auto channel = openFileForKey(key, FileOpenType::Write, cachePathCapture.string(), fd);
    498         dispatch_io_write(channel.get(), 0, headerData.get(), dispatch_get_main_queue(), [this, key, completionHandler](bool done, dispatch_data_t, int error) {
     514        auto channel = openFileForKey(update.key, FileOpenType::Write, cachePathCapture.string(), fd);
     515        dispatch_io_write(channel.get(), 0, headerData.get(), dispatch_get_main_queue(), [this, &update](bool done, dispatch_data_t, int error) {
    499516            ASSERT_UNUSED(done, done);
    500517            LOG(NetworkCacheStorage, "(NetworkProcess) update complete error=%d", error);
    501518
    502519            if (error)
    503                 removeEntry(key);
    504             m_writeCache.remove(key.hash());
    505 
    506             completionHandler(!error);
     520                removeEntry(update.key);
     521
     522            update.completionHandler(!error);
     523
     524            ASSERT(m_activeUpdateOperations.contains(&update));
     525            m_activeUpdateOperations.remove(&update);
    507526        });
    508527    });
     
    522541    LOG(NetworkCacheStorage, "(NetworkProcess) clearing cache");
    523542
    524     m_writeCache.clear();
    525543    m_contentsFilter.clear();
    526544    m_approximateEntryCount = 0;
Note: See TracChangeset for help on using the changeset viewer.