[threading linux] Make PosixCondition base class
Add PosixConditionBase as base class for Waitables to use common primitives mutex and conditional variable Add abstract signaled() and post_execution() to use single WaitMultiple implementation.
This commit is contained in:
parent
6e13a38cad
commit
75d54e2fa2
|
@ -164,31 +164,8 @@ std::unique_ptr<HighResolutionTimer> HighResolutionTimer::CreateRepeating(
|
||||||
return std::unique_ptr<HighResolutionTimer>(timer.release());
|
return std::unique_ptr<HighResolutionTimer>(timer.release());
|
||||||
}
|
}
|
||||||
|
|
||||||
// There really is no native POSIX handle for a single wait/signal construct
|
class PosixConditionBase {
|
||||||
// pthreads is at a lower level with more handles for such a mechanism.
|
|
||||||
// This simple wrapper class functions as our handle and uses conditional
|
|
||||||
// variables for waits and signals.
|
|
||||||
class PosixCondition {
|
|
||||||
public:
|
public:
|
||||||
PosixCondition(bool manual_reset, bool initial_state)
|
|
||||||
: signal_(initial_state), manual_reset_(manual_reset) {}
|
|
||||||
virtual ~PosixCondition() = default;
|
|
||||||
|
|
||||||
void Signal() {
|
|
||||||
auto lock = std::unique_lock<std::mutex>(mutex_);
|
|
||||||
signal_ = true;
|
|
||||||
if (manual_reset_) {
|
|
||||||
cond_.notify_all();
|
|
||||||
} else {
|
|
||||||
cond_.notify_one();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void Reset() {
|
|
||||||
auto lock = std::unique_lock<std::mutex>(mutex_);
|
|
||||||
signal_ = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
WaitResult Wait(std::chrono::milliseconds timeout) {
|
WaitResult Wait(std::chrono::milliseconds timeout) {
|
||||||
bool executed;
|
bool executed;
|
||||||
auto predicate = [this] { return this->signaled(); };
|
auto predicate = [this] { return this->signaled(); };
|
||||||
|
@ -212,9 +189,9 @@ class PosixCondition {
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::pair<WaitResult, size_t> WaitMultiple(
|
static std::pair<WaitResult, size_t> WaitMultiple(
|
||||||
std::vector<PosixCondition*> handles, bool wait_all,
|
std::vector<PosixConditionBase*>&& handles, bool wait_all,
|
||||||
std::chrono::milliseconds timeout) {
|
std::chrono::milliseconds timeout) {
|
||||||
using iter_t = decltype(handles)::const_iterator;
|
using iter_t = std::vector<PosixConditionBase*>::const_iterator;
|
||||||
bool executed;
|
bool executed;
|
||||||
auto predicate = [](auto h) { return h->signaled(); };
|
auto predicate = [](auto h) { return h->signaled(); };
|
||||||
|
|
||||||
|
@ -225,7 +202,10 @@ class PosixCondition {
|
||||||
return operation(handles.cbegin(), handles.cend(), predicate);
|
return operation(handles.cbegin(), handles.cend(), predicate);
|
||||||
};
|
};
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(PosixCondition::mutex_);
|
// TODO(bwrsandman, Triang3l) This is controversial, see issue #1677
|
||||||
|
// This will probably cause a deadlock on the next thread doing any waiting
|
||||||
|
// if the thread is suspended between locking and waiting
|
||||||
|
std::unique_lock<std::mutex> lock(PosixConditionBase::mutex_);
|
||||||
|
|
||||||
// Check if the aggregate lambda (all or any) is already satisfied
|
// Check if the aggregate lambda (all or any) is already satisfied
|
||||||
if (aggregate()) {
|
if (aggregate()) {
|
||||||
|
@ -234,11 +214,11 @@ class PosixCondition {
|
||||||
// If the aggregate is not yet satisfied and the timeout is infinite,
|
// If the aggregate is not yet satisfied and the timeout is infinite,
|
||||||
// wait without timeout.
|
// wait without timeout.
|
||||||
if (timeout == std::chrono::milliseconds::max()) {
|
if (timeout == std::chrono::milliseconds::max()) {
|
||||||
PosixCondition::cond_.wait(lock, aggregate);
|
PosixConditionBase::cond_.wait(lock, aggregate);
|
||||||
executed = true;
|
executed = true;
|
||||||
} else {
|
} else {
|
||||||
// Wait with timeout.
|
// Wait with timeout.
|
||||||
executed = PosixCondition::cond_.wait_for(lock, timeout, aggregate);
|
executed = PosixConditionBase::cond_.wait_for(lock, timeout, aggregate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (executed) {
|
if (executed) {
|
||||||
|
@ -258,22 +238,58 @@ class PosixCondition {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
inline virtual bool signaled() const = 0;
|
||||||
|
inline virtual void post_execution() = 0;
|
||||||
|
static std::condition_variable cond_;
|
||||||
|
static std::mutex mutex_;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::condition_variable PosixConditionBase::cond_;
|
||||||
|
std::mutex PosixConditionBase::mutex_;
|
||||||
|
|
||||||
|
// There really is no native POSIX handle for a single wait/signal construct
|
||||||
|
// pthreads is at a lower level with more handles for such a mechanism.
|
||||||
|
// This simple wrapper class functions as our handle and uses conditional
|
||||||
|
// variables for waits and signals.
|
||||||
|
template <typename T>
|
||||||
|
class PosixCondition {};
|
||||||
|
|
||||||
|
template <>
|
||||||
|
class PosixCondition<Event> : public PosixConditionBase {
|
||||||
|
public:
|
||||||
|
PosixCondition(bool manual_reset, bool initial_state)
|
||||||
|
: signal_(initial_state), manual_reset_(manual_reset) {}
|
||||||
|
virtual ~PosixCondition() = default;
|
||||||
|
|
||||||
|
void Signal() {
|
||||||
|
auto lock = std::unique_lock<std::mutex>(mutex_);
|
||||||
|
signal_ = true;
|
||||||
|
if (manual_reset_) {
|
||||||
|
cond_.notify_all();
|
||||||
|
} else {
|
||||||
|
// FIXME(bwrsandman): Potential cause for deadlock
|
||||||
|
// See issue #1678 for possible fix and discussion
|
||||||
|
cond_.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void Reset() {
|
||||||
|
auto lock = std::unique_lock<std::mutex>(mutex_);
|
||||||
|
signal_ = false;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
inline bool signaled() const { return signal_; }
|
inline bool signaled() const override { return signal_; }
|
||||||
inline void post_execution() {
|
inline void post_execution() override {
|
||||||
if (!manual_reset_) {
|
if (!manual_reset_) {
|
||||||
signal_ = false;
|
signal_ = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bool signal_;
|
bool signal_;
|
||||||
const bool manual_reset_;
|
const bool manual_reset_;
|
||||||
static std::condition_variable cond_;
|
|
||||||
static std::mutex mutex_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
std::condition_variable PosixCondition::cond_;
|
|
||||||
std::mutex PosixCondition::mutex_;
|
|
||||||
|
|
||||||
// Native posix thread handle
|
// Native posix thread handle
|
||||||
template <typename T>
|
template <typename T>
|
||||||
class PosixThreadHandle : public T {
|
class PosixThreadHandle : public T {
|
||||||
|
@ -294,21 +310,41 @@ class PosixThreadHandle : public T {
|
||||||
template <typename T>
|
template <typename T>
|
||||||
class PosixConditionHandle : public T {
|
class PosixConditionHandle : public T {
|
||||||
public:
|
public:
|
||||||
PosixConditionHandle(bool manual_reset, bool initial_state)
|
PosixConditionHandle(bool manual_reset, bool initial_state);
|
||||||
: handle_(manual_reset, initial_state) {}
|
|
||||||
~PosixConditionHandle() override = default;
|
~PosixConditionHandle() override = default;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void* native_handle() const override {
|
void* native_handle() const override {
|
||||||
return reinterpret_cast<void*>(const_cast<PosixCondition*>(&handle_));
|
return reinterpret_cast<void*>(const_cast<PosixCondition<T>*>(&handle_));
|
||||||
}
|
}
|
||||||
|
|
||||||
PosixCondition handle_;
|
PosixCondition<T> handle_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
template <>
|
||||||
|
PosixConditionHandle<Semaphore>::PosixConditionHandle(bool manual_reset,
|
||||||
|
bool initial_state)
|
||||||
|
: handle_() {}
|
||||||
|
|
||||||
|
template <>
|
||||||
|
PosixConditionHandle<Mutant>::PosixConditionHandle(bool manual_reset,
|
||||||
|
bool initial_state)
|
||||||
|
: handle_() {}
|
||||||
|
|
||||||
|
template <>
|
||||||
|
PosixConditionHandle<Timer>::PosixConditionHandle(bool manual_reset,
|
||||||
|
bool initial_state)
|
||||||
|
: handle_() {}
|
||||||
|
|
||||||
|
template <>
|
||||||
|
PosixConditionHandle<Event>::PosixConditionHandle(bool manual_reset,
|
||||||
|
bool initial_state)
|
||||||
|
: handle_(manual_reset, initial_state) {}
|
||||||
|
|
||||||
WaitResult Wait(WaitHandle* wait_handle, bool is_alertable,
|
WaitResult Wait(WaitHandle* wait_handle, bool is_alertable,
|
||||||
std::chrono::milliseconds timeout) {
|
std::chrono::milliseconds timeout) {
|
||||||
auto handle = reinterpret_cast<PosixCondition*>(wait_handle->native_handle());
|
auto handle =
|
||||||
|
reinterpret_cast<PosixConditionBase*>(wait_handle->native_handle());
|
||||||
return handle->Wait(timeout);
|
return handle->Wait(timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,12 +361,13 @@ std::pair<WaitResult, size_t> WaitMultiple(WaitHandle* wait_handles[],
|
||||||
size_t wait_handle_count,
|
size_t wait_handle_count,
|
||||||
bool wait_all, bool is_alertable,
|
bool wait_all, bool is_alertable,
|
||||||
std::chrono::milliseconds timeout) {
|
std::chrono::milliseconds timeout) {
|
||||||
std::vector<PosixCondition*> handles(wait_handle_count);
|
std::vector<PosixConditionBase*> handles(wait_handle_count);
|
||||||
for (int i = 0u; i < wait_handle_count; ++i) {
|
for (int i = 0u; i < wait_handle_count; ++i) {
|
||||||
handles[i] =
|
handles[i] =
|
||||||
reinterpret_cast<PosixCondition*>(wait_handles[i]->native_handle());
|
reinterpret_cast<PosixConditionBase*>(wait_handles[i]->native_handle());
|
||||||
}
|
}
|
||||||
return PosixCondition::WaitMultiple(handles, wait_all, timeout);
|
return PosixConditionBase::WaitMultiple(std::move(handles), wait_all,
|
||||||
|
timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
class PosixEvent : public PosixConditionHandle<Event> {
|
class PosixEvent : public PosixConditionHandle<Event> {
|
||||||
|
|
Loading…
Reference in New Issue