From 6d37bc62a5289aa2c4e6c051892165caeb2cff54 Mon Sep 17 00:00:00 2001
From: Nekotekina
Date: Thu, 19 Nov 2020 09:12:01 +0300
Subject: [PATCH] CPU: remove cpu_counter from g_fxo

Convert it to a namespace, simplify add/remove functions.
Don't add() threads just created; they are trapped in check_state() anyway.
Don't add() threads which are leaving check_state() because they are stopped.
---
 rpcs3/Emu/CPU/CPUThread.cpp | 242 ++++++++++++++++--------------------
 1 file changed, 105 insertions(+), 137 deletions(-)

diff --git a/rpcs3/Emu/CPU/CPUThread.cpp b/rpcs3/Emu/CPU/CPUThread.cpp
index 928fbc0229..b7540f0079 100644
--- a/rpcs3/Emu/CPU/CPUThread.cpp
+++ b/rpcs3/Emu/CPU/CPUThread.cpp
@@ -12,7 +12,6 @@
 #include <thread>
 #include <unordered_map>
-#include <numeric>
 #include <map>
 
 DECLARE(cpu_thread::g_threads_created){0};
@@ -22,7 +21,7 @@ DECLARE(cpu_thread::g_suspend_counter){0};
 LOG_CHANNEL(profiler);
 LOG_CHANNEL(sys_log, "SYS");
 
-static thread_local u64 s_tls_thread_slot = -1;
+static thread_local u32 s_tls_thread_slot = -1;
 
 // Suspend counter stamp
 static thread_local u64 s_tls_sctr = -1;
@@ -251,39 +250,35 @@ using cpu_profiler = named_thread<cpu_prof>;
 
 thread_local cpu_thread* g_tls_current_cpu_thread = nullptr;
 
-struct cpu_counter
+// Total number of CPU threads
+static atomic_t<u64> s_cpu_counter{0};
+
+// List of posted tasks for suspend_all
+static atomic_t<cpu_thread::suspend_work*> s_cpu_work[64]{};
+
+// Linked list of pushed tasks for suspend_all
+static atomic_t<cpu_thread::suspend_work*> s_pushed{};
+
+// Lock for suspend_all operations
+static shared_mutex s_cpu_lock;
+
+// Bit allocator for threads which need to be suspended
+static atomic_t<u64> s_cpu_bits{};
+
+// List of active threads which need to be suspended
+static atomic_t<cpu_thread*> s_cpu_list[64]{};
+
+namespace cpu_counter
 {
-	// For synchronizing suspend_all operation
-	alignas(64) shared_mutex cpu_suspend_lock;
-
-	// Workload linked list
-	alignas(64) atomic_t<cpu_thread::suspend_work*> cpu_suspend_work{};
-
-	// Semaphore for global thread array (global counter)
-	alignas(64) atomic_t<u32> cpu_array_sema{0};
-
-	// Semaphore subdivision for each array slot (64 x N in total)
-	alignas(64) atomic_t<u64> cpu_array_bits[3]{};
-
-	// Copy of array bits for internal use
-	alignas(64) u64 cpu_copy_bits[3]{};
-
-	// All registered threads
-	atomic_t<cpu_thread*> cpu_array[sizeof(cpu_array_bits) * 8]{};
-
-	u64 add(cpu_thread* _this, bool restore = false) noexcept
+	void add(cpu_thread* _this) noexcept
 	{
-		u64 array_slot = -1;
+		std::lock_guard lock(s_cpu_lock);
 
-		if (!restore && !cpu_array_sema.try_inc(sizeof(cpu_counter::cpu_array_bits) * 8))
-		{
-			sys_log.fatal("Too many threads.");
-			return array_slot;
-		}
+		u32 id = -1;
 
-		for (u32 i = 0;; i = (i + 1) % ::size32(cpu_array_bits))
+		for (u64 i = 0;; i++)
 		{
-			const auto [bits, ok] = cpu_array_bits[i].fetch_op([](u64& bits) -> u64
+			const auto [bits, ok] = s_cpu_bits.fetch_op([](u64& bits) -> u64
 			{
 				if (~bits) [[likely]]
 				{
@@ -298,101 +293,89 @@ struct cpu_counter
 			if (ok) [[likely]]
 			{
 				// Get actual slot number
-				array_slot = i * 64 + std::countr_one(bits);
+				id = std::countr_one(bits);
 
 				// Register thread
-				if (cpu_array[array_slot].compare_and_swap_test(nullptr, _this)) [[likely]]
+				if (s_cpu_list[id].compare_and_swap_test(nullptr, _this)) [[likely]]
 				{
 					break;
 				}
 
-				sys_log.fatal("Unexpected slot registration failure (%u).", array_slot);
-				cpu_array_bits[array_slot / 64] &= ~(1ull << (array_slot % 64));
+				sys_log.fatal("Unexpected slot registration failure (%u).", id);
+				id = -1;
 				continue;
 			}
+
+			if (i > 50)
+			{
+				sys_log.fatal("Too many threads.");
+				return;
+			}
+
+			busy_wait(300);
 		}
 
-		if (!restore)
-		{
-			// First time (thread created)
-			_this->state += cpu_flag::wait;
-			cpu_suspend_lock.lock_unlock();
-		}
-
-		return array_slot;
+		s_tls_thread_slot = id;
 	}
 
-	void remove(cpu_thread* _this, u64 slot) noexcept
+	void remove(cpu_thread* _this) noexcept
 	{
 		// Unregister and wait if necessary
-		_this->state += cpu_flag::wait;
+		verify(HERE), _this->state & cpu_flag::wait;
 
-		if (slot >= std::size(cpu_array))
+		u32 slot = s_tls_thread_slot;
+
+		if (slot >= std::size(s_cpu_list))
 		{
 			sys_log.fatal("Index out of bounds (%u)." HERE, slot);
 			return;
 		}
 
-		std::lock_guard lock(cpu_suspend_lock);
+		std::lock_guard lock(s_cpu_lock);
 
-		if (!cpu_array[slot].compare_and_swap_test(_this, nullptr))
+		if (!s_cpu_list[slot].compare_and_swap_test(_this, nullptr))
 		{
 			sys_log.fatal("Inconsistency for array slot %u", slot);
 			return;
 		}
 
-		cpu_array_bits[slot / 64] &= ~(1ull << (slot % 64));
-		cpu_array_sema--;
+		s_cpu_bits &= ~(1ull << (slot % 64));
+
+		s_tls_thread_slot = -1;
 	}
 
-	// Remove temporarily
-	void remove(cpu_thread* _this) noexcept
+	template <typename F>
+	u64 for_all_cpu(/*mutable*/ u64 copy, F func) noexcept
 	{
-		// Unregister temporarily (called from check_state)
-		const u64 index = s_tls_thread_slot;
-
-		if (index >= std::size(cpu_array))
+		for (u64 bits = copy; bits; bits &= bits - 1)
 		{
-			sys_log.fatal("Index out of bounds (%u)." HERE, index);
-			return;
-		}
+			const u32 index = std::countr_zero(bits);
 
-		if (cpu_array[index].load() == _this && cpu_array[index].compare_and_swap_test(_this, nullptr))
-		{
-			cpu_array_bits[index / 64] &= ~(1ull << (index % 64));
-			return;
-		}
-
-		sys_log.fatal("Thread not found in cpu_array (%s).", _this->get_name());
-	}
-};
-
-template <bool UseCopy = false, typename F>
-void for_all_cpu(F func) noexcept
-{
-	const auto ctr = g_fxo->get<cpu_counter>();
-
-	for (u32 i = 0; i < ::size32(ctr->cpu_array_bits); i++)
-	{
-		for (u64 bits = (UseCopy ? ctr->cpu_copy_bits[i] : ctr->cpu_array_bits[i].load()); bits; bits &= bits - 1)
-		{
-			const u64 index = i * 64 + std::countr_zero(bits);
-
-			if (cpu_thread* cpu = ctr->cpu_array[index].load())
+			if (cpu_thread* cpu = s_cpu_list[index].load())
 			{
-				if constexpr (std::is_invocable_v<F, cpu_thread*, u64>)
+				if constexpr (std::is_invocable_v<F, cpu_thread*, u32>)
 				{
-					func(cpu, index);
+					if (!func(cpu, index))
+						copy &= ~(1ull << index);
 					continue;
 				}
 
 				if constexpr (std::is_invocable_v<F, cpu_thread*>)
 				{
-					func(cpu);
+					if (!func(cpu))
+						copy &= ~(1ull << index);
 					continue;
 				}
+
+				sys_log.fatal("cpu_counter::for_all_cpu: bad callback");
+			}
+			else
+			{
+				copy &= ~(1ull << index);
 			}
 		}
+
+		return copy;
 	}
 }
 
@@ -424,7 +407,7 @@ void cpu_thread::operator()()
 		}
 	}
 
-	while (!g_fxo->get<cpu_counter>() && !g_fxo->get<cpu_profiler>())
+	while (!g_fxo->get<cpu_profiler>())
 	{
 		// Can we have a little race, right? First thread is started concurrently with g_fxo->init()
 		std::this_thread::sleep_for(1ms);
 	}
@@ -450,12 +433,7 @@ void cpu_thread::operator()()
 	}
 
 	// Register thread in g_cpu_array
-	s_tls_thread_slot = g_fxo->get<cpu_counter>()->add(this);
-
-	if (s_tls_thread_slot == umax)
-	{
-		return;
-	}
+	s_cpu_counter++;
 
 	atomic_wait_engine::set_notify_callback([](const void*, u64 progress)
 	{
@@ -521,9 +499,12 @@ void cpu_thread::operator()()
 
 	g_tls_log_control = [](const char*, u64){};
 
-	g_fxo->get<cpu_counter>()->remove(_this, s_tls_thread_slot);
+	if (s_tls_thread_slot != umax)
+	{
+		cpu_counter::remove(_this);
+	}
 
-	s_tls_thread_slot = -1;
+	s_cpu_counter--;
 
 	g_tls_current_cpu_thread = nullptr;
 
@@ -686,11 +667,10 @@ bool cpu_thread::check_state() noexcept
 
 		if (escape)
 		{
-			if (s_tls_thread_slot == umax)
+			if (s_tls_thread_slot == umax && !retval)
 			{
 				// Restore thread in the suspend list
-				std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
-				s_tls_thread_slot = g_fxo->get<cpu_counter>()->add(this, true);
+				cpu_counter::add(this);
 			}
 
 			verify(HERE), cpu_can_stop || !retval;
@@ -705,9 +685,7 @@ bool cpu_thread::check_state() noexcept
 			if (s_tls_thread_slot != umax)
 			{
 				// Exclude inactive threads from the suspend list (optimization)
-				std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
-				g_fxo->get<cpu_counter>()->remove(this);
-				s_tls_thread_slot = -1;
+				cpu_counter::remove(this);
 			}
 
 			continue;
@@ -841,16 +819,10 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 	// Can't allow pre-set wait bit (it'd be a problem)
 	verify(HERE), !_this || !(_this->state & cpu_flag::wait);
 
-	// cpu_counter object
-	const auto ctr = g_fxo->get<cpu_counter>();
-
-	// Try to push workload
-	auto& queue = ctr->cpu_suspend_work;
-
 	do
 	{
 		// Load current head
-		next = queue.load();
+		next = s_pushed.load();
 
 		if (!next && cancel_if_not_suspended) [[unlikely]]
 		{
 			return false;
 		}
 
 		if (!_this && next)
 		{
 			// If _this == nullptr, it only works if this is the first workload pushed
-			ctr->cpu_suspend_lock.lock_unlock();
+			s_cpu_lock.lock_unlock();
 			continue;
 		}
 	}
-	while (!queue.compare_and_swap_test(next, this));
+	while (!s_pushed.compare_and_swap_test(next, this));
 
 	if (!next)
 	{
@@ -873,54 +845,53 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 		perf_meter<"SUSPEND"_u64> perf0;
 
 		// First thread to push the work to the workload list pauses all threads and processes it
-		std::lock_guard lock(ctr->cpu_suspend_lock);
+		std::lock_guard lock(s_cpu_lock);
+
+		u64 copy = s_cpu_bits.load();
 
 		// Try to prefetch cpu->state earlier
-		for_all_cpu([&](cpu_thread* cpu)
+		copy = cpu_counter::for_all_cpu(copy, [&](cpu_thread* cpu)
 		{
 			if (cpu != _this)
 			{
 				_m_prefetchw(&cpu->state);
+				return true;
 			}
+
+			return false;
 		});
 
 		// Initialization (first increment)
 		g_suspend_counter += 2;
 
-		// Copy of thread bits
-		decltype(ctr->cpu_copy_bits) copy2{};
+		// Copy snapshot for finalization
+		u64 copy2 = copy;
 
-		for (u32 i = 0; i < ::size32(ctr->cpu_copy_bits); i++)
+		copy = cpu_counter::for_all_cpu(copy, [&](cpu_thread* cpu, u32 index)
 		{
-			copy2[i] = ctr->cpu_copy_bits[i] = ctr->cpu_array_bits[i].load();
-		}
-
-		for_all_cpu([&](cpu_thread* cpu, u64 index)
-		{
-			if (cpu == _this || cpu->state.fetch_add(cpu_flag::pause) & cpu_flag::wait)
+			if (cpu->state.fetch_add(cpu_flag::pause) & cpu_flag::wait)
 			{
 				// Clear bits as long as wait flag is set
-				ctr->cpu_copy_bits[index / 64] &= ~(1ull << (index % 64));
+				return false;
 			}
 
-			if (cpu == _this)
-			{
-				copy2[index / 64] &= ~(1ull << (index % 64));
-			}
+			return true;
 		});
 
-		while (true)
+		while (copy)
 		{
 			// Check only CPUs which haven't acknowledged their waiting state yet
-			for_all_cpu([&](cpu_thread* cpu, u64 index)
+			copy = cpu_counter::for_all_cpu(copy, [&](cpu_thread* cpu, u32 index)
 			{
 				if (cpu->state & cpu_flag::wait)
 				{
-					ctr->cpu_copy_bits[index / 64] &= ~(1ull << (index % 64));
+					return false;
 				}
+
+				return true;
 			});
 
-			if (!std::accumulate(std::begin(ctr->cpu_copy_bits), std::end(ctr->cpu_copy_bits), u64{0}, std::bit_or()))
+			if (!copy)
 			{
 				break;
 			}
@@ -932,7 +903,7 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 		g_suspend_counter++;
 
 		// Extract queue and reverse element order (FILO to FIFO) (TODO: maybe leave order as is?)
-		auto* head = queue.exchange(nullptr);
+		auto* head = s_pushed.exchange(nullptr);
 
 		u8 min_prio = head->prio;
 		u8 max_prio = head->prio;
@@ -964,9 +935,10 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 			}
 		}
 
-		for_all_cpu([&](cpu_thread* cpu)
+		cpu_counter::for_all_cpu(copy2, [&](cpu_thread* cpu)
 		{
 			_m_prefetchw(&cpu->state);
+			return true;
 		});
 
 		// Execute all stored workload
@@ -986,12 +958,10 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 		// Finalization (last increment)
 		verify(HERE), g_suspend_counter++ & 1;
 
-		// Exact bitset for flag pause removal
-		std::memcpy(ctr->cpu_copy_bits, copy2, sizeof(copy2));
-
-		for_all_cpu([&](cpu_thread* cpu)
+		cpu_counter::for_all_cpu(copy2, [&](cpu_thread* cpu)
 		{
 			cpu->state -= cpu_flag::pause;
+			return true;
 		});
 	}
 	else
@@ -1030,15 +1000,13 @@ void cpu_thread::stop_all() noexcept
 
 	sys_log.notice("All CPU threads have been signaled.");
 
-	while (g_fxo->get<cpu_counter>()->cpu_array_sema)
+	while (s_cpu_counter)
 	{
-		std::this_thread::sleep_for(10ms);
+		std::this_thread::sleep_for(1ms);
 	}
 
 	sys_log.notice("All CPU threads have been stopped. [+: %u]", +g_threads_created);
 
-	std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
-
 	g_threads_deleted -= g_threads_created.load();
 	g_threads_created = 0;
 }
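
Note on the pattern above (editorial annotation, not part of the patch): the new namespace keeps a single u64 bitmask (s_cpu_bits) as a slot allocator for up to 64 threads, and for_all_cpu() walks only the bits still set in the caller's working mask, so suspend_all can shrink that mask as threads acknowledge the pause. Below is a minimal standalone sketch of that bitmask-allocator-plus-visitor pattern, under stated assumptions: plain std::atomic stands in for RPCS3's atomic_t (its fetch_op/compare_and_swap_test helpers are emulated with standard CAS loops), and every name here (slot_add, slot_remove, for_all, g_bits, g_list) is illustrative, not from the codebase.

// --- standalone sketch (assumptions noted above), C++20 ---
#include <atomic>
#include <bit>
#include <cstdint>
#include <cstdio>

// Stand-ins for the patch's globals; std::atomic replaces RPCS3's atomic_t.
static std::atomic<std::uint64_t> g_bits{0}; // bit allocator: 1 = slot in use
static std::atomic<void*> g_list[64]{};      // slot -> registered object

// Claim the lowest free slot (roughly what cpu_counter::add() does);
// returns 64 when the table is full.
static unsigned slot_add(void* obj)
{
    for (;;)
    {
        std::uint64_t bits = g_bits.load();

        if (bits == ~std::uint64_t{0})
            return 64; // all 64 slots taken

        const unsigned id = std::countr_one(bits); // index of the lowest zero bit

        // Try to set the bit; if another thread raced us, reload and retry
        if (!g_bits.compare_exchange_weak(bits, bits | (1ull << id)))
            continue;

        g_list[id].store(obj);
        return id;
    }
}

// Release a slot (roughly what cpu_counter::remove() does)
static void slot_remove(unsigned id)
{
    g_list[id].store(nullptr);
    g_bits.fetch_and(~(1ull << id));
}

// Visit every object whose bit is set in `copy`; as in the patch's
// for_all_cpu, a callback returning false drops that slot from the
// returned mask, so callers can loop until the mask is empty.
template <typename F>
std::uint64_t for_all(std::uint64_t copy, F func)
{
    for (std::uint64_t bits = copy; bits; bits &= bits - 1) // clears lowest set bit per step
    {
        const unsigned id = std::countr_zero(bits);

        if (void* obj = g_list[id].load())
        {
            if (!func(obj, id))
                copy &= ~(1ull << id);
        }
        else
        {
            copy &= ~(1ull << id); // slot emptied concurrently
        }
    }

    return copy;
}

int main()
{
    int a = 1, b = 2;
    slot_add(&a);
    const unsigned sb = slot_add(&b);

    // Keep only slots whose value is odd, mirroring how suspend_all
    // shrinks its working mask as threads acknowledge the pause.
    const std::uint64_t rest = for_all(g_bits.load(), [](void* obj, unsigned id)
    {
        std::printf("slot %u -> %d\n", id, *static_cast<int*>(obj));
        return (*static_cast<int*>(obj) & 1) != 0;
    });

    std::printf("remaining mask: 0x%llx\n", static_cast<unsigned long long>(rest));
    slot_remove(sb);
}

The `bits &= bits - 1` step clears the lowest set bit, so the visitor touches only occupied slots; combined with callbacks that return false to drop a slot, a caller can spin on the returned mask until every remaining thread has acknowledged, which is how the wait loop in suspend_work::push() converges in the patch.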