Common: Add dead thread support to WorkSema

MTGS supports having exceptions kill the thread and send their result back to a thread calling WaitGS...
This commit is contained in:
TellowKrinkle 2022-03-26 23:56:00 -05:00 committed by refractionpcsx2
parent 63fd349e3c
commit b28779b0f6
2 changed files with 39 additions and 8 deletions

View File

@ -28,6 +28,7 @@ void Threading::WorkSema::WaitForWork()
// RUNNING_0: Change state to SLEEPING, wake up thread if WAITING_EMPTY // RUNNING_0: Change state to SLEEPING, wake up thread if WAITING_EMPTY
// RUNNING_N: Change state to RUNNING_0 (and preserve WAITING_EMPTY flag) // RUNNING_N: Change state to RUNNING_0 (and preserve WAITING_EMPTY flag)
s32 value = m_state.load(std::memory_order_relaxed); s32 value = m_state.load(std::memory_order_relaxed);
pxAssert(!IsDead(value));
while (!m_state.compare_exchange_weak(value, NextStateWaitForWork(value), std::memory_order_acq_rel, std::memory_order_relaxed)) while (!m_state.compare_exchange_weak(value, NextStateWaitForWork(value), std::memory_order_acq_rel, std::memory_order_relaxed))
; ;
if (IsReadyForSleep(value)) if (IsReadyForSleep(value))
@ -43,6 +44,7 @@ void Threading::WorkSema::WaitForWork()
void Threading::WorkSema::WaitForWorkWithSpin() void Threading::WorkSema::WaitForWorkWithSpin()
{ {
s32 value = m_state.load(std::memory_order_relaxed); s32 value = m_state.load(std::memory_order_relaxed);
pxAssert(!IsDead(value));
while (IsReadyForSleep(value)) while (IsReadyForSleep(value))
{ {
if (m_state.compare_exchange_weak(value, STATE_SPINNING, std::memory_order_release, std::memory_order_relaxed)) if (m_state.compare_exchange_weak(value, STATE_SPINNING, std::memory_order_release, std::memory_order_relaxed))
@ -70,28 +72,29 @@ void Threading::WorkSema::WaitForWorkWithSpin()
m_state.fetch_and(STATE_FLAG_WAITING_EMPTY, std::memory_order_acquire); m_state.fetch_and(STATE_FLAG_WAITING_EMPTY, std::memory_order_acquire);
} }
void Threading::WorkSema::WaitForEmpty() bool Threading::WorkSema::WaitForEmpty()
{ {
s32 value = m_state.load(std::memory_order_acquire); s32 value = m_state.load(std::memory_order_acquire);
while (true) while (true)
{ {
if (value < 0) if (value < 0)
return; // STATE_SLEEPING or STATE_SPINNING, queue is empty! return !IsDead(value); // STATE_SLEEPING or STATE_SPINNING, queue is empty!
if (m_state.compare_exchange_weak(value, value | STATE_FLAG_WAITING_EMPTY, std::memory_order_relaxed, std::memory_order_acquire)) if (m_state.compare_exchange_weak(value, value | STATE_FLAG_WAITING_EMPTY, std::memory_order_relaxed, std::memory_order_acquire))
break; break;
} }
pxAssertDev(!(value & STATE_FLAG_WAITING_EMPTY), "Multiple threads attempted to wait for empty (not currently supported)"); pxAssertDev(!(value & STATE_FLAG_WAITING_EMPTY), "Multiple threads attempted to wait for empty (not currently supported)");
m_empty_sema.WaitWithYield(); m_empty_sema.WaitWithYield();
return !IsDead(m_state.load(std::memory_order_relaxed));
} }
void Threading::WorkSema::WaitForEmptyWithSpin() bool Threading::WorkSema::WaitForEmptyWithSpin()
{ {
s32 value = m_state.load(std::memory_order_acquire); s32 value = m_state.load(std::memory_order_acquire);
u32 waited = 0; u32 waited = 0;
while (true) while (true)
{ {
if (value < 0) if (value < 0)
return; // STATE_SLEEPING or STATE_SPINNING, queue is empty! return !IsDead(value); // STATE_SLEEPING or STATE_SPINNING, queue is empty!
if (waited > SPIN_TIME_NS && m_state.compare_exchange_weak(value, value | STATE_FLAG_WAITING_EMPTY, std::memory_order_relaxed, std::memory_order_acquire)) if (waited > SPIN_TIME_NS && m_state.compare_exchange_weak(value, value | STATE_FLAG_WAITING_EMPTY, std::memory_order_relaxed, std::memory_order_acquire))
break; break;
waited += ShortSpin(); waited += ShortSpin();
@ -99,6 +102,19 @@ void Threading::WorkSema::WaitForEmptyWithSpin()
} }
pxAssertDev(!(value & STATE_FLAG_WAITING_EMPTY), "Multiple threads attempted to wait for empty (not currently supported)"); pxAssertDev(!(value & STATE_FLAG_WAITING_EMPTY), "Multiple threads attempted to wait for empty (not currently supported)");
m_empty_sema.WaitWithYield(); m_empty_sema.WaitWithYield();
return !IsDead(m_state.load(std::memory_order_relaxed));
}
void Threading::WorkSema::Kill()
{
s32 value = m_state.exchange(std::numeric_limits<s32>::min(), std::memory_order_release);
if (value & STATE_FLAG_WAITING_EMPTY)
m_empty_sema.Post();
}
void Threading::WorkSema::Reset()
{
m_state = STATE_RUNNING_0;
} }
#if !defined(__APPLE__) // macOS implementations are in DarwinSemaphore #if !defined(__APPLE__) // macOS implementations are in DarwinSemaphore

View File

@ -273,6 +273,7 @@ namespace Threading
// So optimize states for fast NotifyOfWork // So optimize states for fast NotifyOfWork
enum enum
{ {
/* Any <-2 state: STATE_DEAD: Thread has crashed and is awaiting revival */
STATE_SPINNING = -2, ///< Worker thread is spinning waiting for work STATE_SPINNING = -2, ///< Worker thread is spinning waiting for work
STATE_SLEEPING = -1, ///< Worker thread is sleeping on m_sema STATE_SLEEPING = -1, ///< Worker thread is sleeping on m_sema
STATE_RUNNING_0 = 0, ///< Worker thread is processing work, but no work has been added since it last checked for new work STATE_RUNNING_0 = 0, ///< Worker thread is processing work, but no work has been added since it last checked for new work
@ -280,6 +281,11 @@ namespace Threading
STATE_FLAG_WAITING_EMPTY = 1 << 30, ///< Flag to indicate that a thread is sleeping on m_empty_sema (can be applied to any STATE_RUNNING) STATE_FLAG_WAITING_EMPTY = 1 << 30, ///< Flag to indicate that a thread is sleeping on m_empty_sema (can be applied to any STATE_RUNNING)
}; };
bool IsDead(s32 state)
{
return state < STATE_SPINNING;
}
bool IsReadyForSleep(s32 state) bool IsReadyForSleep(s32 state)
{ {
s32 waiting_empty_cleared = state & (STATE_FLAG_WAITING_EMPTY - 1); s32 waiting_empty_cleared = state & (STATE_FLAG_WAITING_EMPTY - 1);
@ -297,6 +303,7 @@ namespace Threading
void NotifyOfWork() void NotifyOfWork()
{ {
// State change: // State change:
// DEAD: Stay in DEAD (starting DEAD state is INT_MIN so we can assume we won't flip over to anything else)
// SPINNING: Change state to RUNNING. Thread will notice and process the new data // SPINNING: Change state to RUNNING. Thread will notice and process the new data
// SLEEPING: Change state to RUNNING and wake worker. Thread will wake up and process the new data. // SLEEPING: Change state to RUNNING and wake worker. Thread will wake up and process the new data.
// RUNNING_0: Change state to RUNNING_N. // RUNNING_0: Change state to RUNNING_N.
@ -310,10 +317,18 @@ namespace Threading
void WaitForWork(); void WaitForWork();
/// Wait for work to be added to the queue, spinning for a bit before sleeping the thread /// Wait for work to be added to the queue, spinning for a bit before sleeping the thread
void WaitForWorkWithSpin(); void WaitForWorkWithSpin();
/// Wait for the worker thread to finish processing all entries in the queue /// Wait for the worker thread to finish processing all entries in the queue or die
void WaitForEmpty(); /// Returns false if the thread is dead
/// Wait for the worker thread to finish processing all entries in the queue, spinning a bit before sleeping the thread bool WaitForEmpty();
void WaitForEmptyWithSpin(); /// Wait for the worker thread to finish processing all entries in the queue or die, spinning a bit before sleeping the thread
/// Returns false if the thread is dead
bool WaitForEmptyWithSpin();
/// Called by the worker thread to notify others of its death
/// Dead threads don't process work, and WaitForEmpty will return instantly even though there may be work in the queue
void Kill();
/// Reset the semaphore to the initial state
/// Should be called by the worker thread if it restarts after dying
void Reset();
}; };
class Semaphore class Semaphore