[threading linux] Make PosixCondition base class

Add PosixConditionBase as base class for Waitables to use common
primitives mutex and conditional variable
Add abstract signaled() and post_execution() to use single WaitMultiple
implementation.
This commit is contained in:
Sandy Carter 2018-12-10 19:57:51 -08:00
parent fb505eaed1
commit 4289349654
1 changed files with 76 additions and 44 deletions

View File

@ -165,31 +165,8 @@ std::unique_ptr<HighResolutionTimer> HighResolutionTimer::CreateRepeating(
return std::unique_ptr<HighResolutionTimer>(timer.release()); return std::unique_ptr<HighResolutionTimer>(timer.release());
} }
// There really is no native POSIX handle for a single wait/signal construct class PosixConditionBase {
// pthreads is at a lower level with more handles for such a mechanism.
// This simple wrapper class functions as our handle and uses conditional
// variables for waits and signals.
class PosixCondition {
public: public:
PosixCondition(bool manual_reset, bool initial_state)
: signal_(initial_state), manual_reset_(manual_reset) {}
virtual ~PosixCondition() = default;
void Signal() {
auto lock = std::unique_lock<std::mutex>(mutex_);
signal_ = true;
if (manual_reset_) {
cond_.notify_all();
} else {
cond_.notify_one();
}
}
void Reset() {
auto lock = std::unique_lock<std::mutex>(mutex_);
signal_ = false;
}
WaitResult Wait(std::chrono::milliseconds timeout) { WaitResult Wait(std::chrono::milliseconds timeout) {
bool executed; bool executed;
auto predicate = [this] { return this->signaled(); }; auto predicate = [this] { return this->signaled(); };
@ -213,9 +190,9 @@ class PosixCondition {
} }
static std::pair<WaitResult, size_t> WaitMultiple( static std::pair<WaitResult, size_t> WaitMultiple(
std::vector<PosixCondition*> handles, bool wait_all, std::vector<PosixConditionBase*>&& handles, bool wait_all,
std::chrono::milliseconds timeout) { std::chrono::milliseconds timeout) {
using iter_t = decltype(handles)::const_iterator; using iter_t = std::vector<PosixConditionBase*>::const_iterator;
bool executed; bool executed;
auto predicate = [](auto h) { return h->signaled(); }; auto predicate = [](auto h) { return h->signaled(); };
@ -226,7 +203,7 @@ class PosixCondition {
return operation(handles.cbegin(), handles.cend(), predicate); return operation(handles.cbegin(), handles.cend(), predicate);
}; };
std::unique_lock<std::mutex> lock(PosixCondition::mutex_); std::unique_lock<std::mutex> lock(PosixConditionBase::mutex_);
// Check if the aggregate lambda (all or any) is already satisfied // Check if the aggregate lambda (all or any) is already satisfied
if (aggregate()) { if (aggregate()) {
@ -235,11 +212,11 @@ class PosixCondition {
// If the aggregate is not yet satisfied and the timeout is infinite, // If the aggregate is not yet satisfied and the timeout is infinite,
// wait without timeout. // wait without timeout.
if (timeout == std::chrono::milliseconds::max()) { if (timeout == std::chrono::milliseconds::max()) {
PosixCondition::cond_.wait(lock, aggregate); PosixConditionBase::cond_.wait(lock, aggregate);
executed = true; executed = true;
} else { } else {
// Wait with timeout. // Wait with timeout.
executed = PosixCondition::cond_.wait_for(lock, timeout, aggregate); executed = PosixConditionBase::cond_.wait_for(lock, timeout, aggregate);
} }
} }
if (executed) { if (executed) {
@ -259,22 +236,56 @@ class PosixCondition {
} }
} }
protected:
inline virtual bool signaled() const = 0;
inline virtual void post_execution() = 0;
static std::condition_variable cond_;
static std::mutex mutex_;
};
std::condition_variable PosixConditionBase::cond_;
std::mutex PosixConditionBase::mutex_;
// There really is no native POSIX handle for a single wait/signal construct
// pthreads is at a lower level with more handles for such a mechanism.
// This simple wrapper class functions as our handle and uses conditional
// variables for waits and signals.
template <typename T>
class PosixCondition {};
template <>
class PosixCondition<Event> : public PosixConditionBase {
public:
PosixCondition(bool manual_reset, bool initial_state)
: signal_(initial_state), manual_reset_(manual_reset) {}
virtual ~PosixCondition() = default;
void Signal() {
auto lock = std::unique_lock<std::mutex>(mutex_);
signal_ = true;
if (manual_reset_) {
cond_.notify_all();
} else {
cond_.notify_one();
}
}
void Reset() {
auto lock = std::unique_lock<std::mutex>(mutex_);
signal_ = false;
}
private: private:
inline bool signaled() const { return signal_; } inline bool signaled() const override { return signal_; }
inline void post_execution() { inline void post_execution() override {
if (!manual_reset_) { if (!manual_reset_) {
signal_ = false; signal_ = false;
} }
} }
bool signal_; bool signal_;
const bool manual_reset_; const bool manual_reset_;
static std::condition_variable cond_;
static std::mutex mutex_;
}; };
std::condition_variable PosixCondition::cond_;
std::mutex PosixCondition::mutex_;
// Native posix thread handle // Native posix thread handle
template <typename T> template <typename T>
class PosixThreadHandle : public T { class PosixThreadHandle : public T {
@ -295,21 +306,41 @@ class PosixThreadHandle : public T {
template <typename T> template <typename T>
class PosixConditionHandle : public T { class PosixConditionHandle : public T {
public: public:
PosixConditionHandle(bool manual_reset, bool initial_state) PosixConditionHandle(bool manual_reset, bool initial_state);
: handle_(manual_reset, initial_state) {}
~PosixConditionHandle() override = default; ~PosixConditionHandle() override = default;
protected: protected:
void* native_handle() const override { void* native_handle() const override {
return reinterpret_cast<void*>(const_cast<PosixCondition*>(&handle_)); return reinterpret_cast<void*>(const_cast<PosixCondition<T>*>(&handle_));
} }
PosixCondition handle_; PosixCondition<T> handle_;
}; };
template <>
PosixConditionHandle<Semaphore>::PosixConditionHandle(bool manual_reset,
bool initial_state)
: handle_() {}
template <>
PosixConditionHandle<Mutant>::PosixConditionHandle(bool manual_reset,
bool initial_state)
: handle_() {}
template <>
PosixConditionHandle<Timer>::PosixConditionHandle(bool manual_reset,
bool initial_state)
: handle_() {}
template <>
PosixConditionHandle<Event>::PosixConditionHandle(bool manual_reset,
bool initial_state)
: handle_(manual_reset, initial_state) {}
WaitResult Wait(WaitHandle* wait_handle, bool is_alertable, WaitResult Wait(WaitHandle* wait_handle, bool is_alertable,
std::chrono::milliseconds timeout) { std::chrono::milliseconds timeout) {
auto handle = reinterpret_cast<PosixCondition*>(wait_handle->native_handle()); auto handle =
reinterpret_cast<PosixConditionBase*>(wait_handle->native_handle());
return handle->Wait(timeout); return handle->Wait(timeout);
} }
@ -326,12 +357,13 @@ std::pair<WaitResult, size_t> WaitMultiple(WaitHandle* wait_handles[],
size_t wait_handle_count, size_t wait_handle_count,
bool wait_all, bool is_alertable, bool wait_all, bool is_alertable,
std::chrono::milliseconds timeout) { std::chrono::milliseconds timeout) {
std::vector<PosixCondition*> handles(wait_handle_count); std::vector<PosixConditionBase*> handles(wait_handle_count);
for (int i = 0u; i < wait_handle_count; ++i) { for (int i = 0u; i < wait_handle_count; ++i) {
handles[i] = handles[i] =
reinterpret_cast<PosixCondition*>(wait_handles[i]->native_handle()); reinterpret_cast<PosixConditionBase*>(wait_handles[i]->native_handle());
} }
return PosixCondition::WaitMultiple(handles, wait_all, timeout); return PosixConditionBase::WaitMultiple(std::move(handles), wait_all,
timeout);
} }
class PosixEvent : public PosixConditionHandle<Event> { class PosixEvent : public PosixConditionHandle<Event> {