[threading linux] Implement Events

Remove file-descriptor specific wait implementation to PosixFdHandle class
which breaks on waits of non-fd handles.
Replace with PosixConditionHandle and extend to support auto reset and
initial values.
Simplify mutex and conditional variable use with stdlib versions which
wrap these primitives but provide better C++ interface.
Test Event and Reset
This commit is contained in:
Sandy Carter 2018-12-05 21:06:24 -08:00 committed by Rick Gibbed
parent f9d708265f
commit 9d20adfa77
2 changed files with 90 additions and 120 deletions

View File

@ -189,8 +189,27 @@ TEST_CASE("Wait on Event", "Event") {
// Call wait on now consumed Event // Call wait on now consumed Event
result = Wait(evt.get(), false, 50ms); result = Wait(evt.get(), false, 50ms);
REQUIRE(result == WaitResult::kTimeout); REQUIRE(result == WaitResult::kTimeout);
}
// TODO(bwrsandman): test Reset() and Pulse() TEST_CASE("Reset Event", "Event") {
auto evt = Event::CreateAutoResetEvent(false);
WaitResult result;
// Call wait on reset Event
evt->Set();
evt->Reset();
result = Wait(evt.get(), false, 50ms);
REQUIRE(result == WaitResult::kTimeout);
// Test resetting the unset event
evt->Reset();
result = Wait(evt.get(), false, 50ms);
REQUIRE(result == WaitResult::kTimeout);
// Test setting the reset event
evt->Set();
result = Wait(evt.get(), false, 50ms);
REQUIRE(result == WaitResult::kSuccess);
} }
TEST_CASE("Wait on Semaphore", "Semaphore") { TEST_CASE("Wait on Semaphore", "Semaphore") {

View File

@ -164,75 +164,64 @@ std::unique_ptr<HighResolutionTimer> HighResolutionTimer::CreateRepeating(
return std::unique_ptr<HighResolutionTimer>(timer.release()); return std::unique_ptr<HighResolutionTimer>(timer.release());
} }
// TODO(dougvj) There really is no native POSIX handle for a single wait/signal // There really is no native POSIX handle for a single wait/signal construct
// construct pthreads is at a lower level with more handles for such a mechanism // pthreads is at a lower level with more handles for such a mechanism.
// This simple wrapper class could function as our handle, but probably needs // This simple wrapper class functions as our handle and uses conditional
// some more functionality // variables for waits and signals.
class PosixCondition { class PosixCondition {
public: public:
PosixCondition() : signal_(false) { PosixCondition(bool manual_reset, bool initial_state)
pthread_mutex_init(&mutex_, NULL); : signal_(initial_state), manual_reset_(manual_reset) {}
pthread_cond_init(&cond_, NULL); virtual ~PosixCondition() = default;
}
~PosixCondition() {
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
void Signal() { void Signal() {
pthread_mutex_lock(&mutex_); auto lock = std::unique_lock<std::mutex>(mutex_);
signal_ = true; signal_ = true;
pthread_cond_broadcast(&cond_); if (manual_reset_) {
pthread_mutex_unlock(&mutex_); cond_.notify_all();
} else {
cond_.notify_one();
}
} }
void Reset() { void Reset() {
pthread_mutex_lock(&mutex_); auto lock = std::unique_lock<std::mutex>(mutex_);
signal_ = false; signal_ = false;
pthread_mutex_unlock(&mutex_);
} }
bool Wait(unsigned int timeout_ms) { WaitResult Wait(std::chrono::milliseconds timeout) {
// Assume 0 means no timeout, not instant timeout bool executed;
if (timeout_ms == 0) { auto predicate = [this] { return this->signaled(); };
Wait(); auto lock = std::unique_lock<std::mutex>(mutex_);
if (predicate()) {
executed = true;
} else {
if (timeout == std::chrono::milliseconds::max()) {
cond_.wait(lock, predicate);
executed = true; // Did not time out;
} else {
executed = cond_.wait_for(lock, timeout, predicate);
} }
struct timespec time_to_wait;
struct timeval now;
gettimeofday(&now, NULL);
// Add the number of seconds we want to wait to the current time
time_to_wait.tv_sec = now.tv_sec + (timeout_ms / 1000);
// Add the number of nanoseconds we want to wait to the current nanosecond
// stride
long nsec = (now.tv_usec + (timeout_ms % 1000)) * 1000;
// If we overflowed the nanosecond count then we add a second
time_to_wait.tv_sec += nsec / 1000000000UL;
// We only add nanoseconds within the 1 second stride
time_to_wait.tv_nsec = nsec % 1000000000UL;
pthread_mutex_lock(&mutex_);
while (!signal_) {
int status = pthread_cond_timedwait(&cond_, &mutex_, &time_to_wait);
if (status == ETIMEDOUT) return false; // We timed out
} }
pthread_mutex_unlock(&mutex_); if (executed) {
return true; // We didn't time out post_execution();
return WaitResult::kSuccess;
} else {
return WaitResult::kTimeout;
} }
bool Wait() {
pthread_mutex_lock(&mutex_);
while (!signal_) {
pthread_cond_wait(&cond_, &mutex_);
}
pthread_mutex_unlock(&mutex_);
return true; // Did not time out;
} }
private: private:
inline bool signaled() const { return signal_; }
inline void post_execution() {
if (!manual_reset_) {
signal_ = false;
}
}
bool signal_; bool signal_;
pthread_cond_t cond_; const bool manual_reset_;
pthread_mutex_t mutex_; std::condition_variable cond_;
std::mutex mutex_;
}; };
// Native posix thread handle // Native posix thread handle
@ -250,12 +239,14 @@ class PosixThreadHandle : public T {
pthread_t handle_; pthread_t handle_;
}; };
// This is wraps a condition object as our handle because posix has no single // This wraps a condition object as our handle because posix has no single
// native handle for higher level concurrency constructs such as semaphores // native handle for higher level concurrency constructs such as semaphores
template <typename T> template <typename T>
class PosixConditionHandle : public T { class PosixConditionHandle : public T {
public: public:
~PosixConditionHandle() override {} PosixConditionHandle(bool manual_reset, bool initial_state)
: handle_(manual_reset, initial_state) {}
~PosixConditionHandle() override = default;
protected: protected:
void* native_handle() const override { void* native_handle() const override {
@ -265,51 +256,10 @@ class PosixConditionHandle : public T {
PosixCondition handle_; PosixCondition handle_;
}; };
template <typename T>
class PosixFdHandle : public T {
public:
explicit PosixFdHandle(intptr_t handle) : handle_(handle) {}
~PosixFdHandle() override {
close(handle_);
handle_ = 0;
}
protected:
void* native_handle() const override {
return reinterpret_cast<void*>(handle_);
}
intptr_t handle_;
};
// TODO(dougvj)
WaitResult Wait(WaitHandle* wait_handle, bool is_alertable, WaitResult Wait(WaitHandle* wait_handle, bool is_alertable,
std::chrono::milliseconds timeout) { std::chrono::milliseconds timeout) {
intptr_t handle = reinterpret_cast<intptr_t>(wait_handle->native_handle()); auto handle = reinterpret_cast<PosixCondition*>(wait_handle->native_handle());
return handle->Wait(timeout);
fd_set set;
struct timeval time_val;
int ret;
FD_ZERO(&set);
FD_SET(handle, &set);
time_val.tv_sec = timeout.count() / 1000;
time_val.tv_usec = timeout.count() * 1000;
ret = select(handle + 1, &set, NULL, NULL, &time_val);
if (ret == -1) {
return WaitResult::kFailed;
} else if (ret == 0) {
return WaitResult::kTimeout;
} else {
uint64_t buf = 0;
ret = read(handle, &buf, sizeof(buf));
if (ret < 8) {
return WaitResult::kTimeout;
}
return WaitResult::kSuccess;
}
} }
// TODO(dougvj) // TODO(dougvj)
@ -329,40 +279,37 @@ std::pair<WaitResult, size_t> WaitMultiple(WaitHandle* wait_handles[],
return std::pair<WaitResult, size_t>(WaitResult::kFailed, 0); return std::pair<WaitResult, size_t>(WaitResult::kFailed, 0);
} }
// TODO(dougvj) class PosixEvent : public PosixConditionHandle<Event> {
class PosixEvent : public PosixFdHandle<Event> {
public: public:
PosixEvent(intptr_t fd) : PosixFdHandle(fd) {} PosixEvent(bool manual_reset, bool initial_state)
: PosixConditionHandle(manual_reset, initial_state) {}
~PosixEvent() override = default; ~PosixEvent() override = default;
void Set() override { void Set() override { handle_.Signal(); }
uint64_t buf = 1; void Reset() override { handle_.Reset(); }
write(handle_, &buf, sizeof(buf)); void Pulse() override {
using namespace std::chrono_literals;
handle_.Signal();
MaybeYield();
Sleep(10us);
handle_.Reset();
} }
void Reset() override { assert_always(); }
void Pulse() override { assert_always(); }
private:
PosixCondition condition_;
}; };
std::unique_ptr<Event> Event::CreateManualResetEvent(bool initial_state) { std::unique_ptr<Event> Event::CreateManualResetEvent(bool initial_state) {
// Linux's eventfd doesn't appear to support manual reset natively. return std::make_unique<PosixEvent>(true, initial_state);
return nullptr;
} }
std::unique_ptr<Event> Event::CreateAutoResetEvent(bool initial_state) { std::unique_ptr<Event> Event::CreateAutoResetEvent(bool initial_state) {
int fd = eventfd(initial_state ? 1 : 0, EFD_CLOEXEC); return std::make_unique<PosixEvent>(false, initial_state);
if (fd == -1) {
return nullptr;
}
return std::make_unique<PosixEvent>(fd);
} }
// TODO(dougvj) // TODO(dougvj)
class PosixSemaphore : public PosixConditionHandle<Semaphore> { class PosixSemaphore : public PosixConditionHandle<Semaphore> {
public: public:
PosixSemaphore(int initial_count, int maximum_count) { assert_always(); } PosixSemaphore(int initial_count, int maximum_count)
: PosixConditionHandle(false, false) {
assert_always();
}
~PosixSemaphore() override = default; ~PosixSemaphore() override = default;
bool Release(int release_count, int* out_previous_count) override { bool Release(int release_count, int* out_previous_count) override {
assert_always(); assert_always();
@ -378,7 +325,9 @@ std::unique_ptr<Semaphore> Semaphore::Create(int initial_count,
// TODO(dougvj) // TODO(dougvj)
class PosixMutant : public PosixConditionHandle<Mutant> { class PosixMutant : public PosixConditionHandle<Mutant> {
public: public:
PosixMutant(bool initial_owner) { assert_always(); } PosixMutant(bool initial_owner) : PosixConditionHandle(false, false) {
assert_always();
}
~PosixMutant() = default; ~PosixMutant() = default;
bool Release() override { bool Release() override {
assert_always(); assert_always();
@ -393,7 +342,9 @@ std::unique_ptr<Mutant> Mutant::Create(bool initial_owner) {
// TODO(dougvj) // TODO(dougvj)
class PosixTimer : public PosixConditionHandle<Timer> { class PosixTimer : public PosixConditionHandle<Timer> {
public: public:
PosixTimer(bool manual_reset) { assert_always(); } PosixTimer(bool manual_reset) : PosixConditionHandle(manual_reset, false) {
assert_always();
}
~PosixTimer() = default; ~PosixTimer() = default;
bool SetOnce(std::chrono::nanoseconds due_time, bool SetOnce(std::chrono::nanoseconds due_time,
std::function<void()> opt_callback) override { std::function<void()> opt_callback) override {