From d7337309500ca0ba3d044af7b8809f11acb3f120 Mon Sep 17 00:00:00 2001 From: TellowKrinkle Date: Fri, 25 Mar 2022 05:12:52 -0500 Subject: [PATCH] Common: Add WorkSema --- common/Darwin/DarwinSemaphore.cpp | 59 +++++++++++- common/Semaphore.cpp | 151 +++++++++++++++++++++++++++++- common/Threading.h | 81 ++++++++++++++++ 3 files changed, 288 insertions(+), 3 deletions(-) diff --git a/common/Darwin/DarwinSemaphore.cpp b/common/Darwin/DarwinSemaphore.cpp index 78101fe4b7..8b65bd8193 100644 --- a/common/Darwin/DarwinSemaphore.cpp +++ b/common/Darwin/DarwinSemaphore.cpp @@ -56,6 +56,61 @@ static void MACH_CHECK(kern_return_t mach_retval) } } +Threading::KernelSemaphore::KernelSemaphore() +{ + MACH_CHECK(semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, 0)); +} + +Threading::KernelSemaphore::~KernelSemaphore() +{ + MACH_CHECK(semaphore_destroy(mach_task_self(), m_sema)); +} + +void Threading::KernelSemaphore::Post() +{ + MACH_CHECK(semaphore_signal(m_sema)); +} + +void Threading::KernelSemaphore::Wait() +{ + pxAssertMsg(!wxThread::IsMain(), "Unyielding semaphore wait issued from the main/gui thread. Use WaitWithYield."); + MACH_CHECK(semaphore_wait(m_sema)); +} + +/// Wait up to the given time +/// Returns true if successful, false if timed out +static bool WaitUpTo(semaphore_t sema, wxTimeSpan wxtime) +{ + mach_timespec_t time; + u64 ms = wxtime.GetMilliseconds().GetValue(); + time.tv_sec = ms / 1000; + time.tv_nsec = (ms % 1000) * 1000000; + kern_return_t res = semaphore_timedwait(sema, time); + if (res == KERN_OPERATION_TIMED_OUT) + return false; + MACH_CHECK(res); + return true; +} + +void Threading::KernelSemaphore::WaitWithYield() +{ +#if wxUSE_GUI + if (!wxThread::IsMain() || (wxTheApp == NULL)) + { + Wait(); + } + else + { + while (!WaitUpTo(m_sema, def_yieldgui_interval)) + { + YieldToMain(); + } + } +#else + WaitWithoutYield(); +#endif +} + Threading::Semaphore::Semaphore() { // other platforms explicitly make a thread-private (unshared) semaphore @@ -176,7 +231,9 @@ void Threading::Semaphore::Wait() } else { - while (!WaitWithoutYield(def_yieldgui_interval)) + if (__atomic_sub_fetch(&m_counter, 1, __ATOMIC_ACQUIRE) >= 0) + return; + while (!WaitUpTo(m_sema, def_yieldgui_interval)) { YieldToMain(); } diff --git a/common/Semaphore.cpp b/common/Semaphore.cpp index 130536cfb5..d7caebb236 100644 --- a/common/Semaphore.cpp +++ b/common/Semaphore.cpp @@ -13,8 +13,6 @@ * If not, see . */ -#if !defined(__APPLE__) - #include "common/Threading.h" #include "common/ThreadingInternal.h" @@ -22,6 +20,155 @@ // Semaphore Implementations // -------------------------------------------------------------------------------------- + +void Threading::WorkSema::WaitForWork() +{ + // State change: + // SLEEPING, SPINNING: This is the worker thread and it's clearly not asleep or spinning, so these states should be impossible + // RUNNING_0: Change state to SLEEPING, wake up thread if WAITING_EMPTY + // RUNNING_N: Change state to RUNNING_0 (and preserve WAITING_EMPTY flag) + s32 value = m_state.load(std::memory_order_relaxed); + while (!m_state.compare_exchange_weak(value, NextStateWaitForWork(value), std::memory_order_acq_rel, std::memory_order_relaxed)) + ; + if (IsReadyForSleep(value)) + { + if (value & STATE_FLAG_WAITING_EMPTY) + m_empty_sema.Post(); + m_sema.Wait(); + // Acknowledge any additional work added between wake up request and getting here + m_state.fetch_and(STATE_FLAG_WAITING_EMPTY, std::memory_order_acquire); + } +} + +void Threading::WorkSema::WaitForWorkWithSpin() +{ + s32 value = m_state.load(std::memory_order_relaxed); + while (IsReadyForSleep(value)) + { + if (m_state.compare_exchange_weak(value, STATE_SPINNING, std::memory_order_release, std::memory_order_relaxed)) + { + if (value & STATE_FLAG_WAITING_EMPTY) + m_empty_sema.Post(); + value = STATE_SPINNING; + break; + } + } + u32 waited = 0; + while (value < 0) + { + if (waited > SPIN_TIME_NS) + { + if (!m_state.compare_exchange_weak(value, STATE_SLEEPING, std::memory_order_relaxed)) + continue; + m_sema.Wait(); + break; + } + waited += ShortSpin(); + value = m_state.load(std::memory_order_relaxed); + } + // Clear back to STATE_RUNNING_0 (but preserve waiting empty flag) + m_state.fetch_and(STATE_FLAG_WAITING_EMPTY, std::memory_order_acquire); +} + +void Threading::WorkSema::WaitForEmpty() +{ + s32 value = m_state.load(std::memory_order_acquire); + while (true) + { + if (value < 0) + return; // STATE_SLEEPING or STATE_SPINNING, queue is empty! + if (m_state.compare_exchange_weak(value, value | STATE_FLAG_WAITING_EMPTY, std::memory_order_relaxed, std::memory_order_acquire)) + break; + } + pxAssertDev(!(value & STATE_FLAG_WAITING_EMPTY), "Multiple threads attempted to wait for empty (not currently supported)"); + m_empty_sema.WaitWithYield(); +} + +void Threading::WorkSema::WaitForEmptyWithSpin() +{ + s32 value = m_state.load(std::memory_order_acquire); + u32 waited = 0; + while (true) + { + if (value < 0) + return; // STATE_SLEEPING or STATE_SPINNING, queue is empty! + if (waited > SPIN_TIME_NS && m_state.compare_exchange_weak(value, value | STATE_FLAG_WAITING_EMPTY, std::memory_order_relaxed, std::memory_order_acquire)) + break; + waited += ShortSpin(); + value = m_state.load(std::memory_order_acquire); + } + pxAssertDev(!(value & STATE_FLAG_WAITING_EMPTY), "Multiple threads attempted to wait for empty (not currently supported)"); + m_empty_sema.WaitWithYield(); +} + +#if !defined(__APPLE__) // macOS implementations are in DarwinSemaphore + +Threading::KernelSemaphore::KernelSemaphore() +{ +#ifdef _WIN32 + m_sema = CreateSemaphore(nullptr, 0, LONG_MAX, nullptr); +#else + sem_init(&m_sema, false, 0); +#endif +} + +Threading::KernelSemaphore::~KernelSemaphore() +{ +#ifdef _WIN32 + CloseHandle(m_sema); +#else + sem_destroy(&m_sema); +#endif +} + +void Threading::KernelSemaphore::Post() +{ +#ifdef _WIN32 + ReleaseSemaphore(m_sema, 1, nullptr); +#else + sem_post(&m_sema); +#endif +} + +void Threading::KernelSemaphore::Wait() +{ + pxAssertMsg(!wxThread::IsMain(), "Unyielding semaphore wait issued from the main/gui thread. Use WaitWithYield."); +#ifdef _WIN32 + pthreadCancelableWait(m_sema); +#else + sem_wait(&m_sema); +#endif +} + +void Threading::KernelSemaphore::WaitWithYield() +{ +#if wxUSE_GUI + if (!wxThread::IsMain() || (wxTheApp == NULL)) + { + Wait(); + } + else + { +#ifdef _WIN32 + u64 millis = def_yieldgui_interval.GetMilliseconds().GetValue(); + while (pthreadCancelableTimedWait(m_sema, millis) == WAIT_TIMEOUT) + YieldToMain(); +#else + while (true) + { + wxDateTime megafail(wxDateTime::UNow() + def_yieldgui_interval); + const timespec fail = {megafail.GetTicks(), megafail.GetMillisecond() * 1000000}; + if (sem_timedwait(&m_sema, &fail) == 0) + break; + YieldToMain(); + } +#endif + } +#else + Wait(); +#endif +} + Threading::Semaphore::Semaphore() { sem_init(&m_sema, false, 0); diff --git a/common/Threading.h b/common/Threading.h index 52ebf22eac..40ae7630a7 100644 --- a/common/Threading.h +++ b/common/Threading.h @@ -29,6 +29,7 @@ #include "common/Pcsx2Defs.h" #include "common/TraceLog.h" #include "common/General.h" +#include #undef Yield // release the burden of windows.h global namespace spam. @@ -235,6 +236,86 @@ namespace Threading } }; + /// A semaphore that may not have a fast userspace path + /// (Used in other semaphore-based algorithms where the semaphore is just used for its thread sleep/wake ability) + class KernelSemaphore + { +#if defined(_WIN32) + void* m_sema; +#elif defined(__APPLE__) + semaphore_t m_sema; +#else + sem_t m_sema; +#endif + public: + KernelSemaphore(); + ~KernelSemaphore(); + void Post(); + void Wait(); + void WaitWithYield(); + }; + + /// A semaphore for notifying a work-processing thread of new work in a (separate) queue + /// + /// Usage: + /// - Processing thread loops on `WaitForWork()` followed by processing all work in the queue + /// - Threads adding work first add their work to the queue, then call `NotifyOfWork()` + class WorkSema + { + /// Semaphore for sleeping the worker thread + KernelSemaphore m_sema; + /// Semaphore for sleeping thread waiting on worker queue empty + KernelSemaphore m_empty_sema; + /// Current state (see enum below) + std::atomic m_state{0}; + + // Expected call frequency is NotifyOfWork > WaitForWork > WaitForEmpty + // So optimize states for fast NotifyOfWork + enum + { + STATE_SPINNING = -2, ///< Worker thread is spinning waiting for work + STATE_SLEEPING = -1, ///< Worker thread is sleeping on m_sema + STATE_RUNNING_0 = 0, ///< Worker thread is processing work, but no work has been added since it last checked for new work + /* Any >0 state: STATE_RUNNING_N: Worker thread is processing work, and work has been added since it last checked for new work */ + STATE_FLAG_WAITING_EMPTY = 1 << 30, ///< Flag to indicate that a thread is sleeping on m_empty_sema (can be applied to any STATE_RUNNING) + }; + + bool IsReadyForSleep(s32 state) + { + s32 waiting_empty_cleared = state & (STATE_FLAG_WAITING_EMPTY - 1); + return waiting_empty_cleared == STATE_RUNNING_0; + } + + s32 NextStateWaitForWork(s32 current) + { + s32 new_state = IsReadyForSleep(current) ? STATE_SLEEPING : STATE_RUNNING_0; + return new_state | (current & STATE_FLAG_WAITING_EMPTY); // Preserve waiting empty flag for RUNNING_N → RUNNING_0 + } + + public: + /// Notify the worker thread that you've added new work to its queue + void NotifyOfWork() + { + // State change: + // SPINNING: Change state to RUNNING. Thread will notice and process the new data + // SLEEPING: Change state to RUNNING and wake worker. Thread will wake up and process the new data. + // RUNNING_0: Change state to RUNNING_N. + // RUNNING_N: Stay in RUNNING_N + s32 old = m_state.fetch_add(2, std::memory_order_release); + if (old == STATE_SLEEPING) + m_sema.Post(); + } + + /// Wait for work to be added to the queue + void WaitForWork(); + /// Wait for work to be added to the queue, spinning for a bit before sleeping the thread + void WaitForWorkWithSpin(); + /// Wait for the worker thread to finish processing all entries in the queue + void WaitForEmpty(); + /// Wait for the worker thread to finish processing all entries in the queue, spinning a bit before sleeping the thread + void WaitForEmptyWithSpin(); + }; + class Semaphore { protected: