Wait primitives.

This commit is contained in:
Ben Vanik 2015-07-14 20:22:15 -07:00
parent bd490d5833
commit bd058feb39
2 changed files with 186 additions and 15 deletions

View File

@ -14,9 +14,11 @@
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <cstdint> #include <cstdint>
#include <memory>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <thread> #include <thread>
#include <vector>
namespace xe { namespace xe {
namespace threading { namespace threading {
@ -67,15 +69,107 @@ void Sleep(std::chrono::duration<Rep, Period> duration) {
Sleep(std::chrono::duration_cast<std::chrono::microseconds>(duration)); Sleep(std::chrono::duration_cast<std::chrono::microseconds>(duration));
} }
// Results for a WaitHandle operation.
enum class WaitResult {
// The state of the specified object is signaled.
// In a WaitAny the tuple will contain the index of the wait handle that
// caused the wait to be satisfied.
kSuccess,
// The wait was ended by one or more user-mode callbacks queued to the thread.
// This will occur when is_alertable is set true.
kUserCallback,
// The time-out interval elapsed, and the object's state is nonsignaled.
kTimeout,
// The specified object is a mutex object that was not released by the thread
// that owned the mutex object before the owning thread terminated. Ownership
// of the mutex object is granted to the calling thread and the mutex is set
// to nonsignaled.
// In a WaitAny the tuple will contain the index of the wait handle that
// caused the wait to be abandoned.
kAbandoned,
// The function has failed.
kFailed,
};
class WaitHandle { class WaitHandle {
public: public:
// bool Wait(); virtual ~WaitHandle() = default;
// static bool Wait();
// static bool SignalAndWait(); // Waits until the wait handle is in the signaled state, an alert triggers and
// static bool WaitMultiple(); // a user callback is queued to the thread, or the timeout interval elapses.
// If timeout is zero the call will return immediately instead of waiting and
// if the timeout is max() the wait will not time out.
inline WaitResult Wait(
bool is_alertable,
std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) {
return WaitHandle::Wait(this, is_alertable, timeout);
}
// Waits until the wait handle is in the signaled state, an alert triggers and
// a user callback is queued to the thread, or the timeout interval elapses.
// If timeout is zero the call will return immediately instead of waiting and
// if the timeout is max() the wait will not time out.
static WaitResult Wait(
WaitHandle* wait_handle, bool is_alertable,
std::chrono::milliseconds timeout = std::chrono::milliseconds::max());
// Signals one object and waits on another object as a single operation.
// Waits until the wait handle is in the signaled state, an alert triggers and
// a user callback is queued to the thread, or the timeout interval elapses.
// If timeout is zero the call will return immediately instead of waiting and
// if the timeout is max() the wait will not time out.
static WaitResult SignalAndWait(
WaitHandle* wait_handle_to_signal, WaitHandle* wait_handle_to_wait_on,
bool is_alertable,
std::chrono::milliseconds timeout = std::chrono::milliseconds::max());
// Waits until all of the specified objects are in the signaled state, a
// user callback is queued to the thread, or the time-out interval elapses.
// If timeout is zero the call will return immediately instead of waiting and
// if the timeout is max() the wait will not time out.
inline static WaitResult WaitAll(
WaitHandle* wait_handles[], size_t wait_handle_count, bool is_alertable,
std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) {
return WaitMultiple(wait_handles, wait_handle_count, true, is_alertable,
timeout).first;
}
inline static WaitResult WaitAll(
std::vector<WaitHandle*> wait_handles, bool is_alertable,
std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) {
return WaitAll(wait_handles.data(), wait_handles.size(), is_alertable,
timeout);
}
// Waits until any of the specified objects are in the signaled state, a
// user callback is queued to the thread, or the time-out interval elapses.
// If timeout is zero the call will return immediately instead of waiting and
// if the timeout is max() the wait will not time out.
// The second argument of the return tuple indicates which wait handle caused
// the wait to be satisfied or abandoned.
inline static std::pair<WaitResult, size_t> WaitAny(
WaitHandle* wait_handles[], size_t wait_handle_count, bool is_alertable,
std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) {
return WaitMultiple(wait_handles, wait_handle_count, false, is_alertable,
timeout);
}
inline static std::pair<WaitResult, size_t> WaitAny(
std::vector<WaitHandle*> wait_handles, bool is_alertable,
std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) {
return WaitAny(wait_handles.data(), wait_handles.size(), is_alertable,
timeout);
}
protected: protected:
WaitHandle() = default; WaitHandle() = default;
// Returns the native handle of the object on the host system.
// This value is platform specific.
virtual void* native_handle() const = 0;
static std::pair<WaitResult, size_t> WaitMultiple(
WaitHandle* wait_handles[], size_t wait_handle_count, bool wait_all,
bool is_alertable,
std::chrono::milliseconds timeout = std::chrono::milliseconds::max());
}; };
// Models a Win32-like event object. // Models a Win32-like event object.
@ -118,7 +212,8 @@ class Semaphore : public WaitHandle {
// decreased by one whenever a wait function releases a thread that was // decreased by one whenever a wait function releases a thread that was
// waiting for the semaphore. The count is increased by a specified amount by // waiting for the semaphore. The count is increased by a specified amount by
// calling the Release() function. // calling the Release() function.
std::unique_ptr<Semaphore> Create(int initial_count, int maximum_count); static std::unique_ptr<Semaphore> Create(int initial_count,
int maximum_count);
// Increases the count of the specified semaphore object by a specified // Increases the count of the specified semaphore object by a specified
// amount. // amount.
@ -134,7 +229,7 @@ class Mutant : public WaitHandle {
public: public:
// Creates a new mutant object, initially owned by the calling thread if // Creates a new mutant object, initially owned by the calling thread if
// specified. // specified.
std::unique_ptr<Mutant> Create(bool initial_owner); static std::unique_ptr<Mutant> Create(bool initial_owner);
// Releases ownership of the specified mutex object. // Releases ownership of the specified mutex object.
// Returns false if the calling thread does not own the mutant object. // Returns false if the calling thread does not own the mutant object.
@ -145,11 +240,11 @@ class Timer : public WaitHandle {
public: public:
// Creates a timer whose state remains signaled until SetOnce() or // Creates a timer whose state remains signaled until SetOnce() or
// SetRepeating() is called to establish a new due time. // SetRepeating() is called to establish a new due time.
std::unique_ptr<Timer> CreateManualResetTimer(); static std::unique_ptr<Timer> CreateManualResetTimer();
// Creates a timer whose state remains signaled until a thread completes a // Creates a timer whose state remains signaled until a thread completes a
// wait operation on the timer object. // wait operation on the timer object.
std::unique_ptr<Timer> CreateSynchronizationTimer(); static std::unique_ptr<Timer> CreateSynchronizationTimer();
// Activates the specified waitable timer. When the due time arrives, the // Activates the specified waitable timer. When the due time arrives, the
// timer is signaled and the thread that set the timer calls the optional // timer is signaled and the thread that set the timer calls the optional

View File

@ -67,22 +67,98 @@ void Sleep(std::chrono::microseconds duration) {
} }
} }
class Win32Handle { template <typename T>
class Win32Handle : public T {
public: public:
Win32Handle(HANDLE handle) : handle_(handle) {} Win32Handle(HANDLE handle) : handle_(handle) {}
virtual ~Win32Handle() { ~Win32Handle() override {
CloseHandle(handle_); CloseHandle(handle_);
handle_ = nullptr; handle_ = nullptr;
} }
protected: protected:
void* native_handle() const override { return handle_; }
HANDLE handle_ = nullptr; HANDLE handle_ = nullptr;
}; };
class Win32Event : public Event, public Win32Handle { WaitResult WaitHandle::Wait(WaitHandle* wait_handle, bool is_alertable,
std::chrono::milliseconds timeout) {
HANDLE handle = wait_handle->native_handle();
DWORD result = WaitForSingleObjectEx(handle, DWORD(timeout.count()),
is_alertable ? TRUE : FALSE);
switch (result) {
case WAIT_OBJECT_0:
return WaitResult::kSuccess;
case WAIT_ABANDONED:
return WaitResult::kAbandoned;
case WAIT_IO_COMPLETION:
return WaitResult::kUserCallback;
case WAIT_TIMEOUT:
return WaitResult::kTimeout;
default:
case WAIT_FAILED:
return WaitResult::kFailed;
}
}
WaitResult WaitHandle::SignalAndWait(WaitHandle* wait_handle_to_signal,
WaitHandle* wait_handle_to_wait_on,
bool is_alertable,
std::chrono::milliseconds timeout) {
HANDLE handle_to_signal = wait_handle_to_signal->native_handle();
HANDLE handle_to_wait_on = wait_handle_to_wait_on->native_handle();
DWORD result =
SignalObjectAndWait(handle_to_signal, handle_to_wait_on,
DWORD(timeout.count()), is_alertable ? TRUE : FALSE);
switch (result) {
case WAIT_OBJECT_0:
return WaitResult::kSuccess;
case WAIT_ABANDONED:
return WaitResult::kAbandoned;
case WAIT_IO_COMPLETION:
return WaitResult::kUserCallback;
case WAIT_TIMEOUT:
return WaitResult::kTimeout;
default:
case WAIT_FAILED:
return WaitResult::kFailed;
}
}
std::pair<WaitResult, size_t> WaitHandle::WaitMultiple(
WaitHandle* wait_handles[], size_t wait_handle_count, bool wait_all,
bool is_alertable, std::chrono::milliseconds timeout) {
std::vector<HANDLE> handles(wait_handle_count);
for (size_t i = 0; i < wait_handle_count; ++i) {
handles[i] = wait_handles[i]->native_handle();
}
DWORD result = WaitForMultipleObjectsEx(
DWORD(handles.size()), handles.data(), wait_all ? TRUE : FALSE,
DWORD(timeout.count()), is_alertable ? TRUE : FALSE);
if (result >= WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + handles.size()) {
return std::pair<WaitResult, size_t>(WaitResult::kSuccess,
result - WAIT_OBJECT_0);
} else if (result >= WAIT_ABANDONED_0 &&
result < WAIT_ABANDONED_0 + handles.size()) {
return std::pair<WaitResult, size_t>(WaitResult::kAbandoned,
result - WAIT_ABANDONED_0);
}
switch (result) {
case WAIT_IO_COMPLETION:
return std::pair<WaitResult, size_t>(WaitResult::kUserCallback, 0);
case WAIT_TIMEOUT:
return std::pair<WaitResult, size_t>(WaitResult::kTimeout, 0);
default:
case WAIT_FAILED:
return std::pair<WaitResult, size_t>(WaitResult::kFailed, 0);
}
}
class Win32Event : public Win32Handle<Event> {
public: public:
Win32Event(HANDLE handle) : Win32Handle(handle) {} Win32Event(HANDLE handle) : Win32Handle(handle) {}
~Win32Event() = default; ~Win32Event() override = default;
void Set() override { SetEvent(handle_); } void Set() override { SetEvent(handle_); }
void Reset() override { ResetEvent(handle_); } void Reset() override { ResetEvent(handle_); }
void Pulse() override { PulseEvent(handle_); } void Pulse() override { PulseEvent(handle_); }
@ -98,7 +174,7 @@ std::unique_ptr<Event> Event::CreateAutoResetEvent(bool initial_state) {
CreateEvent(nullptr, FALSE, initial_state ? TRUE : FALSE, nullptr)); CreateEvent(nullptr, FALSE, initial_state ? TRUE : FALSE, nullptr));
} }
class Win32Semaphore : public Semaphore, public Win32Handle { class Win32Semaphore : public Win32Handle<Semaphore> {
public: public:
Win32Semaphore(HANDLE handle) : Win32Handle(handle) {} Win32Semaphore(HANDLE handle) : Win32Handle(handle) {}
~Win32Semaphore() override = default; ~Win32Semaphore() override = default;
@ -116,7 +192,7 @@ std::unique_ptr<Semaphore> Semaphore::Create(int initial_count,
CreateSemaphore(nullptr, initial_count, maximum_count, nullptr)); CreateSemaphore(nullptr, initial_count, maximum_count, nullptr));
} }
class Win32Mutant : public Mutant, public Win32Handle { class Win32Mutant : public Win32Handle<Mutant> {
public: public:
Win32Mutant(HANDLE handle) : Win32Handle(handle) {} Win32Mutant(HANDLE handle) : Win32Handle(handle) {}
~Win32Mutant() = default; ~Win32Mutant() = default;
@ -128,7 +204,7 @@ std::unique_ptr<Mutant> Mutant::Create(bool initial_owner) {
CreateMutex(nullptr, initial_owner ? TRUE : FALSE, nullptr)); CreateMutex(nullptr, initial_owner ? TRUE : FALSE, nullptr));
} }
class Win32Timer : public Timer, public Win32Handle { class Win32Timer : public Win32Handle<Timer> {
public: public:
Win32Timer(HANDLE handle) : Win32Handle(handle) {} Win32Timer(HANDLE handle) : Win32Handle(handle) {}
~Win32Timer() = default; ~Win32Timer() = default;