diff --git a/common/Semaphore.cpp b/common/Semaphore.cpp index d7caebb236..42b3247843 100644 --- a/common/Semaphore.cpp +++ b/common/Semaphore.cpp @@ -28,6 +28,7 @@ void Threading::WorkSema::WaitForWork() // RUNNING_0: Change state to SLEEPING, wake up thread if WAITING_EMPTY // RUNNING_N: Change state to RUNNING_0 (and preserve WAITING_EMPTY flag) s32 value = m_state.load(std::memory_order_relaxed); + pxAssert(!IsDead(value)); while (!m_state.compare_exchange_weak(value, NextStateWaitForWork(value), std::memory_order_acq_rel, std::memory_order_relaxed)) ; if (IsReadyForSleep(value)) @@ -43,6 +44,7 @@ void Threading::WorkSema::WaitForWork() void Threading::WorkSema::WaitForWorkWithSpin() { s32 value = m_state.load(std::memory_order_relaxed); + pxAssert(!IsDead(value)); while (IsReadyForSleep(value)) { if (m_state.compare_exchange_weak(value, STATE_SPINNING, std::memory_order_release, std::memory_order_relaxed)) @@ -70,28 +72,29 @@ void Threading::WorkSema::WaitForWorkWithSpin() m_state.fetch_and(STATE_FLAG_WAITING_EMPTY, std::memory_order_acquire); } -void Threading::WorkSema::WaitForEmpty() +bool Threading::WorkSema::WaitForEmpty() { s32 value = m_state.load(std::memory_order_acquire); while (true) { if (value < 0) - return; // STATE_SLEEPING or STATE_SPINNING, queue is empty! + return !IsDead(value); // STATE_SLEEPING or STATE_SPINNING, queue is empty! if (m_state.compare_exchange_weak(value, value | STATE_FLAG_WAITING_EMPTY, std::memory_order_relaxed, std::memory_order_acquire)) break; } pxAssertDev(!(value & STATE_FLAG_WAITING_EMPTY), "Multiple threads attempted to wait for empty (not currently supported)"); m_empty_sema.WaitWithYield(); + return !IsDead(m_state.load(std::memory_order_relaxed)); } -void Threading::WorkSema::WaitForEmptyWithSpin() +bool Threading::WorkSema::WaitForEmptyWithSpin() { s32 value = m_state.load(std::memory_order_acquire); u32 waited = 0; while (true) { if (value < 0) - return; // STATE_SLEEPING or STATE_SPINNING, queue is empty! + return !IsDead(value); // STATE_SLEEPING or STATE_SPINNING, queue is empty! if (waited > SPIN_TIME_NS && m_state.compare_exchange_weak(value, value | STATE_FLAG_WAITING_EMPTY, std::memory_order_relaxed, std::memory_order_acquire)) break; waited += ShortSpin(); @@ -99,6 +102,19 @@ void Threading::WorkSema::WaitForEmptyWithSpin() } pxAssertDev(!(value & STATE_FLAG_WAITING_EMPTY), "Multiple threads attempted to wait for empty (not currently supported)"); m_empty_sema.WaitWithYield(); + return !IsDead(m_state.load(std::memory_order_relaxed)); +} + +void Threading::WorkSema::Kill() +{ + s32 value = m_state.exchange(std::numeric_limits::min(), std::memory_order_release); + if (value & STATE_FLAG_WAITING_EMPTY) + m_empty_sema.Post(); +} + +void Threading::WorkSema::Reset() +{ + m_state = STATE_RUNNING_0; } #if !defined(__APPLE__) // macOS implementations are in DarwinSemaphore diff --git a/common/Threading.h b/common/Threading.h index 40ae7630a7..64ba4dc1be 100644 --- a/common/Threading.h +++ b/common/Threading.h @@ -273,6 +273,7 @@ namespace Threading // So optimize states for fast NotifyOfWork enum { + /* Any <-2 state: STATE_DEAD: Thread has crashed and is awaiting revival */ STATE_SPINNING = -2, ///< Worker thread is spinning waiting for work STATE_SLEEPING = -1, ///< Worker thread is sleeping on m_sema STATE_RUNNING_0 = 0, ///< Worker thread is processing work, but no work has been added since it last checked for new work @@ -280,6 +281,11 @@ namespace Threading STATE_FLAG_WAITING_EMPTY = 1 << 30, ///< Flag to indicate that a thread is sleeping on m_empty_sema (can be applied to any STATE_RUNNING) }; + bool IsDead(s32 state) + { + return state < STATE_SPINNING; + } + bool IsReadyForSleep(s32 state) { s32 waiting_empty_cleared = state & (STATE_FLAG_WAITING_EMPTY - 1); @@ -297,6 +303,7 @@ namespace Threading void NotifyOfWork() { // State change: + // DEAD: Stay in DEAD (starting DEAD state is INT_MIN so we can assume we won't flip over to anything else) // SPINNING: Change state to RUNNING. Thread will notice and process the new data // SLEEPING: Change state to RUNNING and wake worker. Thread will wake up and process the new data. // RUNNING_0: Change state to RUNNING_N. @@ -310,10 +317,18 @@ namespace Threading void WaitForWork(); /// Wait for work to be added to the queue, spinning for a bit before sleeping the thread void WaitForWorkWithSpin(); - /// Wait for the worker thread to finish processing all entries in the queue - void WaitForEmpty(); - /// Wait for the worker thread to finish processing all entries in the queue, spinning a bit before sleeping the thread - void WaitForEmptyWithSpin(); + /// Wait for the worker thread to finish processing all entries in the queue or die + /// Returns false if the thread is dead + bool WaitForEmpty(); + /// Wait for the worker thread to finish processing all entries in the queue or die, spinning a bit before sleeping the thread + /// Returns false if the thread is dead + bool WaitForEmptyWithSpin(); + /// Called by the worker thread to notify others of its death + /// Dead threads don't process work, and WaitForEmpty will return instantly even though there may be work in the queue + void Kill(); + /// Reset the semaphore to the initial state + /// Should be called by the worker thread if it restarts after dying + void Reset(); }; class Semaphore