blob: b1cea7cac870500530a6e98749338aca89ad2623 [file] [edit]
/*
* Copyright (C) 2026 Igalia, S.L.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include <wtf/AutomaticThread.h>
#include <wtf/Condition.h>
#include <wtf/DataMutex.h>
#include <wtf/Deque.h>
#include <wtf/ThreadSpecific.h>
#include <wtf/TZoneMallocInlines.h>
#include <wtf/Variant.h>
struct Invalid { };
struct Trivial { };
struct Exit { };
struct WaitOn {
WaitOn(Box<Lock> lock, Condition* condition)
: m_lock(WTF::move(lock))
, m_condition(condition)
{
}
Box<Lock> m_lock;
Condition* m_condition;
};
using WorkItem = WTF::Variant<Invalid, Trivial, Exit, struct WaitOn*>;
struct CachedData {
CachedData()
{
Locker locker { s_lock };
s_numberOfCachedData++;
s_condition.notifyAll();
}
~CachedData()
{
Locker locker { s_lock };
--s_numberOfCachedData;
s_condition.notifyAll();
}
static void waitUntil(size_t count)
{
Locker locker { s_lock };
while (s_numberOfCachedData != count)
s_condition.wait(s_lock);
}
int m_value;
static Lock s_lock;
static size_t s_numberOfCachedData WTF_GUARDED_BY_LOCK(s_lock);
static Condition s_condition;
};
using ThreadSpecificCachedData = ThreadSpecific<CachedData>;
size_t CachedData::s_numberOfCachedData;
Lock CachedData::s_lock;
Condition CachedData::s_condition { };
ThreadSpecificCachedData& threadSpecificCachedData()
{
static ThreadSpecificCachedData* data;
static std::once_flag flag;
std::call_once(
flag,
[]() {
data = new ThreadSpecificCachedData();
});
return *data;
}
class WorkItemConsumerThread : public AutomaticThread {
WTF_MAKE_TZONE_ALLOCATED(WorkItemConsumerThread);
WTF_OVERRIDE_DELETE_FOR_CHECKED_PTR(WorkItemConsumerThread);
public:
WorkItemConsumerThread(const AbstractLocker& locker, Box<Lock> lock, Ref<AutomaticThreadCondition>&& condition, Seconds timeout)
: AutomaticThread(locker, WTF::move(lock), WTF::move(condition), timeout)
, m_started(0)
, m_stopped(0)
, m_lock()
, m_runningCondition()
, m_currentItem(Invalid { })
, m_numberOfSubmittedItems(0)
, m_numberOfProcessedItems(0)
{
}
void waitUntilHasStopped(unsigned times)
{
for (;;) {
Locker locker { m_lock };
if (m_stopped >= times)
return;
m_runningCondition.wait(m_lock);
}
}
void waitUntilHasStarted(unsigned times)
{
for (;;) {
Locker locker { m_lock };
if (m_started >= times)
return;
m_runningCondition.wait(m_lock);
}
}
void appendWorkItem(WorkItem item)
{
DataMutexLocker workItems { m_workItems };
workItems->append(item);
++m_numberOfSubmittedItems;
}
private:
PollResult poll(const AbstractLocker&)
{
DataMutexLocker workItems { m_workItems };
if (workItems->isEmpty())
return PollResult::Wait;
m_currentItem = workItems->takeFirst();
return PollResult::Work;
}
WorkResult work()
{
threadSpecificCachedData()->m_value++;
RELEASE_ASSERT(!std::get_if<Invalid>(&m_currentItem));
++m_numberOfProcessedItems;
if (std::get_if<Trivial>(&m_currentItem)) {
m_currentItem = Invalid { };
return WorkResult::Continue;
}
if (std::get_if<Exit>(&m_currentItem))
return WorkResult::Stop;
if (auto waitOn = std::get_if<WaitOn*>(&m_currentItem)) {
Locker locker { *(*waitOn)->m_lock };
(*waitOn)->m_condition->wait(*(*waitOn)->m_lock);
// Wait twice. This gives outsiders the chance to do things knowing
// we're waiting here.
(*waitOn)->m_condition->wait(*(*waitOn)->m_lock);
return WorkResult::Continue;
}
RELEASE_ASSERT_NOT_REACHED();
}
void threadDidStart()
{
Locker locker { m_lock };
++m_started;
m_runningCondition.notifyAll();
}
void threadIsStopping(const AbstractLocker&)
{
Locker locker { m_lock };
++m_stopped;
RELEASE_ASSERT(m_started == m_stopped);
m_runningCondition.notifyAll();
ASSERT_EQ(m_numberOfSubmittedItems, m_numberOfProcessedItems);
}
unsigned m_started;
unsigned m_stopped;
DataMutex<Deque<WorkItem>> m_workItems;
Lock m_lock;
Condition m_runningCondition;
WorkItem m_currentItem;
size_t m_numberOfSubmittedItems;
size_t m_numberOfProcessedItems;
};
WTF_MAKE_TZONE_ALLOCATED_IMPL(WorkItemConsumerThread);
TEST(WTF, AutomaticThreadStopsWhenNotGivenWork)
{
auto lock = Box<Lock>::create();
auto condition = AutomaticThreadCondition::create();
RefPtr<WorkItemConsumerThread> thread;
{
Locker locker { *lock };
thread = adoptRef(new WorkItemConsumerThread(locker, lock, condition.copyRef(), 1_s));
condition->notifyOne(locker);
}
// Test that the thread stops when it hasn't ever been given work.
thread->waitUntilHasStopped(1);
CachedData::waitUntil(0);
// Test that the thread stops after processing work.
thread->appendWorkItem(Trivial { });
{
Locker locker { *lock };
condition->notifyOne(locker);
}
thread->waitUntilHasStopped(2);
thread->appendWorkItem(Exit { });
{
Locker locker { *lock };
condition->notifyOne(locker);
}
thread->waitUntilHasStopped(3);
CachedData::waitUntil(0);
thread = nullptr;
}
TEST(WTF, AutomaticThreadTemporaryStop)
{
auto lock = Box<Lock>::create();
auto condition = AutomaticThreadCondition::create();
RefPtr<WorkItemConsumerThread> thread;
CachedData::waitUntil(0);
{
Locker locker { *lock };
// Note the timeout: will never stop on its own.
thread = adoptRef(new WorkItemConsumerThread(locker, lock, condition.copyRef(), Seconds::infinity()));
thread->appendWorkItem(Trivial { });
condition->notifyOne(locker);
}
thread->waitUntilHasStarted(1);
CachedData::waitUntil(1); // ::work has run, i.e. we should have an underlying thread.
{
Locker locker { *lock };
ASSERT_TRUE(thread->hasUnderlyingThread(locker));
thread->requestTemporaryStop(locker);
}
thread->waitUntilHasStopped(1);
CachedData::waitUntil(0);
{
Locker locker { *lock };
ASSERT_FALSE(thread->hasUnderlyingThread(locker));
}
// Does it start again?
thread->appendWorkItem(Trivial { });
{
Locker locker { *lock };
condition->notifyOne(locker);
}
thread->waitUntilHasStarted(2);
{
Locker locker { *lock };
// Shut it down.
thread->appendWorkItem(Exit { });
condition->notifyOne(locker);
}
thread->waitUntilHasStopped(2);
CachedData::waitUntil(0);
thread = nullptr;
}
TEST(WTF, AutomaticThreadTemporaryStopWhileRunning)
{
auto lock = Box<Lock>::create();
auto condition = AutomaticThreadCondition::create();
auto itemLock = Box<Lock>::create();
Condition* itemCondition = new Condition();
RefPtr<WorkItemConsumerThread> thread;
CachedData::waitUntil(0);
{
Locker locker { *lock };
// Note the timeout: will never stop on its own.
thread = adoptRef(new WorkItemConsumerThread(locker, lock, condition.copyRef(), Seconds::infinity()));
Locker itemLocker { *itemLock };
thread->appendWorkItem(new WaitOn { itemLock, itemCondition });
condition->notifyOne(locker);
}
thread->waitUntilHasStarted(1);
// The thread should block on the WaitOn item in its ::work soon.
while (!itemCondition->notifyOne())
Thread::yield();
// OK, now we know the thread is in ::work.
// It must have created thread-specific data.
{
Locker locker { CachedData::s_lock };
ASSERT_EQ(CachedData::s_numberOfCachedData, 1);
}
// Request the temporary stop.
{
Locker locker { *lock };
thread->requestTemporaryStop(locker);
}
// Allow the thread to complete its work.
while (!itemCondition->notifyOne())
Thread::yield();
// Now the thread should stop. This can only happen as a result of our
// request (timeout is set to infinity).
thread->waitUntilHasStopped(1);
CachedData::waitUntil(0);
{
Locker locker { *lock };
// Shut it down.
thread->appendWorkItem(Exit { });
condition->notifyOne(locker);
}
thread->waitUntilHasStopped(2);
CachedData::waitUntil(0);
thread = nullptr;
}