Common/Event: Replace poll event with lock/condvar

This commit is contained in:
Connor McLaughlin 2020-10-25 19:47:22 +10:00
parent 34107dd573
commit 950318ee43
2 changed files with 191 additions and 56 deletions

View File

@ -2,22 +2,15 @@
#include "assert.h" #include "assert.h"
#if defined(WIN32) #if defined(WIN32)
#include <malloc.h>
#include "windows_headers.h" #include "windows_headers.h"
#elif defined(__linux__) || defined(__APPLE__) || defined(__HAIKU__)
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#ifdef __APPLE__
#include <stdlib.h>
#else
#include <malloc.h> #include <malloc.h>
#endif #elif defined(__linux__) || defined(__APPLE__) || defined(__HAIKU__)
#include <time.h>
#endif #endif
namespace Common { namespace Common {
#if defined(WIN32) #if defined(WIN32) && defined(USE_WIN32_EVENT_OBJECTS)
Event::Event(bool auto_reset /* = false */) Event::Event(bool auto_reset /* = false */)
{ {
@ -61,79 +54,194 @@ void Event::WaitForMultiple(Event** events, u32 num_events)
WaitForMultipleObjects(num_events, event_handles, TRUE, INFINITE); WaitForMultipleObjects(num_events, event_handles, TRUE, INFINITE);
} }
#elif defined(__linux__) || defined(__APPLE__) || defined(__HAIKU__) #elif defined(WIN32)
Event::Event(bool auto_reset /*= false*/) : m_auto_reset(auto_reset) Event::Event(bool auto_reset /* = false */) : m_auto_reset(auto_reset)
{ {
m_pipe_fds[0] = m_pipe_fds[1] = -1; InitializeCriticalSection(&m_cs);
#if defined(__linux__) InitializeConditionVariable(&m_cv);
pipe2(m_pipe_fds, O_NONBLOCK);
#else
pipe(m_pipe_fds);
fcntl(m_pipe_fds[0], F_SETFL, fcntl(m_pipe_fds[0], F_GETFL) | O_NONBLOCK);
fcntl(m_pipe_fds[1], F_SETFL, fcntl(m_pipe_fds[1], F_GETFL) | O_NONBLOCK);
#endif
Assert(m_pipe_fds[0] >= 0 && m_pipe_fds[1] >= 0);
} }
Event::~Event() Event::~Event()
{ {
close(m_pipe_fds[0]); DeleteCriticalSection(&m_cs);
close(m_pipe_fds[1]);
} }
void Event::Signal() void Event::Signal()
{ {
char buf[1] = {0}; EnterCriticalSection(&m_cs);
write(m_pipe_fds[1], buf, sizeof(buf)); m_signaled.store(true);
WakeAllConditionVariable(&m_cv);
LeaveCriticalSection(&m_cs);
} }
void Event::Wait() void Event::Wait()
{ {
pollfd pd = {}; m_waiters.fetch_add(1);
pd.fd = m_pipe_fds[0];
pd.events = POLLRDNORM;
poll(&pd, 1, -1);
if (m_auto_reset) EnterCriticalSection(&m_cs);
Reset(); while (!m_signaled.load())
SleepConditionVariableCS(&m_cv, &m_cs, INFINITE);
if (m_waiters.fetch_sub(1) == 1 && m_auto_reset)
m_signaled.store(false);
LeaveCriticalSection(&m_cs);
} }
bool Event::TryWait(u32 timeout_in_ms) bool Event::TryWait(u32 timeout_in_ms)
{ {
pollfd pd; m_waiters.fetch_add(1);
pd.fd = m_pipe_fds[0];
pd.events = POLLRDNORM;
if (poll(&pd, 1, timeout_in_ms) == 0)
return false;
if (m_auto_reset) const u32 start = GetTickCount();
Reset();
return true; EnterCriticalSection(&m_cs);
while (!m_signaled.load() && (GetTickCount() - start) < timeout_in_ms)
SleepConditionVariableCS(&m_cv, &m_cs, INFINITE);
const bool result = m_signaled.load();
if (m_waiters.fetch_sub(1) == 1 && result && m_auto_reset)
m_signaled.store(false);
LeaveCriticalSection(&m_cs);
return result;
} }
void Event::Reset() void Event::Reset()
{ {
char buf[1]; EnterCriticalSection(&m_cs);
while (read(m_pipe_fds[0], buf, sizeof(buf)) > 0) m_signaled.store(false);
; LeaveCriticalSection(&m_cs);
} }
void Event::WaitForMultiple(Event** events, u32 num_events) void Event::WaitForMultiple(Event** events, u32 num_events)
{ {
DebugAssert(num_events > 0);
pollfd pd = {};
pd.events = POLLRDNORM;
for (u32 i = 0; i < num_events; i++) for (u32 i = 0; i < num_events; i++)
{ events[i]->Wait();
pd.fd = events[i]->m_pipe_fds[0]; }
poll(&pd, 1, -1);
if (events[i]->m_auto_reset) #elif defined(__linux__) || defined(__APPLE__) || defined(__HAIKU__)
events[i]->Reset();
} Event::Event(bool auto_reset /* = false */) : m_auto_reset(auto_reset)
{
pthread_mutex_init(&m_mutex, nullptr);
pthread_cond_init(&m_cv, nullptr);
}
Event::~Event()
{
pthread_cond_destroy(&m_cv);
pthread_mutex_destroy(&m_mutex);
}
void Event::Signal()
{
pthread_mutex_lock(&m_mutex);
m_signaled.store(true);
pthread_cond_broadcast(&m_cv);
pthread_mutex_unlock(&m_mutex);
}
void Event::Wait()
{
m_waiters.fetch_add(1);
pthread_mutex_lock(&m_mutex);
while (!m_signaled.load())
pthread_cond_wait(&m_cv, &m_mutex);
if (m_waiters.fetch_sub(1) == 1 && m_auto_reset)
m_signaled.store(false);
pthread_mutex_unlock(&m_mutex);
}
bool Event::TryWait(u32 timeout_in_ms)
{
m_waiters.fetch_add(1);
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += timeout_in_ms / 1000;
ts.tv_nsec += (timeout_in_ms % 1000) * 1000000;
pthread_mutex_lock(&m_mutex);
while (!m_signaled.load())
pthread_cond_timedwait(&m_cv, &m_mutex, &ts);
const bool result = m_signaled.load();
if (m_waiters.fetch_sub(1) == 1 && result && m_auto_reset)
m_signaled.store(false);
pthread_mutex_unlock(&m_mutex);
return result;
}
void Event::Reset()
{
pthread_mutex_lock(&m_mutex);
m_signaled.store(false);
pthread_mutex_unlock(&m_mutex);
}
void Event::WaitForMultiple(Event** events, u32 num_events)
{
for (u32 i = 0; i < num_events; i++)
events[i]->Wait();
}
#else
Event::Event(bool auto_reset /* = false */) : m_auto_reset(auto_reset) {}
Event::~Event() = default;
void Event::Signal()
{
std::unique_lock lock(m_mutex);
m_signaled.store(true);
m_cv.notify_all();
}
void Event::Wait()
{
m_waiters.fetch_add(1);
std::unique_lock lock(m_mutex);
m_cv.wait(lock, [this]() { return m_signaled.load(); });
if (m_waiters.fetch_sub(1) == 1 && m_auto_reset)
m_signaled.store(false);
}
bool Event::TryWait(u32 timeout_in_ms)
{
m_waiters.fetch_add(1);
std::unique_lock lock(m_mutex);
const bool result =
m_cv.wait_for(lock, std::chrono::milliseconds(timeout_in_ms), [this]() { return m_signaled.load(); });
if (m_waiters.fetch_sub(1) == 1 && result && m_auto_reset)
m_signaled.store(false);
return result;
}
void Event::Reset()
{
std::unique_lock lock(m_mutex);
m_signaled.store(false);
}
void Event::WaitForMultiple(Event** events, u32 num_events)
{
for (u32 i = 0; i < num_events; i++)
events[i]->Wait();
} }
#endif #endif

View File

@ -1,6 +1,20 @@
#pragma once #pragma once
#include "types.h" #include "types.h"
// #define USE_WIN32_EVENT_OBJECTS 1
#if defined(WIN32) && !defined(USE_WIN32_EVENT_OBJECTS)
#include "windows_headers.h"
#include <atomic>
#elif defined(__linux__) || defined(__APPLE__) || defined(__HAIKU__)
#include <atomic>
#include <pthread.h>
#else
#include <atomic>
#include <condition_variable>
#include <mutex>
#endif
namespace Common { namespace Common {
class Event class Event
@ -17,13 +31,26 @@ public:
static void WaitForMultiple(Event** events, u32 num_events); static void WaitForMultiple(Event** events, u32 num_events);
private: private:
#ifdef WIN32 #if defined(WIN32) && defined(USE_WIN32_EVENT_OBJECTS)
void* m_event_handle; void* m_event_handle;
#elif defined(WIN32)
CRITICAL_SECTION m_cs;
CONDITION_VARIABLE m_cv;
std::atomic_uint32_t m_waiters{0};
std::atomic_bool m_signaled{false};
bool m_auto_reset = false;
#elif defined(__linux__) || defined(__APPLE__) || defined(__HAIKU__) #elif defined(__linux__) || defined(__APPLE__) || defined(__HAIKU__)
int m_pipe_fds[2]; pthread_mutex_t m_mutex;
bool m_auto_reset; pthread_cond_t m_cv;
std::atomic_uint32_t m_waiters{0};
std::atomic_bool m_signaled{false};
bool m_auto_reset = false;
#else #else
#error Unknown platform. std::mutex m_mutex;
std::condition_variable m_cv;
std::atomic_uint32_t m_waiters{0};
std::atomic_bool m_signaled{false};
bool m_auto_reset = false;
#endif #endif
}; };