atomic.cpp: remove raw_notify and simplify internal logic

Also permit zero size in waiters.
This commit is contained in:
Nekotekina 2021-03-23 20:19:10 +03:00
parent e0790f758e
commit 820390b7ed
2 changed files with 21 additions and 230 deletions

View File

@ -216,56 +216,6 @@ static NEVER_INLINE bool ptr_cmp(const void* data, u32 _size, u128 old128, u128
return false; return false;
} }
// Returns true if mask overlaps, or the argument is invalid
static bool cmp_mask(u32 size1, u128 mask1, u128 val1, u32 size2, u128 mask2, u128 val2)
{
// Compare only masks, new value is not available in this mode
if (size1 == umax)
{
// Simple mask overlap
const u128 v0 = mask1 & mask2;
return !!(v0);
}
// Generate masked value inequality bits
const u128 v0 = (mask1 & mask2) & (val1 ^ val2);
using atomic_wait::op;
using atomic_wait::op_flag;
const u8 size = std::min<u8>(static_cast<u8>(size2), static_cast<u8>(size1));
const op flag{static_cast<u8>(size2 >> 8)};
if (flag != op::eq && flag != (op::eq | op_flag::inverse))
{
fmt::throw_exception("cmp_mask(): no operations are supported for notification with forced value yet.");
}
if (size <= 8)
{
// Generate sized mask
const u64 mask = UINT64_MAX >> ((64 - size * 8) & 63);
if (!(static_cast<u64>(v0) & mask))
{
return !!(flag & op_flag::inverse);
}
}
else if (size == 16)
{
if (!v0)
{
return !!(flag & op_flag::inverse);
}
}
else
{
fmt::throw_exception("bad size (size1=%u, size2=%u)", size1, size2);
}
return !(flag & op_flag::inverse);
}
static atomic_t<u64> s_min_tsc{0}; static atomic_t<u64> s_min_tsc{0};
namespace namespace
@ -670,6 +620,7 @@ static void cond_free(u32 cond_id, u32 tls_slot = -1)
{ {
// Fast finalization // Fast finalization
cond->sync.release(0); cond->sync.release(0);
cond->size = 0;
cond->mask = 0; cond->mask = 0;
*ptls = static_cast<u16>(cond_id); *ptls = static_cast<u16>(cond_id);
return; return;
@ -707,7 +658,7 @@ static void cond_free(u32 cond_id, u32 tls_slot = -1)
}); });
} }
static cond_handle* cond_id_lock(u32 cond_id, u32 size, u128 mask, u64 thread_id = 0, uptr iptr = 0) static cond_handle* cond_id_lock(u32 cond_id, u128 mask, uptr iptr = 0)
{ {
if (cond_id - 1 < u32{UINT16_MAX}) if (cond_id - 1 < u32{UINT16_MAX})
{ {
@ -734,16 +685,7 @@ static cond_handle* cond_id_lock(u32 cond_id, u32 size, u128 mask, u64 thread_id
return false; return false;
} }
const u128 mask12 = mask & cond->mask; if (!(mask & cond->mask) && cond->size)
if (thread_id)
{
if (atomic_storage<u64>::load(cond->tid) != thread_id)
{
return false;
}
}
else if (size && !mask12)
{ {
return false; return false;
} }
@ -799,7 +741,7 @@ namespace
static void slot_free(uptr ptr, atomic_t<u16>* slot, u32 tls_slot) noexcept; static void slot_free(uptr ptr, atomic_t<u16>* slot, u32 tls_slot) noexcept;
template <typename F> template <typename F>
static auto slot_search(uptr iptr, u32 size, u64 thread_id, u128 mask, F func) noexcept; static auto slot_search(uptr iptr, u128 mask, F func) noexcept;
}; };
static_assert(sizeof(root_info) == 64); static_assert(sizeof(root_info) == 64);
@ -991,7 +933,7 @@ void root_info::slot_free(uptr iptr, atomic_t<u16>* slot, u32 tls_slot) noexcept
} }
template <typename F> template <typename F>
FORCE_INLINE auto root_info::slot_search(uptr iptr, u32 size, u64 thread_id, u128 mask, F func) noexcept FORCE_INLINE auto root_info::slot_search(uptr iptr, u128 mask, F func) noexcept
{ {
u32 index = 0; u32 index = 0;
u32 total = 0; u32 total = 0;
@ -1021,7 +963,7 @@ FORCE_INLINE auto root_info::slot_search(uptr iptr, u32 size, u64 thread_id, u12
for (u32 i = 0; i < cond_count; i++) for (u32 i = 0; i < cond_count; i++)
{ {
if (cond_id_lock(cond_ids[i], size, mask, thread_id, iptr)) if (cond_id_lock(cond_ids[i], mask, iptr))
{ {
if (func(cond_ids[i])) if (func(cond_ids[i]))
{ {
@ -1296,7 +1238,7 @@ SAFE_BUFFERS(void) atomic_wait_engine::wait(const void* data, u32 size, u128 old
} }
template <bool NoAlert = false> template <bool NoAlert = false>
static u32 alert_sema(u32 cond_id, u64 tid, u32 size, u128 mask, u128 phantom) static u32 alert_sema(u32 cond_id, u128 mask)
{ {
ensure(cond_id); ensure(cond_id);
@ -1304,11 +1246,11 @@ static u32 alert_sema(u32 cond_id, u64 tid, u32 size, u128 mask, u128 phantom)
u32 ok = 0; u32 ok = 0;
if (!size ? (!tid || cond->tid == tid) : cmp_mask(size, mask, phantom, cond->size | (cond->flag << 8), cond->mask, cond->oldv)) if (!cond->size || mask & cond->mask)
{ {
// Redirect if necessary // Redirect if necessary
const auto _old = cond; const auto _old = cond;
const auto _new = _old->link ? cond_id_lock(_old->link, 0, u128(-1)) : _old; const auto _new = _old->link ? cond_id_lock(_old->link, u128(-1)) : _old;
if (_new && _new->tsc0 == _old->tsc0) if (_new && _new->tsc0 == _old->tsc0)
{ {
@ -1327,7 +1269,7 @@ static u32 alert_sema(u32 cond_id, u64 tid, u32 size, u128 mask, u128 phantom)
return ok; return ok;
} }
} }
else if ((!size && _new->forced_wakeup()) || (size && _new->wakeup(size == umax ? 1 : 2))) else if (_new->wakeup(1))
{ {
ok = cond_id; ok = cond_id;
{ {
@ -1364,149 +1306,7 @@ void atomic_wait_engine::set_notify_callback(void(*cb)(const void*, u64))
s_tls_notify_cb = cb; s_tls_notify_cb = cb;
} }
bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id) void atomic_wait_engine::notify_one(const void* data, u32 /*size*/, u128 mask)
{
// Special operation mode. Note that this is not atomic.
if (!data)
{
// Extract total amount of allocated bits (but hard to tell which level4 slots are occupied)
const auto sem = s_cond_sem1.load();
u32 total = 0;
for (u32 i = 0; i < 8; i++)
{
if ((sem >> (i * 14)) & (8192 + 8191))
{
total = (i + 1) * 8192;
}
}
// Special path: search thread_id without pointer information
for (u32 i = 1; i <= total; i++)
{
if ((i & 63) == 0)
{
for (u64 bits = s_cond_bits[i / 64]; bits; bits &= bits - 1)
{
utils::prefetch_read(s_cond_list + i + std::countr_zero(bits));
}
}
if (!s_cond_bits[i / 64])
{
i |= 63;
continue;
}
if (~s_cond_bits[i / 64] & (1ull << i))
{
continue;
}
const auto cond = s_cond_list + i;
const auto [old, ok] = cond->ptr_ref.fetch_op([&](u64& val)
{
if (!val)
{
// Skip dead semaphores
return false;
}
u32 sync_val = cond->sync.load();
if (sync_val == 0 || sync_val >= 3)
{
// Skip forced signaled or secondary semaphores
return false;
}
if (thread_id)
{
// Check thread if provided
if (atomic_storage<u64>::load(cond->tid) != thread_id)
{
return false;
}
}
if ((val & s_ref_mask) < s_ref_mask)
{
val++;
return true;
}
return true;
});
if (ok) [[unlikely]]
{
const auto cond = s_cond_list + i;
if (!thread_id || cond->tid == thread_id)
{
if (!cond->link && cond->forced_wakeup())
{
cond->alert_native();
if (thread_id)
{
// Only if thread_id is speficied, stop only one and return true.
if ((old & s_ref_mask) < s_ref_mask)
{
cond_free(i);
}
return true;
}
}
}
if ((old & s_ref_mask) < s_ref_mask)
{
cond_free(i);
}
}
}
return false;
}
const uptr iptr = reinterpret_cast<uptr>(data) & (~s_ref_mask >> 17);
if (s_tls_notify_cb)
s_tls_notify_cb(data, 0);
u64 progress = 0;
root_info::slot_search(iptr, 0, thread_id, u128(-1), [&](u32 cond_id)
{
// Forced notification
if (alert_sema(cond_id, thread_id, 0, 0, 0))
{
if (s_tls_notify_cb)
s_tls_notify_cb(data, ++progress);
if (thread_id == 0)
{
// Works like notify_all in this case
return false;
}
return true;
}
return false;
});
if (s_tls_notify_cb)
s_tls_notify_cb(data, -1);
return progress != 0;
}
void atomic_wait_engine::notify_one(const void* data, u32 size, u128 mask, u128 new_value)
{ {
const uptr iptr = reinterpret_cast<uptr>(data) & (~s_ref_mask >> 17); const uptr iptr = reinterpret_cast<uptr>(data) & (~s_ref_mask >> 17);
@ -1515,9 +1315,9 @@ void atomic_wait_engine::notify_one(const void* data, u32 size, u128 mask, u128
u64 progress = 0; u64 progress = 0;
root_info::slot_search(iptr, size, 0, mask, [&](u32 cond_id) root_info::slot_search(iptr, mask, [&](u32 cond_id)
{ {
if (alert_sema(cond_id, -1, size, mask, new_value)) if (alert_sema(cond_id, mask))
{ {
if (s_tls_notify_cb) if (s_tls_notify_cb)
s_tls_notify_cb(data, ++progress); s_tls_notify_cb(data, ++progress);
@ -1531,7 +1331,7 @@ void atomic_wait_engine::notify_one(const void* data, u32 size, u128 mask, u128
s_tls_notify_cb(data, -1); s_tls_notify_cb(data, -1);
} }
SAFE_BUFFERS(void) atomic_wait_engine::notify_all(const void* data, u32 size, u128 mask) SAFE_BUFFERS(void) atomic_wait_engine::notify_all(const void* data, u32 /*size*/, u128 mask)
{ {
const uptr iptr = reinterpret_cast<uptr>(data) & (~s_ref_mask >> 17); const uptr iptr = reinterpret_cast<uptr>(data) & (~s_ref_mask >> 17);
@ -1546,9 +1346,9 @@ SAFE_BUFFERS(void) atomic_wait_engine::notify_all(const void* data, u32 size, u1
// Array itself. // Array itself.
u32 cond_ids[max_threads * max_distance + 128]; u32 cond_ids[max_threads * max_distance + 128];
root_info::slot_search(iptr, size, 0, mask, [&](u32 cond_id) root_info::slot_search(iptr, mask, [&](u32 cond_id)
{ {
u32 res = alert_sema<true>(cond_id, -1, size, mask, 0); u32 res = alert_sema<true>(cond_id, mask);
if (res && ~res <= UINT16_MAX) if (res && ~res <= UINT16_MAX)
{ {

View File

@ -313,13 +313,12 @@ private:
friend class atomic_wait::list; friend class atomic_wait::list;
static void wait(const void* data, u32 size, u128 old128, u64 timeout, u128 mask128, atomic_wait::info* extension = nullptr); static void wait(const void* data, u32 size, u128 old128, u64 timeout, u128 mask128, atomic_wait::info* extension = nullptr);
static void notify_one(const void* data, u32 size, u128 mask128, u128 val128); static void notify_one(const void* data, u32 size, u128 mask128);
static void notify_all(const void* data, u32 size, u128 mask128); static void notify_all(const void* data, u32 size, u128 mask128);
public: public:
static void set_wait_callback(bool(*cb)(const void* data, u64 attempts, u64 stamp0)); static void set_wait_callback(bool(*cb)(const void* data, u64 attempts, u64 stamp0));
static void set_notify_callback(void(*cb)(const void* data, u64 progress)); static void set_notify_callback(void(*cb)(const void* data, u64 progress));
static bool raw_notify(const void* data, u64 thread_id = 0);
}; };
template <uint Max, typename... T> template <uint Max, typename... T>
@ -1588,34 +1587,26 @@ public:
void notify_one() noexcept void notify_one() noexcept
{ {
atomic_wait_engine::notify_one(&m_data, -1, atomic_wait::default_mask<atomic_t>, 0); atomic_wait_engine::notify_one(&m_data, sizeof(T), atomic_wait::default_mask<atomic_t>);
} }
// Notify with mask, allowing to not wake up thread which doesn't wait on this mask // Notify with mask, allowing to not wake up thread which doesn't wait on this mask
void notify_one(type mask_value) noexcept void notify_one(type mask_value) noexcept
{ {
const u128 mask = std::bit_cast<get_uint_t<sizeof(T)>>(mask_value); const u128 mask = std::bit_cast<get_uint_t<sizeof(T)>>(mask_value);
atomic_wait_engine::notify_one(&m_data, -1, mask, 0); atomic_wait_engine::notify_one(&m_data, sizeof(T), mask);
}
// Notify with mask and value, allowing to not wake up thread which doesn't wait on them
[[deprecated("Incomplete")]] void notify_one(type mask_value, type phantom_value) noexcept
{
const u128 mask = std::bit_cast<get_uint_t<sizeof(T)>>(mask_value);
const u128 _new = std::bit_cast<get_uint_t<sizeof(T)>>(phantom_value);
atomic_wait_engine::notify_one(&m_data, sizeof(T), mask, _new);
} }
void notify_all() noexcept void notify_all() noexcept
{ {
atomic_wait_engine::notify_all(&m_data, -1, atomic_wait::default_mask<atomic_t>); atomic_wait_engine::notify_all(&m_data, sizeof(T), atomic_wait::default_mask<atomic_t>);
} }
// Notify all threads with mask, allowing to not wake up threads which don't wait on them // Notify all threads with mask, allowing to not wake up threads which don't wait on them
void notify_all(type mask_value) noexcept void notify_all(type mask_value) noexcept
{ {
const u128 mask = std::bit_cast<get_uint_t<sizeof(T)>>(mask_value); const u128 mask = std::bit_cast<get_uint_t<sizeof(T)>>(mask_value);
atomic_wait_engine::notify_all(&m_data, -1, mask); atomic_wait_engine::notify_all(&m_data, sizeof(T), mask);
} }
}; };