diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp
index 5fa65d9741..9c0be06044 100644
--- a/rpcs3/util/atomic.cpp
+++ b/rpcs3/util/atomic.cpp
@@ -15,29 +15,14 @@
 #include
 #include
 #include
+#include
 #include
+#include
 
-// Hashtable size factor (can be set to 0 to stress-test collisions)
-static constexpr uint s_hashtable_power = 16;
+#include "asm.hpp"
 
 // Total number of entries, should be a power of 2.
-static constexpr std::uintptr_t s_hashtable_size = 1u << s_hashtable_power;
-
-// Pointer mask without bits used as hash, assuming 47-bit pointers.
-static constexpr u64 s_pointer_mask = s_hashtable_power > 7 ? 0x7fff'ffff'ffff & ~((s_hashtable_size - 1)) : 0x7fff'ffff'ffff;
-
-// Max number of waiters is 65535.
-static constexpr u64 s_waiter_mask = s_hashtable_power > 7 ? 0x7fff'8000'0000'0000 : 0x7f80'0000'0000'0000;
-
-// Bit indicates that more than one waiter exists.
-static constexpr u64 s_collision_bit = 0x8000'0000'0000'0000;
-
-// Allocated slot with secondary table.
-static constexpr u64 s_slot_mask = ~(s_waiter_mask | s_pointer_mask | s_collision_bit);
-
-// Helper to get least significant set bit from 64-bit masks
-template <u64 Mask>
-static constexpr u64 one_v = Mask & (0 - Mask);
+static constexpr std::size_t s_hashtable_size = 1u << 18;
 
 // Callback for wait() function, returns false if wait should return
 static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*){ return true; };
@@ -167,28 +152,124 @@ static atomic_t<u64> s_min_tsc{0};
 
 namespace atomic_wait
 {
+#ifdef USE_STD
+	// Just madness to keep some members uninitialized and get zero initialization otherwise
+	template <typename T>
+	struct un_t
+	{
+		alignas(T) std::byte data[sizeof(T)]{};
+
+		constexpr un_t() noexcept = default;
+
+		un_t(const un_t&) = delete;
+
+		un_t& operator =(const un_t&) = delete;
+
+		~un_t() = default;
+
+		T* get() noexcept
+		{
+			return std::launder(reinterpret_cast<T*>(+data));
+		}
+
+		const T* get() const noexcept
+		{
+			return std::launder(reinterpret_cast<const T*>(+data));
+		}
+
+		T& operator =(const T& r) noexcept
+		{
+			return *get() = r;
+		}
+
+		T* operator ->() noexcept
+		{
+			return get();
+		}
+
+		const T* operator ->() const noexcept
+		{
+			return get();
+		}
+
+		operator T&() noexcept
+		{
+			return *get();
+		}
+
+		operator const T&() const noexcept
+		{
+			return *get();
+		}
+
+		static void init(un_t& un)
+		{
+			new (un.data) T();
+		}
+
+		void destroy()
+		{
+			get()->~T();
+		}
+	};
+#endif
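// [Editorial sketch, not part of the patch] un_t<T> above implements "storage now,
// construction later": the byte array is zero-initialized together with the parent
// object, the real T is placement-new'ed in init(), and later accesses go through
// std::launder. A minimal standalone model of the same idiom (names illustrative):

#include <cstddef>
#include <new>

template <typename T>
struct deferred_storage
{
	alignas(T) std::byte data[sizeof(T)]{};                       // zeroed bytes, no T yet
	T* init() { return new (data) T(); }                          // construct in-place
	T* get() { return std::launder(reinterpret_cast<T*>(data)); } // safe re-access
	void destroy() { get()->~T(); }                               // manual teardown
};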
+
 	// Essentially a fat semaphore
 	struct alignas(64) cond_handle
 	{
-#ifdef _WIN32
-		u64 tid = GetCurrentThreadId();
-#else
-		u64 tid = reinterpret_cast<u64>(pthread_self());
-#endif
-		atomic_t<u32> sync{};
-		u16 link{0};
-		u16 size{};
-		u64 tsc0{};
-		const void* ptr{};
+		constexpr cond_handle() noexcept = default;
+
+		atomic_t<const void*> ptr{};
+		u64 tid{};
 		__m128i mask{};
 		__m128i oldv{};
 
+		// Temporarily reduced unique tsc stamp to 48 bits to make space for refs (TODO)
+		u64 tsc0 : 48 = 0;
+		u64 link : 16 = 0;
+		u16 size{};
+		atomic_t<u16> refs{};
+		atomic_t<u32> sync{};
+
 #ifdef USE_STD
 		// Standard CV/mutex pair (often contains pthread_cond_t/pthread_mutex_t)
-		std::condition_variable cv;
-		std::mutex mtx;
+		un_t<std::condition_variable> cv{};
+		un_t<std::mutex> mtx{};
 #endif
 
+		void init(const void* data)
+		{
+			ptr.release(data);
+#ifdef _WIN32
+			tid = GetCurrentThreadId();
+#else
+			tid = reinterpret_cast<u64>(pthread_self());
+#endif
+
+#ifdef USE_STD
+			cv.init(cv);
+			mtx.init(mtx);
+#endif
+
+			// Initialize first reference
+			verify(HERE), !refs++;
+		}
+
+		void destroy()
+		{
+			ptr.release(nullptr);
+			tid = 0;
+			tsc0 = 0;
+			link = 0;
+			size = 0;
+			sync = 0;
+
+#ifdef USE_STD
+			mtx.destroy();
+			cv.destroy();
+#endif
+		}
+
 		bool forced_wakeup()
 		{
 			const auto [_old, ok] = sync.fetch_op([](u32& val)
 			{
@@ -230,9 +311,9 @@ namespace atomic_wait
 			futex(&sync, FUTEX_WAKE_PRIVATE, 0x7fff'ffff);
 #elif defined(USE_STD)
 			// Not super efficient: locking is required to avoid lost notifications
-			mtx.lock();
-			mtx.unlock();
-			cv.notify_all();
+			mtx->lock();
+			mtx->unlock();
+			cv->notify_all();
 #elif defined(_WIN32)
 			if (NtWaitForAlertByThreadId)
 			{
@@ -253,10 +334,10 @@ namespace atomic_wait
 			return false;
 #elif defined(USE_STD)
 			// Optimistic non-blocking path
-			if (mtx.try_lock())
+			if (mtx->try_lock())
 			{
-				mtx.unlock();
-				cv.notify_all();
+				mtx->unlock();
+				cv->notify_all();
 				return true;
 			}
 
@@ -287,10 +368,7 @@ namespace atomic_wait
 }
 
 // Max allowed thread number is chosen to fit in 16 bits
-static std::aligned_storage_t<sizeof(atomic_wait::cond_handle), alignof(atomic_wait::cond_handle)> s_cond_list[UINT16_MAX + 1]{};
-
-// Used to allow concurrent notifying
-static atomic_t<u32> s_cond_refs[UINT16_MAX + 1]{};
+static atomic_wait::cond_handle s_cond_list[UINT16_MAX + 1]{};
 
 // Allocation bits
 static atomic_t<u64> s_cond_bits[(UINT16_MAX + 1) / 64]{};
@@ -298,7 +376,7 @@ static atomic_t<u64> s_cond_bits[(UINT16_MAX + 1) / 64]{};
 // Allocation semaphore
 static atomic_t<u32> s_cond_sema{0};
 
-static u32 cond_alloc()
+static u32 cond_alloc(const void* data)
 {
 	// Determine whether there is a free slot or not
 	if (!s_cond_sema.try_inc(UINT16_MAX + 1))
@@ -344,11 +422,8 @@ static u32 cond_alloc()
 			continue;
 		}
 
-		// Construct inplace before it can be used
-		new (s_cond_list + id) atomic_wait::cond_handle();
-
-		// Add first reference
-		verify(HERE), !s_cond_refs[id]++;
+		// Initialize new "semaphore"
+		s_cond_list[id].init(data);
 
 		return id;
 	}
@@ -359,16 +434,6 @@ static u32 cond_alloc()
 	return 0;
 }
 
-static atomic_wait::cond_handle* cond_get(u32 cond_id)
-{
-	if (cond_id - 1 < u32{UINT16_MAX}) [[likely]]
-	{
-		return std::launder(reinterpret_cast<atomic_wait::cond_handle*>(s_cond_list + cond_id));
-	}
-
-	return nullptr;
-}
-
 static void cond_free(u32 cond_id)
 {
 	if (cond_id - 1 >= u32{UINT16_MAX})
 	{
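// [Editorial sketch, not part of the patch] cond_alloc and the allocators below all
// share one bit trick: "bits |= bits + 1" sets exactly the lowest clear bit of a
// word, and std::countr_one names the index that was claimed. Assuming the atomic
// fetch_op wrapper retries on contention, the core reduces to:

#include <bit>
#include <cstdint>

// Returns the claimed bit index (0-63), or -1 if the word is already full.
inline int claim_lowest_clear_bit(std::uint64_t& bits)
{
	if (bits == ~0ull)
		return -1;                         // no free slot in this word
	const int idx = std::countr_one(bits); // trailing ones == lowest clear bit
	bits |= bits + 1;                      // x | (x + 1) flips only that bit
	return idx;
}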
@@ -377,417 +442,119 @@ static void cond_free(u32 cond_id)
 		std::abort();
 	}
 
+	const auto cond = s_cond_list + cond_id;
+
 	// Dereference, destroy on last ref
-	if (--s_cond_refs[cond_id])
+	if (--cond->refs)
 	{
 		return;
 	}
 
-	// Call the destructor
-	cond_get(cond_id)->~cond_handle();
+	// Call the destructor if necessary
+	cond->destroy();
 
 	// Remove the allocation bit
 	s_cond_bits[cond_id / 64] &= ~(1ull << (cond_id % 64));
 
 	// Release the semaphore
-	s_cond_sema--;
+	verify(HERE), s_cond_sema--;
 }
 
-static atomic_wait::cond_handle* cond_id_lock(u32 cond_id)
+static atomic_wait::cond_handle* cond_id_lock(u32 cond_id, u64 thread_id = 0, const void* data = nullptr)
 {
 	if (cond_id - 1 < u32{UINT16_MAX})
 	{
-		const auto [old, ok] = s_cond_refs[cond_id].fetch_op([](u32& ref)
+		const auto cond = s_cond_list + cond_id;
+
+		const auto [old, ok] = cond->refs.fetch_op([&](u16& ref)
 		{
-			if (!ref || ref == UINT32_MAX)
+			if (!ref || ref == UINT16_MAX)
 			{
 				// Don't reference already deallocated semaphore
 				return false;
 			}
 
+			const u32 sync_val = cond->sync;
+
+			if (sync_val == 0 || sync_val == 3)
+			{
+				return false;
+			}
+
+			if (thread_id)
+			{
+				if (atomic_storage<u64>::load(cond->tid) != thread_id)
+				{
+					return false;
+				}
+			}
+
+			if (data)
+			{
+				if (cond->ptr != data)
+				{
+					return false;
+				}
+			}
+
 			ref++;
 			return true;
 		});
 
 		if (ok)
 		{
-			return cond_get(cond_id);
+			return cond;
 		}
 
-		if (old == UINT32_MAX)
+		if (old == UINT16_MAX)
 		{
-			fmt::raw_error("Thread limit " STRINGIZE(UINT32_MAX) " for a single address reached in atomic notifier.");
+			fmt::raw_error("Thread limit " STRINGIZE(UINT16_MAX) " reached in an atomic notifier.");
 		}
 	}
 
 	return nullptr;
 }
 
-static u32 cond_lock(atomic_t<u16>* sema)
-{
-	while (const u32 cond_id = sema->load())
-	{
-		if (cond_id_lock(cond_id))
-		{
-			return cond_id;
-		}
-
-		if (sema->load() != cond_id)
-		{
-			// Try again if it changed
-			continue;
-		}
-		else
-		{
-			break;
-		}
-	}
-
-	return 0;
-}
-
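// [Editorial sketch, not part of the patch] cond_id_lock above is a conditional
// reference acquire: the lambda may veto the increment after inspecting sync, tid
// and ptr, and the caller receives both the old count and whether the op applied.
// The same pattern written as a plain compare-exchange loop (types illustrative):

#include <atomic>
#include <cstdint>

// Increments refs unless the handle is dead (0) or the counter is saturated.
inline bool try_ref(std::atomic<std::uint16_t>& refs)
{
	for (std::uint16_t old = refs.load();;)
	{
		if (old == 0 || old == UINT16_MAX)
			return false;                         // deallocated or saturated
		if (refs.compare_exchange_weak(old, old + 1))
			return true;                          // one reference acquired
	}
}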
 namespace atomic_wait
 {
-#define MAX_THREADS (56)
+	// Need to spare 16 bits for max distance
+	static constexpr u64 max_threads = 48;
 
-	struct alignas(128) sync_var
+	static constexpr u64 thread_mask = (1ull << max_threads) - 1;
+
+	// Thread list
+	struct alignas(64) root_info
 	{
-		constexpr sync_var() noexcept = default;
+		constexpr root_info() noexcept = default;
 
-		// Reference counter, owning pointer, collision bit and optionally selected slot
-		atomic_t<u64> addr_ref{};
+		// Allocation bits (least significant)
+		atomic_t<u64> bits{};
 
-	private:
-		// Semaphores (allocated in reverse order), empty are zeros
-		atomic_t<u16> sema_data[MAX_THREADS]{};
+		// Allocation pool, pointers to allocated semaphores
+		atomic_t<u16> slots[max_threads]{};
 
-		// Allocated semaphore bits (to make total size 128)
-		atomic_t<u64> sema_bits{};
+		// For collision statistics (32 middle bits)
+		atomic_t<u32> first_ptr{};
 
-	public:
-		atomic_t<u16>* sema_alloc()
-		{
-			const auto [bits, ok] = sema_bits.fetch_op([](u64& bits)
-			{
-				if (bits + 1 < (1ull << MAX_THREADS))
-				{
-					// Set lowest clear bit
-					bits |= bits + 1;
-					return true;
-				}
+		// For collision statistics (bit difference sticky flags)
+		atomic_t<u32> diff_lz{}, diff_tz{}, diff_pop{};
 
-				return false;
-			});
+		// Total reference counter
+		atomic_t<u32> threads{};
 
-			if (ok) [[likely]]
-			{
-				// Find lowest clear bit
-				return get_sema(std::countr_one(bits));
-			}
+		atomic_t<u16>* slot_alloc(std::uintptr_t ptr) noexcept;
 
-			// TODO: support extension if reached
-			fmt::raw_error("Thread limit " STRINGIZE(MAX_THREADS) " for a single address reached in atomic wait.");
-			return nullptr;
-		}
+		root_info* slot_free(atomic_t<u16>* slot) noexcept;
 
-		atomic_t<u16>* get_sema(u32 id)
-		{
-			verify(HERE), id < MAX_THREADS;
-
-			return &sema_data[(MAX_THREADS - 1) - id];
-		}
-
-		u64 get_sema_bits() const
-		{
-			return sema_bits & ((1ull << MAX_THREADS) - 1);
-		}
-
-		void reset_sema_bit(atomic_t<u16>* sema)
-		{
-			verify(HERE), sema >= sema_data && sema < std::end(sema_data);
-
-			sema_bits &= ~(1ull << ((MAX_THREADS - 1) - (sema - sema_data)));
-		}
-
-		void sema_free(atomic_t<u16>* sema)
-		{
-			if (sema < sema_data || sema >= std::end(sema_data))
-			{
-				fprintf(stderr, "sema_free(): bad sema ptr %p" HERE "\n", sema);
-				std::abort();
-			}
-
-			// Try to deallocate semaphore (may be delegated to a notifier)
-			cond_free(sema->exchange(0));
-
-			// Clear sema bit
-			reset_sema_bit(sema);
-		}
+		template <typename F>
+		auto slot_search(const void* data, u64 thread_id, F func) noexcept;
 	};
 
-	static_assert(sizeof(sync_var) == 128);
-
-#undef MAX_THREADS
+	static_assert(sizeof(root_info) == 128);
 }
 
 // Main hashtable for atomic wait.
-alignas(128) static atomic_wait::sync_var s_hashtable[s_hashtable_size]{};
-
-namespace atomic_wait
-{
-	struct slot_info
-	{
-		constexpr slot_info() noexcept = default;
-
-		// Branch extension
-		atomic_wait::sync_var branch[48 - s_hashtable_power]{};
-	};
-}
-
-// Number of search groups (defines max slot branch count as gcount * 64)
-#define MAX_SLOTS (4096)
-
-// Array of slot branch objects
-alignas(128) static atomic_wait::slot_info s_slot_list[MAX_SLOTS]{};
-
-// Allocation bits
-static atomic_t<u64> s_slot_bits[MAX_SLOTS / 64]{};
-
-// Allocation semaphore
-static atomic_t<u32> s_slot_sema{0};
-
-static_assert(MAX_SLOTS % 64 == 0);
-
-static u64 slot_alloc()
-{
-	// Determine whether there is a free slot or not
-	if (!s_slot_sema.try_inc(MAX_SLOTS + 1))
-	{
-		fmt::raw_error("Hashtable extension slot limit " STRINGIZE(MAX_SLOTS) " reached in atomic wait.");
-		return 0;
-	}
-
-	// Diversify search start points to reduce contention and increase immediate success chance
-#ifdef _WIN32
-	const u32 start = GetCurrentProcessorNumber();
-#elif __linux__
-	const u32 start = sched_getcpu();
-#else
-	const u32 start = __rdtsc();
-#endif
-
-	for (u32 i = start;; i++)
-	{
-		const u32 group = i % ::size32(s_slot_bits);
-
-		const auto [bits, ok] = s_slot_bits[group].fetch_op([](u64& bits)
-		{
-			if (~bits)
-			{
-				// Set lowest clear bit
-				bits |= bits + 1;
-				return true;
-			}
-
-			return false;
-		});
-
-		if (ok)
-		{
-			// Find lowest clear bit
-			return group * 64 + std::countr_one(bits);
-		}
-	}
-
-	// Unreachable
-	std::abort();
-	return 0;
-}
-
-#undef MAX_SLOTS
-
-static atomic_wait::sync_var* slot_get(std::uintptr_t iptr, atomic_wait::sync_var* loc, u64 lv = 0)
-{
-	if (!loc)
-	{
-		return nullptr;
-	}
-
-	const u64 value = loc->addr_ref.load();
-
-	if ((value & s_waiter_mask) == 0)
-	{
-		return nullptr;
-	}
-
-	if ((value & s_pointer_mask) == (iptr & s_pointer_mask))
-	{
-		return loc;
-	}
-
-	if ((value & s_collision_bit) == 0)
-	{
-		return nullptr;
-	}
-
-	// Get the number of leading equal bits to determine subslot
-	const u64 eq_bits = std::countl_zero((((iptr ^ value) & (s_pointer_mask >> lv)) | ~s_pointer_mask) << 16);
-
-	// Proceed recursively, increment level
-	return slot_get(iptr, s_slot_list[(value & s_slot_mask) / one_v<s_slot_mask>].branch + eq_bits, eq_bits + 1);
-}
-
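// [Editorial sketch, not part of the patch] The root_info table above replaces the
// old pointer-compressed tree with a power-of-two hash plus linear probing: a full
// bucket spills waiters into the next one, and the origin bucket stores the probe
// distance in the bits above thread_mask. The addressing itself is just:

#include <cstddef>
#include <cstdint>

constexpr std::size_t table_size = std::size_t{1} << 18; // mirrors s_hashtable_size

inline std::size_t bucket_of(const void* data)
{
	// Power-of-two size: the low pointer bits select the bucket
	return reinterpret_cast<std::uintptr_t>(data) % table_size;
}

inline std::size_t probe_next(std::size_t i)
{
	return (i + 1) % table_size; // wrap around at the end of the table
}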
-static void slot_free(u64 id)
-{
-	// Reset allocation bit
-	id = (id & s_slot_mask) / one_v<s_slot_mask>;
-	s_slot_bits[id / 64] &= ~(1ull << (id % 64));
-
-	// Reset semaphore
-	s_slot_sema--;
-}
-
-static void slot_free(std::uintptr_t iptr, atomic_wait::sync_var* loc, u64 lv = 0)
-{
-	const u64 value = loc->addr_ref.load();
-
-	if ((value & s_pointer_mask) != (iptr & s_pointer_mask))
-	{
-		ASSERT(value & s_waiter_mask);
-		ASSERT(value & s_collision_bit);
-
-		// Get the number of leading equal bits to determine subslot
-		const u64 eq_bits = std::countl_zero((((iptr ^ value) & (s_pointer_mask >> lv)) | ~s_pointer_mask) << 16);
-
-		// Proceed recursively, to deallocate deepest branch first
-		slot_free(iptr, s_slot_list[(value & s_slot_mask) / one_v<s_slot_mask>].branch + eq_bits, eq_bits + 1);
-	}
-
-	// Actual cleanup in reverse order
-	auto [_old, ok] = loc->addr_ref.fetch_op([&](u64& value)
-	{
-		ASSERT(value & s_waiter_mask);
-		{
-			value -= one_v<s_waiter_mask>;
-
-			if (!(value & s_waiter_mask))
-			{
-				// Reset on last waiter
-				value = 0;
-				return 2;
-			}
-
-			return 1;
-		}
-	});
-
-	if (ok > 1 && _old & s_collision_bit)
-	{
-		// Deallocate slot on last waiter
-		slot_free(_old);
-	}
-}
-
-static void slot_free(const void* data)
-{
-	const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
-
-	slot_free(iptr, &s_hashtable[iptr % s_hashtable_size]);
-}
-
-static atomic_wait::sync_var* slot_alloc(const void* data)
-{
-	const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
-
-	// Allocated slot index
-	u64 slot_a = -1;
-
-	// Found slot object
-	atomic_wait::sync_var* slot = nullptr;
-
-	auto install_op = [&](u64& value) -> u64
-	{
-		if ((value & s_waiter_mask) == s_waiter_mask)
-		{
-			// Return immediately on waiter overflow
-			return 0;
-		}
-
-		if (!value || (value & s_pointer_mask) == (iptr & s_pointer_mask))
-		{
-			// Store pointer bits
-			value |= (iptr & s_pointer_mask);
-		}
-		else
-		{
-			if ((value & s_collision_bit) == 0)
-			{
-				if (slot_a + 1 == 0)
-				{
-					// Second waiter: allocate slot and install it
-					slot_a = slot_alloc() * one_v<s_slot_mask>;
-				}
-
-				value |= slot_a;
-			}
-
-			// Set collision bit
-			value |= s_collision_bit;
-		}
-
-		// Add waiter
-		value += one_v<s_waiter_mask>;
-		return value;
-	};
-
-	// Search detail
-	u64 lv = 0;
-
-	for (atomic_wait::sync_var* ptr = &s_hashtable[iptr % s_hashtable_size];;)
-	{
-		auto [_old, ok] = ptr->addr_ref.fetch_op(install_op);
-
-		if (slot_a + 1)
-		{
-			if ((_old & s_collision_bit) == 0 && (ok & s_collision_bit) && (ok & s_slot_mask) == slot_a)
-			{
-				// Slot set successfully
-				slot_a = -1;
-			}
-		}
-
-		if (!ok)
-		{
-			// Expected only on top level
-			fmt::raw_error("Thread limit " STRINGIZE(UINT16_MAX) " reached in atomic wait hashtable.");
-			return nullptr;
-		}
-
-		if (!_old || (_old & s_pointer_mask) == (iptr & s_pointer_mask))
-		{
-			// Success
-			if (slot_a + 1)
-			{
-				// Cleanup slot if unused
-				slot_free(slot_a);
-				slot_a = -1;
-			}
-
-			slot = ptr;
-			break;
-		}
-
-		// Get the number of leading equal bits (between iptr and slot owner)
-		const u64 eq_bits = std::countl_zero((((iptr ^ ok) & (s_pointer_mask >> lv)) | ~s_pointer_mask) << 16);
-
-		// Collision; need to go deeper
-		ptr = s_slot_list[(ok & s_slot_mask) / one_v<s_slot_mask>].branch + eq_bits;
-
-		lv = eq_bits + 1;
-	}
-
-	return slot;
-}
+alignas(64) static atomic_wait::root_info s_hashtable[s_hashtable_size]{};
 
 u64 atomic_wait::get_unique_tsc()
 {
@@ -808,6 +575,229 @@ u64 atomic_wait::get_unique_tsc()
 	});
 }
 
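// [Editorial sketch, not part of the patch] get_unique_tsc (context above) hands
// out strictly increasing timestamps even when rdtsc itself stalls or repeats; the
// usual shape of such a helper, with s_min_tsc as the "last issued" stamp, is:

#include <atomic>
#include <cstdint>

inline std::uint64_t unique_stamp(std::atomic<std::uint64_t>& last, std::uint64_t tsc)
{
	for (std::uint64_t prev = last.load();;)
	{
		const std::uint64_t next = tsc > prev ? tsc : prev + 1; // strictly increasing
		if (last.compare_exchange_weak(prev, next))
			return next;
	}
}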
+atomic_t<u16>* atomic_wait::root_info::slot_alloc(std::uintptr_t ptr) noexcept
+{
+	if (!threads.try_inc(UINT16_MAX + 1))
+	{
+		fmt::raw_error("Thread limit " STRINGIZE(UINT16_MAX) " reached in a single hashtable slot.");
+		return nullptr;
+	}
+
+	auto* _this = this;
+
+	atomic_t<u16>* slot = nullptr;
+
+	u64 limit = 0;
+
+	while (true)
+	{
+		const auto [_bits, ok] = _this->bits.fetch_op([](u64& bits)
+		{
+			// Check free slot
+			if (~bits & thread_mask)
+			{
+				// Set lowest clear bit
+				bits |= bits + 1;
+				return true;
+			}
+
+			return false;
+		});
+
+		if (ok)
+		{
+			// Find the lowest clear bit that was just claimed
+			slot = &_this->slots[std::countr_one(_bits)];
+			break;
+		}
+
+		// Keep trying adjacent slots in the hashtable, they are often free due to alignment.
+		_this++;
+		limit++;
+
+		if (_this == std::end(s_hashtable)) [[unlikely]]
+		{
+			_this = s_hashtable;
+		}
+	}
+
+	if (limit)
+	{
+		// Make slot "extended"
+		bits.fetch_op([&](u64& val)
+		{
+			if ((val >> max_threads) >= limit) [[likely]]
+			{
+				return false;
+			}
+
+			// Replace with max value
+			val &= thread_mask;
+			val |= limit << max_threads;
+			return true;
+		});
+	}
+
+	u32 ptr32 = static_cast<u32>(ptr >> 16);
+	u32 first = first_ptr.load();
+
+	if (!first && first != ptr32)
+	{
+		// Register first used pointer
+		first = first_ptr.compare_and_swap(0, ptr32);
+	}
+
+	if (first && first != ptr32)
+	{
+		// Difference bits between pointers
+		u32 diff = first ^ ptr32;
+
+		// The most significant different bit
+		u32 diff1 = std::countl_zero(diff);
+
+		if (diff1 < 32)
+		{
+			diff_lz |= 1u << diff1;
+		}
+
+		u32 diff2 = std::countr_zero(diff);
+
+		if (diff2 < 32)
+		{
+			diff_tz |= 1u << diff2;
+		}
+
+		diff = (diff & 0xaaaaaaaa) / 2 + (diff & 0x55555555);
+		diff = (diff & 0xcccccccc) / 4 + (diff & 0x33333333);
+		diff = (diff & 0xf0f0f0f0) / 16 + (diff & 0x0f0f0f0f);
+		diff = (diff & 0xff00ff00) / 256 + (diff & 0x00ff00ff);
+
+		diff_pop |= 1u << static_cast<u8>((diff >> 16) + diff - 1);
+	}
+
+	return slot;
+}
+
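// [Editorial sketch, not part of the patch] The four masked-add lines above are a
// SWAR population count: each step sums adjacent bit groups in parallel, and the
// final fold "(diff >> 16) + diff" leaves popcount(diff) in the low 16 bits. As a
// standalone, checkable function:

#include <cstdint>

constexpr std::uint32_t swar_popcount(std::uint32_t v)
{
	v = (v & 0xaaaaaaaa) / 2   + (v & 0x55555555); // 2-bit sums
	v = (v & 0xcccccccc) / 4   + (v & 0x33333333); // 4-bit sums
	v = (v & 0xf0f0f0f0) / 16  + (v & 0x0f0f0f0f); // 8-bit sums
	v = (v & 0xff00ff00) / 256 + (v & 0x00ff00ff); // 16-bit sums
	return ((v >> 16) + v) & 0xffff;               // final fold
}

static_assert(swar_popcount(0b1011u) == 3);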
+atomic_wait::root_info* atomic_wait::root_info::slot_free(atomic_t<u16>* slot) noexcept
+{
+	const auto begin = reinterpret_cast<std::uintptr_t>(std::begin(s_hashtable));
+
+	const auto end = reinterpret_cast<std::uintptr_t>(std::end(s_hashtable));
+
+	const auto ptr = reinterpret_cast<std::uintptr_t>(slot) - begin;
+
+	if (ptr >= sizeof(s_hashtable))
+	{
+		fmt::raw_error("Failed to find slot in hashtable slot deallocation." HERE);
+		return nullptr;
+	}
+
+	root_info* _this = &s_hashtable[ptr / sizeof(root_info)];
+
+	if (!(slot >= _this->slots && slot < std::end(_this->slots)))
+	{
+		fmt::raw_error("Failed to find slot in hashtable slot deallocation." HERE);
+		return nullptr;
+	}
+
+	verify(HERE), slot == &_this->slots[slot - _this->slots];
+
+	const u32 cond_id = slot->exchange(0);
+
+	if (cond_id)
+	{
+		cond_free(cond_id);
+	}
+
+	_this->bits &= ~(1ull << (slot - _this->slots));
+
+	auto cnt = this->threads--;
+
+	verify(HERE), cnt;
+
+	if (cnt > 1)
+	{
+		return _this;
+	}
+
+	// Only the last waiter makes an opportunistic cleanup attempt
+	while (this->threads < max_threads)
+	{
+		auto [old, ok] = this->bits.fetch_op([this](u64& val)
+		{
+			if (!val || !(~val & thread_mask) || this->threads >= max_threads)
+			{
+				return false;
+			}
+
+			// Try to clean the distance mask
+			val &= thread_mask;
+			return true;
+		});
+
+		if (!old || ok)
+		{
+			break;
+		}
+	}
+
+	return _this;
+}
+
+template <typename F>
+auto atomic_wait::root_info::slot_search(const void* data, u64 thread_id, F func) noexcept
+{
+	const u64 bits_val = this->bits.load();
+	const u64 max_order = bits_val >> max_threads;
+
+	auto* _this = this;
+
+	u32 order = 0;
+
+	u64 new_val = bits_val & thread_mask;
+
+	while (new_val)
+	{
+		u32 cond_ids[max_threads];
+		u32 cond_max = 0;
+
+		for (u64 bits = new_val; bits; bits &= bits - 1)
+		{
+			if (const u32 cond_id = _this->slots[std::countr_zero(bits)])
+			{
+				utils::prefetch_read(s_cond_list + cond_id);
+				cond_ids[cond_max++] = cond_id;
+			}
+		}
+
+		for (u32 i = 0; i < cond_max; i++)
+		{
+			if (cond_id_lock(cond_ids[i], thread_id, data))
+			{
+				if (func(cond_ids[i]))
+				{
+					return;
+				}
+			}
+		}
+
+		_this++;
+		order++;
+
+		if (order > max_order)
+		{
+			return;
+		}
+
+		if (_this >= std::end(s_hashtable))
+		{
+			_this = s_hashtable;
+		}
+
+		new_val = _this->bits.load() & thread_mask;
+	}
+}
+
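// [Editorial sketch, not part of the patch] slot_search is the primitive all the
// notifiers below are rebuilt on: it snapshots the bucket's allocation bits,
// prefetches and locks candidate handles, then calls the functor until it returns
// true. A notify-one built on it would look like:
//
//     const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
//
//     s_hashtable[iptr % s_hashtable_size].slot_search(data, 0, [&](u32 cond_id)
//     {
//         // stop at the first successfully alerted waiter
//         return alert_sema(cond_id, data, 0, size, mask, new_value);
//     });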
 SAFE_BUFFERS void
 #ifdef _WIN32
 __vectorcall
 #endif
@@ -816,12 +806,16 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 {
 	const auto stamp0 = atomic_wait::get_unique_tsc();
 
-	const auto slot = slot_alloc(data);
+	const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
 
-	std::array<atomic_wait::sync_var*, atomic_wait::max_list - 1> slot_ext{};
+	const auto root = &s_hashtable[iptr % s_hashtable_size];
 
 	uint ext_size = 0;
 
+	std::uintptr_t iptr_ext[atomic_wait::max_list - 1]{};
+
+	atomic_wait::root_info* root_ext[atomic_wait::max_list - 1]{};
+
 	if (ext) [[unlikely]]
 	{
 		for (auto e = ext; e->data; e++)
 		{
@@ -839,48 +833,44 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 			}
 		}
 
-			// Allocate additional slots
-			slot_ext[ext_size++] = slot_alloc(e->data);
+			iptr_ext[ext_size] = reinterpret_cast<std::uintptr_t>(e->data);
+			root_ext[ext_size] = &s_hashtable[iptr_ext[ext_size] % s_hashtable_size];
+			ext_size++;
 		}
 	}
 
-	const u32 cond_id = cond_alloc();
-
-	verify(HERE), cond_id;
+	const u32 cond_id = cond_alloc(data);
 
 	u32 cond_id_ext[atomic_wait::max_list - 1]{};
 
 	for (u32 i = 0; i < ext_size; i++)
 	{
-		cond_id_ext[i] = cond_alloc();
+		cond_id_ext[i] = cond_alloc(ext[i].data);
 	}
 
-	const auto sema = slot->sema_alloc();
+	const auto slot = root->slot_alloc(iptr);
 
-	verify(HERE), sema;
-
-	std::array<atomic_t<u16>*, atomic_wait::max_list - 1> sema_ext{};
+	std::array<atomic_t<u16>*, atomic_wait::max_list - 1> slot_ext{};
 
 	std::array<atomic_wait::cond_handle*, atomic_wait::max_list - 1> cond_ext{};
 
 	for (u32 i = 0; i < ext_size; i++)
 	{
-		// Allocate cond id location ("sema") in *corresponding* slot
-		sema_ext[i] = slot_ext[i]->sema_alloc();
+		// Allocate slot for cond id location
+		slot_ext[i] = root_ext[i]->slot_alloc(iptr_ext[i]);
 
-		// Get actual semaphores
-		cond_ext[i] = cond_get(cond_id_ext[i]);
+		// Get pointers to the semaphores
+		cond_ext[i] = s_cond_list + cond_id_ext[i];
 	}
 
 	// Save for notifiers
-	const auto cond = cond_get(cond_id);
+	const auto cond = s_cond_list + cond_id;
 
 	// Store some info for notifiers (some may be unused)
 	cond->link = 0;
 	cond->size = static_cast<u16>(size);
 	cond->mask = mask;
 	cond->oldv = old_value;
-	cond->ptr = data;
 	cond->tsc0 = stamp0;
 
 	for (u32 i = 0; i < ext_size; i++)
@@ -890,8 +880,7 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 		cond_ext[i]->size = static_cast<u16>(ext[i].size);
 		cond_ext[i]->mask = ext[i].mask;
 		cond_ext[i]->oldv = ext[i].old;
-		cond_ext[i]->ptr = ext[i].data;
-		cond_ext[i]->tsc0 = cond->tsc0;
+		cond_ext[i]->tsc0 = stamp0;
 
 		// Cannot be notified, should be redirected to main semaphore
 		cond_ext[i]->sync.release(4);
 	}
@@ -901,15 +890,15 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 
 	for (u32 i = 0; i < ext_size; i++)
 	{
-		// Final deployment
-		sema_ext[i]->release(static_cast<u16>(cond_id_ext[i]));
+		slot_ext[i]->release(static_cast<u16>(cond_id_ext[i]));
 	}
 
-	sema->store(static_cast<u16>(cond_id));
+	// Final deployment
+	slot->store(static_cast<u16>(cond_id));
 
 #ifdef USE_STD
 	// Lock mutex
-	std::unique_lock lock(cond->mtx);
+	std::unique_lock lock(*cond->mtx.get());
 #endif
 
 	// Can skip unqueue process if true
@@ -949,11 +938,11 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 
 		if (timeout + 1)
 		{
-			cond->cv.wait_for(lock, std::chrono::nanoseconds(timeout));
+			cond->cv->wait_for(lock, std::chrono::nanoseconds(timeout));
 		}
 		else
 		{
-			cond->cv.wait(lock);
+			cond->cv->wait(lock);
 		}
 #elif defined(_WIN32)
 		LARGE_INTEGER qw;
@@ -1047,38 +1036,23 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 	// Release resources in reverse order
 	for (u32 i = ext_size - 1; i != umax; i--)
 	{
-		slot_ext[i]->sema_free(sema_ext[i]);
+		verify(HERE), root_ext[i] == root_ext[i]->slot_free(slot_ext[i]);
 	}
 
-	slot->sema_free(sema);
-
-	for (u32 i = ext_size - 1; i != umax; i--)
-	{
-		slot_free(ext[i].data);
-	}
-
-	slot_free(data);
+	verify(HERE), root == root->slot_free(slot);
 
 	s_tls_wait_cb(nullptr);
 }
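// [Editorial sketch, not part of the patch] Caller-side view of the wait/notify
// protocol implemented above, assuming the atomic_t<> wrappers from atomic.hpp:
//
//     atomic_t<u32> flag{0};
//
//     // waiter: blocks while flag still holds the old value 0
//     flag.wait(0);
//
//     // notifier: publishes the new value, then wakes a waiter on that address
//     flag = 1;
//     flag.notify_one();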
 
-// Platform specific wake-up function
-static NEVER_INLINE bool
+static bool
 #ifdef _WIN32
 __vectorcall
 #endif
-alert_sema(atomic_t<u16>* sema, const void* data, u64 info, u32 size, __m128i mask, __m128i new_value)
+alert_sema(u32 cond_id, const void* data, u64 info, u32 size, __m128i mask, __m128i new_value)
 {
-	const u32 cond_id = cond_lock(sema);
+	verify(HERE), cond_id;
 
-	if (!cond_id)
-	{
-		return false;
-	}
-
-	const auto cond = cond_get(cond_id);
-
-	verify(HERE), cond;
+	const auto cond = s_cond_list + cond_id;
 
 	bool ok = false;
 
@@ -1140,9 +1114,11 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 	if (!data)
 	{
 		// Special path: search thread_id without pointer information
-		for (u32 i = 1; i < UINT16_MAX; i++)
+		for (u32 i = 1; i <= UINT16_MAX; i++)
 		{
-			const auto [_, ok] = s_cond_refs[i].fetch_op([&](u32& ref)
+			const auto cond = s_cond_list + i;
+
+			const auto [old, ok] = cond->refs.fetch_op([&](u16& ref)
 			{
 				if (!ref)
 				{
@@ -1150,31 +1126,27 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 					return false;
 				}
 
-				u32 val = 0;
-				std::memcpy(&val, reinterpret_cast<const char*>(s_cond_list + i) + offsetof(atomic_wait::cond_handle, sync), sizeof(val));
+				u32 val = cond->sync.load();
 
 				if (val == 0 || val >= 3)
 				{
-					// Dirty optimization, read possibly uninitialized memory and skip forced signaled or secondary semaphores
+					// Skip forced signaled or secondary semaphores
 					return false;
 				}
 
 				if (thread_id)
 				{
-					u64 tid = 0;
-					std::memcpy(&tid, reinterpret_cast<const char*>(s_cond_list + i) + offsetof(atomic_wait::cond_handle, tid), sizeof(tid));
-
-					if (tid != thread_id)
+					// Check thread if provided
+					if (s_cond_list[i].tid != thread_id)
 					{
-						// Check thread first without locking (memory may be uninitialized)
 						return false;
 					}
 				}
 
-				if (ref < UINT32_MAX)
+				if (ref < UINT16_MAX)
 				{
-					// Need to busy loop otherwise (TODO)
 					ref++;
+					return true;
 				}
 
 				return true;
 			});
@@ -1182,7 +1154,7 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 
 			if (ok) [[unlikely]]
 			{
-				const auto cond = cond_get(i);
+				const auto cond = s_cond_list + i;
 
 				if (!thread_id || cond->tid == thread_id)
 				{
@@ -1192,14 +1164,16 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 
 					if (thread_id)
 					{
-						// Only if thread_id is speficied, stop only it and return true.
-						cond_free(i);
+						// Only if thread_id is specified, stop only one and return true.
+						if (old < UINT16_MAX)
+							cond_free(i);
 						return true;
 					}
 				}
 			}
 
-			cond_free(i);
+			if (old < UINT16_MAX)
+				cond_free(i);
 		}
 	}
 
@@ -1208,37 +1182,33 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 
 	const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
 
-	const auto slot = slot_get(iptr, &s_hashtable[(iptr) % s_hashtable_size]);
-
-	if (!slot)
-	{
-		return false;
-	}
+	auto* const root = &s_hashtable[iptr % s_hashtable_size];
 
 	s_tls_notify_cb(data, 0);
 
 	u64 progress = 0;
 
-	for (u64 bits = slot->get_sema_bits(); bits; bits &= bits - 1)
+	root->slot_search(data, thread_id, [&](u32 cond_id)
 	{
-		const auto sema = slot->get_sema(std::countr_zero(bits));
-
 		// Forced notification
-		if (alert_sema(sema, data, thread_id, 0, _mm_setzero_si128(), _mm_setzero_si128()))
+		if (alert_sema(cond_id, data, thread_id, 0, _mm_setzero_si128(), _mm_setzero_si128()))
 		{
 			s_tls_notify_cb(data, ++progress);
 
 			if (thread_id == 0)
 			{
 				// Works like notify_all in this case
-				continue;
+				return false;
 			}
 
-			break;
+			return true;
 		}
-	}
+
+		return false;
+	});
 
 	s_tls_notify_cb(data, -1);
 
+	return progress != 0;
 }
 
 void
 #ifdef _WIN32
 __vectorcall
 #endif
@@ -1250,27 +1220,22 @@ atomic_wait_engine::notify_one(const void* data, u32 size, __m128i mask, __m128i
 {
 	const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
 
-	const auto slot = slot_get(iptr, &s_hashtable[(iptr) % s_hashtable_size]);
-
-	if (!slot)
-	{
-		return;
-	}
+	auto* const root = &s_hashtable[iptr % s_hashtable_size];
 
 	s_tls_notify_cb(data, 0);
 
 	u64 progress = 0;
 
-	for (u64 bits = slot->get_sema_bits(); bits; bits &= bits - 1)
+	root->slot_search(data, 0, [&](u32 cond_id)
 	{
-		const auto sema = slot->get_sema(std::countr_zero(bits));
-
-		if (alert_sema(sema, data, progress, size, mask, new_value))
+		if (alert_sema(cond_id, data, progress, size, mask, new_value))
 		{
 			s_tls_notify_cb(data, ++progress);
-			break;
+			return true;
 		}
-	}
+
+		return false;
+	});
 
 	s_tls_notify_cb(data, -1);
 }
@@ -1283,125 +1248,21 @@ atomic_wait_engine::notify_all(const void* data, u32 size, __m128i mask, __m128i
 {
 	const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
 
-	const auto slot = slot_get(iptr, &s_hashtable[(iptr) % s_hashtable_size]);
-
-	if (!slot)
-	{
-		return;
-	}
+	auto* const root = &s_hashtable[iptr % s_hashtable_size];
 
 	s_tls_notify_cb(data, 0);
 
 	u64 progress = 0;
+
+	root->slot_search(data, 0, [&](u32 cond_id)
 	{
-		// Make a copy to filter out waiters that fail some checks
-		u64 copy = slot->get_sema_bits();
-		u64 lock = 0;
-		u32 lock_ids[64]{};
-		u32 lock_id2[64]{};
-
-		for (u64 bits = copy; bits; bits &= bits - 1)
+		if (alert_sema(cond_id, data, progress, size, mask, new_value))
 		{
-			const u32 id = std::countr_zero(bits);
-
-			const auto sema = slot->get_sema(id);
-
-			if (const u32 cond_id = cond_lock(sema))
-			{
-				// Add lock bit for cleanup
-				lock |= 1ull << id;
-				lock_ids[id] = cond_id;
-
-				const auto cond = cond_get(cond_id);
-
-				verify(HERE), cond;
-
-				u32 cmp_res = 0;
-
-				if (cond->sync && cond->ptr == data && ((cmp_res = cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv))))
-				{
-					const auto _old = cond;
-					const auto _new = _old->link ? cond_id_lock(_old->link) : _old;
-
-					if (_new && _old != _new)
-					{
-						lock_id2[id] = _old->link;
-					}
-
-					if (_new && _new->tsc0 == _old->tsc0 && _new->wakeup(cmp_res))
-					{
-						// Ok.
-						continue;
-					}
-				}
-			}
-
-			// Remove the bit from next stage
-			copy &= ~(1ull << id);
-		}
-
-		// If only one waiter exists, there is no point in trying to optimize
-		if (copy & (copy - 1))
-		{
-			for (u64 bits = copy; bits; bits &= bits - 1)
-			{
-				const u32 id = std::countr_zero(bits);
-
-				const auto cond_id = lock_id2[id] ? lock_id2[id] : lock_ids[id];
-
-				if (cond_get(cond_id)->try_alert_native())
-				{
-					s_tls_notify_cb(data, ++progress);
-
-					// Remove the bit from next stage
-					copy &= ~(1ull << id);
-				}
-			}
-		}
-
-		// Proceed with remaining bits using "normal" blocking waiting
-		for (u64 bits = copy; bits; bits &= bits - 1)
-		{
-			const u32 id = std::countr_zero(bits);
-
-			const auto cond_id = lock_id2[id] ? lock_id2[id] : lock_ids[id];
-
-			cond_get(cond_id)->alert_native();
 			s_tls_notify_cb(data, ++progress);
 		}
 
-		// Cleanup locked notifiers
-		for (u64 bits = lock; bits; bits &= bits - 1)
-		{
-			const u32 id = std::countr_zero(bits);
-
-			if (u32 cond_id = lock_id2[id])
-			{
-				cond_free(cond_id);
-			}
-
-			if (u32 cond_id = lock_ids[id])
-			{
-				cond_free(cond_id);
-			}
-		}
-
-		s_tls_notify_cb(data, -1);
-		return;
-	}
-
-	// Unused, let's keep for reference
-	for (u64 bits = slot->get_sema_bits(); bits; bits &= bits - 1)
-	{
-		const auto sema = slot->get_sema(std::countr_zero(bits));
-
-		if (alert_sema(sema, data, progress, size, mask, new_value))
-		{
-			s_tls_notify_cb(data, ++progress);
-			continue;
-		}
-	}
+		return false;
+	});
 
 	s_tls_notify_cb(data, -1);
 }
diff --git a/rpcs3/util/atomic.hpp b/rpcs3/util/atomic.hpp
index a5dbb656ae..8a81ccff56 100644
--- a/rpcs3/util/atomic.hpp
+++ b/rpcs3/util/atomic.hpp
@@ -19,8 +19,7 @@ namespace atomic_wait
 {
 	constexpr uint max_list = 8;
 
-	struct sync_var;
-	struct slot_info;
+	struct root_info;
 	struct sema_handle;
 	struct info
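// [Editorial sketch, not part of the patch] After this rewrite notify_all is just
// the notify_one traversal that never stops early: the callback always returns
// false, so slot_search keeps alerting every matching waiter in the probe window.
// Caller-visible behaviour:
//
//     atomic_t<u32> value{0};
//     value = 1;
//     value.notify_all(); // wakes every thread blocked in value.wait(0)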