Common: Add WorkSema

This commit is contained in:
TellowKrinkle 2022-03-25 05:12:52 -05:00 committed by refractionpcsx2
parent 93a9e5dd83
commit d733730950
3 changed files with 288 additions and 3 deletions

View File

@ -56,6 +56,61 @@ static void MACH_CHECK(kern_return_t mach_retval)
}
}
Threading::KernelSemaphore::KernelSemaphore()
{
MACH_CHECK(semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, 0));
}
Threading::KernelSemaphore::~KernelSemaphore()
{
MACH_CHECK(semaphore_destroy(mach_task_self(), m_sema));
}
void Threading::KernelSemaphore::Post()
{
MACH_CHECK(semaphore_signal(m_sema));
}
void Threading::KernelSemaphore::Wait()
{
pxAssertMsg(!wxThread::IsMain(), "Unyielding semaphore wait issued from the main/gui thread. Use WaitWithYield.");
MACH_CHECK(semaphore_wait(m_sema));
}
/// Wait up to the given time
/// Returns true if successful, false if timed out
static bool WaitUpTo(semaphore_t sema, wxTimeSpan wxtime)
{
mach_timespec_t time;
u64 ms = wxtime.GetMilliseconds().GetValue();
time.tv_sec = ms / 1000;
time.tv_nsec = (ms % 1000) * 1000000;
kern_return_t res = semaphore_timedwait(sema, time);
if (res == KERN_OPERATION_TIMED_OUT)
return false;
MACH_CHECK(res);
return true;
}
void Threading::KernelSemaphore::WaitWithYield()
{
#if wxUSE_GUI
if (!wxThread::IsMain() || (wxTheApp == NULL))
{
Wait();
}
else
{
while (!WaitUpTo(m_sema, def_yieldgui_interval))
{
YieldToMain();
}
}
#else
WaitWithoutYield();
#endif
}
Threading::Semaphore::Semaphore()
{
// other platforms explicitly make a thread-private (unshared) semaphore
@ -176,7 +231,9 @@ void Threading::Semaphore::Wait()
}
else
{
while (!WaitWithoutYield(def_yieldgui_interval))
if (__atomic_sub_fetch(&m_counter, 1, __ATOMIC_ACQUIRE) >= 0)
return;
while (!WaitUpTo(m_sema, def_yieldgui_interval))
{
YieldToMain();
}

View File

@ -13,8 +13,6 @@
* If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(__APPLE__)
#include "common/Threading.h"
#include "common/ThreadingInternal.h"
@ -22,6 +20,155 @@
// Semaphore Implementations
// --------------------------------------------------------------------------------------
void Threading::WorkSema::WaitForWork()
{
// State change:
// SLEEPING, SPINNING: This is the worker thread and it's clearly not asleep or spinning, so these states should be impossible
// RUNNING_0: Change state to SLEEPING, wake up thread if WAITING_EMPTY
// RUNNING_N: Change state to RUNNING_0 (and preserve WAITING_EMPTY flag)
s32 value = m_state.load(std::memory_order_relaxed);
while (!m_state.compare_exchange_weak(value, NextStateWaitForWork(value), std::memory_order_acq_rel, std::memory_order_relaxed))
;
if (IsReadyForSleep(value))
{
if (value & STATE_FLAG_WAITING_EMPTY)
m_empty_sema.Post();
m_sema.Wait();
// Acknowledge any additional work added between wake up request and getting here
m_state.fetch_and(STATE_FLAG_WAITING_EMPTY, std::memory_order_acquire);
}
}
void Threading::WorkSema::WaitForWorkWithSpin()
{
s32 value = m_state.load(std::memory_order_relaxed);
while (IsReadyForSleep(value))
{
if (m_state.compare_exchange_weak(value, STATE_SPINNING, std::memory_order_release, std::memory_order_relaxed))
{
if (value & STATE_FLAG_WAITING_EMPTY)
m_empty_sema.Post();
value = STATE_SPINNING;
break;
}
}
u32 waited = 0;
while (value < 0)
{
if (waited > SPIN_TIME_NS)
{
if (!m_state.compare_exchange_weak(value, STATE_SLEEPING, std::memory_order_relaxed))
continue;
m_sema.Wait();
break;
}
waited += ShortSpin();
value = m_state.load(std::memory_order_relaxed);
}
// Clear back to STATE_RUNNING_0 (but preserve waiting empty flag)
m_state.fetch_and(STATE_FLAG_WAITING_EMPTY, std::memory_order_acquire);
}
void Threading::WorkSema::WaitForEmpty()
{
s32 value = m_state.load(std::memory_order_acquire);
while (true)
{
if (value < 0)
return; // STATE_SLEEPING or STATE_SPINNING, queue is empty!
if (m_state.compare_exchange_weak(value, value | STATE_FLAG_WAITING_EMPTY, std::memory_order_relaxed, std::memory_order_acquire))
break;
}
pxAssertDev(!(value & STATE_FLAG_WAITING_EMPTY), "Multiple threads attempted to wait for empty (not currently supported)");
m_empty_sema.WaitWithYield();
}
void Threading::WorkSema::WaitForEmptyWithSpin()
{
s32 value = m_state.load(std::memory_order_acquire);
u32 waited = 0;
while (true)
{
if (value < 0)
return; // STATE_SLEEPING or STATE_SPINNING, queue is empty!
if (waited > SPIN_TIME_NS && m_state.compare_exchange_weak(value, value | STATE_FLAG_WAITING_EMPTY, std::memory_order_relaxed, std::memory_order_acquire))
break;
waited += ShortSpin();
value = m_state.load(std::memory_order_acquire);
}
pxAssertDev(!(value & STATE_FLAG_WAITING_EMPTY), "Multiple threads attempted to wait for empty (not currently supported)");
m_empty_sema.WaitWithYield();
}
#if !defined(__APPLE__) // macOS implementations are in DarwinSemaphore
Threading::KernelSemaphore::KernelSemaphore()
{
#ifdef _WIN32
m_sema = CreateSemaphore(nullptr, 0, LONG_MAX, nullptr);
#else
sem_init(&m_sema, false, 0);
#endif
}
Threading::KernelSemaphore::~KernelSemaphore()
{
#ifdef _WIN32
CloseHandle(m_sema);
#else
sem_destroy(&m_sema);
#endif
}
void Threading::KernelSemaphore::Post()
{
#ifdef _WIN32
ReleaseSemaphore(m_sema, 1, nullptr);
#else
sem_post(&m_sema);
#endif
}
void Threading::KernelSemaphore::Wait()
{
pxAssertMsg(!wxThread::IsMain(), "Unyielding semaphore wait issued from the main/gui thread. Use WaitWithYield.");
#ifdef _WIN32
pthreadCancelableWait(m_sema);
#else
sem_wait(&m_sema);
#endif
}
void Threading::KernelSemaphore::WaitWithYield()
{
#if wxUSE_GUI
if (!wxThread::IsMain() || (wxTheApp == NULL))
{
Wait();
}
else
{
#ifdef _WIN32
u64 millis = def_yieldgui_interval.GetMilliseconds().GetValue();
while (pthreadCancelableTimedWait(m_sema, millis) == WAIT_TIMEOUT)
YieldToMain();
#else
while (true)
{
wxDateTime megafail(wxDateTime::UNow() + def_yieldgui_interval);
const timespec fail = {megafail.GetTicks(), megafail.GetMillisecond() * 1000000};
if (sem_timedwait(&m_sema, &fail) == 0)
break;
YieldToMain();
}
#endif
}
#else
Wait();
#endif
}
Threading::Semaphore::Semaphore()
{
sem_init(&m_sema, false, 0);

View File

@ -29,6 +29,7 @@
#include "common/Pcsx2Defs.h"
#include "common/TraceLog.h"
#include "common/General.h"
#include <atomic>
#undef Yield // release the burden of windows.h global namespace spam.
@ -235,6 +236,86 @@ namespace Threading
}
};
/// A semaphore that may not have a fast userspace path
/// (Used in other semaphore-based algorithms where the semaphore is just used for its thread sleep/wake ability)
class KernelSemaphore
{
#if defined(_WIN32)
void* m_sema;
#elif defined(__APPLE__)
semaphore_t m_sema;
#else
sem_t m_sema;
#endif
public:
KernelSemaphore();
~KernelSemaphore();
void Post();
void Wait();
void WaitWithYield();
};
/// A semaphore for notifying a work-processing thread of new work in a (separate) queue
///
/// Usage:
/// - Processing thread loops on `WaitForWork()` followed by processing all work in the queue
/// - Threads adding work first add their work to the queue, then call `NotifyOfWork()`
class WorkSema
{
/// Semaphore for sleeping the worker thread
KernelSemaphore m_sema;
/// Semaphore for sleeping thread waiting on worker queue empty
KernelSemaphore m_empty_sema;
/// Current state (see enum below)
std::atomic<s32> m_state{0};
// Expected call frequency is NotifyOfWork > WaitForWork > WaitForEmpty
// So optimize states for fast NotifyOfWork
enum
{
STATE_SPINNING = -2, ///< Worker thread is spinning waiting for work
STATE_SLEEPING = -1, ///< Worker thread is sleeping on m_sema
STATE_RUNNING_0 = 0, ///< Worker thread is processing work, but no work has been added since it last checked for new work
/* Any >0 state: STATE_RUNNING_N: Worker thread is processing work, and work has been added since it last checked for new work */
STATE_FLAG_WAITING_EMPTY = 1 << 30, ///< Flag to indicate that a thread is sleeping on m_empty_sema (can be applied to any STATE_RUNNING)
};
bool IsReadyForSleep(s32 state)
{
s32 waiting_empty_cleared = state & (STATE_FLAG_WAITING_EMPTY - 1);
return waiting_empty_cleared == STATE_RUNNING_0;
}
s32 NextStateWaitForWork(s32 current)
{
s32 new_state = IsReadyForSleep(current) ? STATE_SLEEPING : STATE_RUNNING_0;
return new_state | (current & STATE_FLAG_WAITING_EMPTY); // Preserve waiting empty flag for RUNNING_N → RUNNING_0
}
public:
/// Notify the worker thread that you've added new work to its queue
void NotifyOfWork()
{
// State change:
// SPINNING: Change state to RUNNING. Thread will notice and process the new data
// SLEEPING: Change state to RUNNING and wake worker. Thread will wake up and process the new data.
// RUNNING_0: Change state to RUNNING_N.
// RUNNING_N: Stay in RUNNING_N
s32 old = m_state.fetch_add(2, std::memory_order_release);
if (old == STATE_SLEEPING)
m_sema.Post();
}
/// Wait for work to be added to the queue
void WaitForWork();
/// Wait for work to be added to the queue, spinning for a bit before sleeping the thread
void WaitForWorkWithSpin();
/// Wait for the worker thread to finish processing all entries in the queue
void WaitForEmpty();
/// Wait for the worker thread to finish processing all entries in the queue, spinning a bit before sleeping the thread
void WaitForEmptyWithSpin();
};
class Semaphore
{
protected: