From 9d20adfa77eb5c9f3665ca0a10af17041b462d14 Mon Sep 17 00:00:00 2001 From: Sandy Carter Date: Wed, 5 Dec 2018 21:06:24 -0800 Subject: [PATCH] [threading linux] Implement Events Remove file-descriptor specific wait implementation to PosixFdHandle class which breaks on waits of non-fd handles. Replace with PosixConditionHandle and extend to support auto reset and initial values. Simplify mutex and conditional variable use with stdlib versions which wrap these primitives but provide better C++ interface. Test Event and Reset --- src/xenia/base/testing/threading_test.cc | 21 ++- src/xenia/base/threading_posix.cc | 189 +++++++++-------------- 2 files changed, 90 insertions(+), 120 deletions(-) diff --git a/src/xenia/base/testing/threading_test.cc b/src/xenia/base/testing/threading_test.cc index 11b4d559e..8f82dfb1c 100644 --- a/src/xenia/base/testing/threading_test.cc +++ b/src/xenia/base/testing/threading_test.cc @@ -189,8 +189,27 @@ TEST_CASE("Wait on Event", "Event") { // Call wait on now consumed Event result = Wait(evt.get(), false, 50ms); REQUIRE(result == WaitResult::kTimeout); +} - // TODO(bwrsandman): test Reset() and Pulse() +TEST_CASE("Reset Event", "Event") { + auto evt = Event::CreateAutoResetEvent(false); + WaitResult result; + + // Call wait on reset Event + evt->Set(); + evt->Reset(); + result = Wait(evt.get(), false, 50ms); + REQUIRE(result == WaitResult::kTimeout); + + // Test resetting the unset event + evt->Reset(); + result = Wait(evt.get(), false, 50ms); + REQUIRE(result == WaitResult::kTimeout); + + // Test setting the reset event + evt->Set(); + result = Wait(evt.get(), false, 50ms); + REQUIRE(result == WaitResult::kSuccess); } TEST_CASE("Wait on Semaphore", "Semaphore") { diff --git a/src/xenia/base/threading_posix.cc b/src/xenia/base/threading_posix.cc index beda0a48a..d565814c1 100644 --- a/src/xenia/base/threading_posix.cc +++ b/src/xenia/base/threading_posix.cc @@ -164,75 +164,64 @@ std::unique_ptr HighResolutionTimer::CreateRepeating( return std::unique_ptr(timer.release()); } -// TODO(dougvj) There really is no native POSIX handle for a single wait/signal -// construct pthreads is at a lower level with more handles for such a mechanism -// This simple wrapper class could function as our handle, but probably needs -// some more functionality +// There really is no native POSIX handle for a single wait/signal construct +// pthreads is at a lower level with more handles for such a mechanism. +// This simple wrapper class functions as our handle and uses conditional +// variables for waits and signals. class PosixCondition { public: - PosixCondition() : signal_(false) { - pthread_mutex_init(&mutex_, NULL); - pthread_cond_init(&cond_, NULL); - } - - ~PosixCondition() { - pthread_mutex_destroy(&mutex_); - pthread_cond_destroy(&cond_); - } + PosixCondition(bool manual_reset, bool initial_state) + : signal_(initial_state), manual_reset_(manual_reset) {} + virtual ~PosixCondition() = default; void Signal() { - pthread_mutex_lock(&mutex_); + auto lock = std::unique_lock(mutex_); signal_ = true; - pthread_cond_broadcast(&cond_); - pthread_mutex_unlock(&mutex_); + if (manual_reset_) { + cond_.notify_all(); + } else { + cond_.notify_one(); + } } void Reset() { - pthread_mutex_lock(&mutex_); + auto lock = std::unique_lock(mutex_); signal_ = false; - pthread_mutex_unlock(&mutex_); } - bool Wait(unsigned int timeout_ms) { - // Assume 0 means no timeout, not instant timeout - if (timeout_ms == 0) { - Wait(); + WaitResult Wait(std::chrono::milliseconds timeout) { + bool executed; + auto predicate = [this] { return this->signaled(); }; + auto lock = std::unique_lock(mutex_); + if (predicate()) { + executed = true; + } else { + if (timeout == std::chrono::milliseconds::max()) { + cond_.wait(lock, predicate); + executed = true; // Did not time out; + } else { + executed = cond_.wait_for(lock, timeout, predicate); + } } - struct timespec time_to_wait; - struct timeval now; - gettimeofday(&now, NULL); - - // Add the number of seconds we want to wait to the current time - time_to_wait.tv_sec = now.tv_sec + (timeout_ms / 1000); - // Add the number of nanoseconds we want to wait to the current nanosecond - // stride - long nsec = (now.tv_usec + (timeout_ms % 1000)) * 1000; - // If we overflowed the nanosecond count then we add a second - time_to_wait.tv_sec += nsec / 1000000000UL; - // We only add nanoseconds within the 1 second stride - time_to_wait.tv_nsec = nsec % 1000000000UL; - pthread_mutex_lock(&mutex_); - while (!signal_) { - int status = pthread_cond_timedwait(&cond_, &mutex_, &time_to_wait); - if (status == ETIMEDOUT) return false; // We timed out + if (executed) { + post_execution(); + return WaitResult::kSuccess; + } else { + return WaitResult::kTimeout; } - pthread_mutex_unlock(&mutex_); - return true; // We didn't time out - } - - bool Wait() { - pthread_mutex_lock(&mutex_); - while (!signal_) { - pthread_cond_wait(&cond_, &mutex_); - } - pthread_mutex_unlock(&mutex_); - return true; // Did not time out; } private: + inline bool signaled() const { return signal_; } + inline void post_execution() { + if (!manual_reset_) { + signal_ = false; + } + } bool signal_; - pthread_cond_t cond_; - pthread_mutex_t mutex_; + const bool manual_reset_; + std::condition_variable cond_; + std::mutex mutex_; }; // Native posix thread handle @@ -250,12 +239,14 @@ class PosixThreadHandle : public T { pthread_t handle_; }; -// This is wraps a condition object as our handle because posix has no single +// This wraps a condition object as our handle because posix has no single // native handle for higher level concurrency constructs such as semaphores template class PosixConditionHandle : public T { public: - ~PosixConditionHandle() override {} + PosixConditionHandle(bool manual_reset, bool initial_state) + : handle_(manual_reset, initial_state) {} + ~PosixConditionHandle() override = default; protected: void* native_handle() const override { @@ -265,51 +256,10 @@ class PosixConditionHandle : public T { PosixCondition handle_; }; -template -class PosixFdHandle : public T { - public: - explicit PosixFdHandle(intptr_t handle) : handle_(handle) {} - ~PosixFdHandle() override { - close(handle_); - handle_ = 0; - } - - protected: - void* native_handle() const override { - return reinterpret_cast(handle_); - } - - intptr_t handle_; -}; - -// TODO(dougvj) WaitResult Wait(WaitHandle* wait_handle, bool is_alertable, std::chrono::milliseconds timeout) { - intptr_t handle = reinterpret_cast(wait_handle->native_handle()); - - fd_set set; - struct timeval time_val; - int ret; - - FD_ZERO(&set); - FD_SET(handle, &set); - - time_val.tv_sec = timeout.count() / 1000; - time_val.tv_usec = timeout.count() * 1000; - ret = select(handle + 1, &set, NULL, NULL, &time_val); - if (ret == -1) { - return WaitResult::kFailed; - } else if (ret == 0) { - return WaitResult::kTimeout; - } else { - uint64_t buf = 0; - ret = read(handle, &buf, sizeof(buf)); - if (ret < 8) { - return WaitResult::kTimeout; - } - - return WaitResult::kSuccess; - } + auto handle = reinterpret_cast(wait_handle->native_handle()); + return handle->Wait(timeout); } // TODO(dougvj) @@ -329,40 +279,37 @@ std::pair WaitMultiple(WaitHandle* wait_handles[], return std::pair(WaitResult::kFailed, 0); } -// TODO(dougvj) -class PosixEvent : public PosixFdHandle { +class PosixEvent : public PosixConditionHandle { public: - PosixEvent(intptr_t fd) : PosixFdHandle(fd) {} + PosixEvent(bool manual_reset, bool initial_state) + : PosixConditionHandle(manual_reset, initial_state) {} ~PosixEvent() override = default; - void Set() override { - uint64_t buf = 1; - write(handle_, &buf, sizeof(buf)); + void Set() override { handle_.Signal(); } + void Reset() override { handle_.Reset(); } + void Pulse() override { + using namespace std::chrono_literals; + handle_.Signal(); + MaybeYield(); + Sleep(10us); + handle_.Reset(); } - void Reset() override { assert_always(); } - void Pulse() override { assert_always(); } - - private: - PosixCondition condition_; }; std::unique_ptr Event::CreateManualResetEvent(bool initial_state) { - // Linux's eventfd doesn't appear to support manual reset natively. - return nullptr; + return std::make_unique(true, initial_state); } std::unique_ptr Event::CreateAutoResetEvent(bool initial_state) { - int fd = eventfd(initial_state ? 1 : 0, EFD_CLOEXEC); - if (fd == -1) { - return nullptr; - } - - return std::make_unique(fd); + return std::make_unique(false, initial_state); } // TODO(dougvj) class PosixSemaphore : public PosixConditionHandle { public: - PosixSemaphore(int initial_count, int maximum_count) { assert_always(); } + PosixSemaphore(int initial_count, int maximum_count) + : PosixConditionHandle(false, false) { + assert_always(); + } ~PosixSemaphore() override = default; bool Release(int release_count, int* out_previous_count) override { assert_always(); @@ -378,7 +325,9 @@ std::unique_ptr Semaphore::Create(int initial_count, // TODO(dougvj) class PosixMutant : public PosixConditionHandle { public: - PosixMutant(bool initial_owner) { assert_always(); } + PosixMutant(bool initial_owner) : PosixConditionHandle(false, false) { + assert_always(); + } ~PosixMutant() = default; bool Release() override { assert_always(); @@ -393,7 +342,9 @@ std::unique_ptr Mutant::Create(bool initial_owner) { // TODO(dougvj) class PosixTimer : public PosixConditionHandle { public: - PosixTimer(bool manual_reset) { assert_always(); } + PosixTimer(bool manual_reset) : PosixConditionHandle(manual_reset, false) { + assert_always(); + } ~PosixTimer() = default; bool SetOnce(std::chrono::nanoseconds due_time, std::function opt_callback) override {