atomic.hpp: add timeout support

This commit is contained in:
Nekotekina 2019-09-09 02:42:05 +03:00
parent a45f86a4a2
commit aa99faa85d
2 changed files with 58 additions and 12 deletions

View File

@ -72,22 +72,41 @@ namespace
return s_waiter_maps[std::hash<const void*>()(ptr) % std::size(s_waiter_maps)]; return s_waiter_maps[std::hash<const void*>()(ptr) % std::size(s_waiter_maps)];
} }
void fallback_wait(const void* data, std::size_t size, u64 old_value) void fallback_wait(const void* data, std::size_t size, u64 old_value, u64 timeout)
{ {
auto& wmap = get_fallback_map(data); auto& wmap = get_fallback_map(data);
if (!timeout)
{
return;
}
// Update node key // Update node key
s_tls_waiter.key() = data; s_tls_waiter.key() = data;
if (std::unique_lock lock(wmap.mutex); ptr_cmp(data, size, old_value) && s_tls_wait_cb(data)) if (std::unique_lock lock(wmap.mutex); ptr_cmp(data, size, old_value) && s_tls_wait_cb(data))
{ {
// Add node to the waiter list // Add node to the waiter list
std::condition_variable& cond = wmap.list.insert(std::move(s_tls_waiter))->second.cond; const auto iter = wmap.list.insert(std::move(s_tls_waiter));
// Wait until the node is returned to its TLS location // Wait until the node is returned to its TLS location
if (timeout + 1)
{
if (!iter->second.cond.wait_for(lock, std::chrono::nanoseconds(timeout), [&]
{
return 1 && s_tls_waiter;
}))
{
// Put it back
s_tls_waiter = wmap.list.extract(iter);
}
return;
}
while (!s_tls_waiter) while (!s_tls_waiter)
{ {
cond.wait(lock); iter->second.cond.wait(lock);
} }
} }
} }
@ -127,9 +146,9 @@ namespace
#if !defined(_WIN32) && !defined(__linux__) #if !defined(_WIN32) && !defined(__linux__)
void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_value) void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_value, u64 timeout)
{ {
fallback_wait(data, size, old_value); fallback_wait(data, size, old_value, timeout);
} }
void atomic_storage_futex::notify_one(const void* data) void atomic_storage_futex::notify_one(const void* data)
@ -144,8 +163,13 @@ void atomic_storage_futex::notify_all(const void* data)
#else #else
void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_value) void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_value, u64 timeout)
{ {
if (!timeout)
{
return;
}
const std::intptr_t iptr = reinterpret_cast<std::intptr_t>(data); const std::intptr_t iptr = reinterpret_cast<std::intptr_t>(data);
atomic_t<s64>& entry = s_hashtable[iptr % s_hashtable_size]; atomic_t<s64>& entry = s_hashtable[iptr % s_hashtable_size];
@ -183,10 +207,26 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
if (ptr_cmp(data, size, old_value) && s_tls_wait_cb(data)) if (ptr_cmp(data, size, old_value) && s_tls_wait_cb(data))
{ {
#ifdef _WIN32 #ifdef _WIN32
NtWaitForKeyedEvent(nullptr, &entry, false, nullptr); LARGE_INTEGER qw;
return; qw.QuadPart = -static_cast<s64>(timeout / 100);
if (timeout % 100)
{
// Round up to closest 100ns unit
qw.QuadPart -= 1;
}
if (!NtWaitForKeyedEvent(nullptr, &entry, false, timeout + 1 ? &qw : nullptr))
{
// Return if no errors, continue if timed out
return;
}
#else #else
futex(reinterpret_cast<char*>(&entry) + 4 * IS_BE_MACHINE, FUTEX_WAIT_PRIVATE, new_value, nullptr); struct timespec ts;
ts.tv_sec = timeout / 1'000'000'000;
ts.tv_nsec = timeout % 1'000'000'000;
futex(reinterpret_cast<char*>(&entry) + 4 * IS_BE_MACHINE, FUTEX_WAIT_PRIVATE, new_value, timeout + 1 ? &ts : nullptr);
#endif #endif
} }

View File

@ -7,6 +7,12 @@
#include <atomic> #include <atomic>
#endif #endif
// Wait timeout extension (in nanoseconds)
enum class atomic_wait_timeout : u64
{
inf = 0xffff'ffff'ffff'ffff,
};
// Helper for waitable atomics (as in C++20 std::atomic) // Helper for waitable atomics (as in C++20 std::atomic)
struct atomic_storage_futex struct atomic_storage_futex
{ {
@ -14,7 +20,7 @@ private:
template <typename T> template <typename T>
friend class atomic_t; friend class atomic_t;
static void wait(const void* data, std::size_t size, u64 old_value); static void wait(const void* data, std::size_t size, u64 old_value, u64 timeout);
static void notify_one(const void* data); static void notify_one(const void* data);
static void notify_all(const void* data); static void notify_all(const void* data);
@ -1128,9 +1134,9 @@ public:
return atomic_storage<type>::btr(m_data, bit); return atomic_storage<type>::btr(m_data, bit);
} }
void wait(type old_value) const noexcept void wait(type old_value, atomic_wait_timeout timeout = atomic_wait_timeout::inf) const noexcept
{ {
atomic_storage_futex::wait(&m_data, sizeof(T), std::bit_cast<get_uint_t<sizeof(T)>>(old_value)); atomic_storage_futex::wait(&m_data, sizeof(T), std::bit_cast<get_uint_t<sizeof(T)>>(old_value), static_cast<u64>(timeout));
} }
void notify_one() noexcept void notify_one() noexcept