CPU: remove cpu_counter from g_fxo

Convert it to a namespace, simplify add/remove functions.
Don't add() threads just created; they are trapped in check_state() anyway.
Don't add() threads which are leaving check_state() because they stopped.
Nekotekina 2020-11-19 09:12:01 +03:00
parent e9f7c100a0
commit 6d37bc62a5
1 changed file with 105 additions and 137 deletions
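The core of the change is visible in add() below: thread slots now come from a single 64-bit atomic used as a bit allocator, where the index of the lowest clear bit becomes the thread's slot id. A minimal standalone sketch of that pattern (illustrative names, plain std::atomic in place of RPCS3's atomic_t and fetch_op):

#include <atomic>
#include <bit>
#include <cstdint>

// One bit per slot; a set bit means the slot is taken.
static std::atomic<std::uint64_t> g_slot_bits{0};

// Returns a slot id in [0, 64), or -1 if all 64 slots are taken.
int alloc_slot()
{
	std::uint64_t bits = g_slot_bits.load();

	while (~bits) // at least one clear bit remains
	{
		const int id = std::countr_one(bits); // index of the lowest clear bit

		if (g_slot_bits.compare_exchange_weak(bits, bits | (1ull << id)))
		{
			return id; // bit set atomically: slot claimed
		}
		// CAS failure reloaded `bits` with the current value; retry
	}

	return -1;
}

// Releases a previously claimed slot.
void free_slot(int id)
{
	g_slot_bits.fetch_and(~(1ull << id));
}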


@@ -12,7 +12,6 @@
 #include <thread>
 #include <unordered_map>
-#include <numeric>
 #include <map>
 
 DECLARE(cpu_thread::g_threads_created){0};
@@ -22,7 +21,7 @@ DECLARE(cpu_thread::g_suspend_counter){0};
 LOG_CHANNEL(profiler);
 LOG_CHANNEL(sys_log, "SYS");
 
-static thread_local u64 s_tls_thread_slot = -1;
+static thread_local u32 s_tls_thread_slot = -1;
 
 // Suspend counter stamp
 static thread_local u64 s_tls_sctr = -1;
@@ -251,39 +250,35 @@ using cpu_profiler = named_thread<cpu_prof>;
 thread_local cpu_thread* g_tls_current_cpu_thread = nullptr;
 
-struct cpu_counter
-{
-	// For synchronizing suspend_all operation
-	alignas(64) shared_mutex cpu_suspend_lock;
-
-	// Workload linked list
-	alignas(64) atomic_t<cpu_thread::suspend_work*> cpu_suspend_work{};
-
-	// Semaphore for global thread array (global counter)
-	alignas(64) atomic_t<u32> cpu_array_sema{0};
-
-	// Semaphore subdivision for each array slot (64 x N in total)
-	alignas(64) atomic_t<u64> cpu_array_bits[3]{};
-
-	// Copy of array bits for internal use
-	alignas(64) u64 cpu_copy_bits[3]{};
-
-	// All registered threads
-	atomic_t<cpu_thread*> cpu_array[sizeof(cpu_array_bits) * 8]{};
-
-	u64 add(cpu_thread* _this, bool restore = false) noexcept
-	{
-		u64 array_slot = -1;
-
-		if (!restore && !cpu_array_sema.try_inc(sizeof(cpu_counter::cpu_array_bits) * 8))
-		{
-			sys_log.fatal("Too many threads.");
-			return array_slot;
-		}
-
-		for (u32 i = 0;; i = (i + 1) % ::size32(cpu_array_bits))
-		{
-			const auto [bits, ok] = cpu_array_bits[i].fetch_op([](u64& bits) -> u64
+// Total number of CPU threads
+static atomic_t<u64, 64> s_cpu_counter{0};
+
+// List of posted tasks for suspend_all
+static atomic_t<cpu_thread::suspend_work*> s_cpu_work[64]{};
+
+// Linked list of pushed tasks for suspend_all
+static atomic_t<cpu_thread::suspend_work*> s_pushed{};
+
+// Lock for suspend_all operations
+static shared_mutex s_cpu_lock;
+
+// Bit allocator for threads which need to be suspended
+static atomic_t<u64> s_cpu_bits{};
+
+// List of active threads which need to be suspended
+static atomic_t<cpu_thread*> s_cpu_list[64]{};
+
+namespace cpu_counter
+{
+	void add(cpu_thread* _this) noexcept
+	{
+		std::lock_guard lock(s_cpu_lock);
+
+		u32 id = -1;
+
+		for (u64 i = 0;; i++)
+		{
+			const auto [bits, ok] = s_cpu_bits.fetch_op([](u64& bits) -> u64
 			{
 				if (~bits) [[likely]]
 				{
@@ -298,101 +293,89 @@ struct cpu_counter
 			if (ok) [[likely]]
 			{
 				// Get actual slot number
-				array_slot = i * 64 + std::countr_one(bits);
+				id = std::countr_one(bits);
 
 				// Register thread
-				if (cpu_array[array_slot].compare_and_swap_test(nullptr, _this)) [[likely]]
+				if (s_cpu_list[id].compare_and_swap_test(nullptr, _this)) [[likely]]
 				{
 					break;
 				}
 
-				sys_log.fatal("Unexpected slot registration failure (%u).", array_slot);
-				cpu_array_bits[array_slot / 64] &= ~(1ull << (array_slot % 64));
+				sys_log.fatal("Unexpected slot registration failure (%u).", id);
+				id = -1;
 				continue;
 			}
+
+			if (i > 50)
+			{
+				sys_log.fatal("Too many threads.");
+				return;
+			}
+
+			busy_wait(300);
 		}
 
-		if (!restore)
-		{
-			// First time (thread created)
-			_this->state += cpu_flag::wait;
-			cpu_suspend_lock.lock_unlock();
-		}
-
-		return array_slot;
+		s_tls_thread_slot = id;
 	}
 
-	void remove(cpu_thread* _this, u64 slot) noexcept
+	void remove(cpu_thread* _this) noexcept
 	{
 		// Unregister and wait if necessary
-		_this->state += cpu_flag::wait;
+		verify(HERE), _this->state & cpu_flag::wait;
+
+		u32 slot = s_tls_thread_slot;
 
-		if (slot >= std::size(cpu_array))
+		if (slot >= std::size(s_cpu_list))
 		{
 			sys_log.fatal("Index out of bounds (%u)." HERE, slot);
 			return;
 		}
 
-		std::lock_guard lock(cpu_suspend_lock);
+		std::lock_guard lock(s_cpu_lock);
 
-		if (!cpu_array[slot].compare_and_swap_test(_this, nullptr))
+		if (!s_cpu_list[slot].compare_and_swap_test(_this, nullptr))
 		{
 			sys_log.fatal("Inconsistency for array slot %u", slot);
 			return;
 		}
 
-		cpu_array_bits[slot / 64] &= ~(1ull << (slot % 64));
-		cpu_array_sema--;
+		s_cpu_bits &= ~(1ull << (slot % 64));
+
+		s_tls_thread_slot = -1;
 	}
 
-	// Remove temporarily
-	void remove(cpu_thread* _this) noexcept
-	{
-		// Unregister temporarily (called from check_state)
-		const u64 index = s_tls_thread_slot;
-
-		if (index >= std::size(cpu_array))
-		{
-			sys_log.fatal("Index out of bounds (%u)." HERE, index);
-			return;
-		}
-
-		if (cpu_array[index].load() == _this && cpu_array[index].compare_and_swap_test(_this, nullptr))
-		{
-			cpu_array_bits[index / 64] &= ~(1ull << (index % 64));
-			return;
-		}
-
-		sys_log.fatal("Thread not found in cpu_array (%s).", _this->get_name());
-	}
-};
-
-template <bool UseCopy = false, typename F>
-void for_all_cpu(F func) noexcept
-{
-	const auto ctr = g_fxo->get<cpu_counter>();
-
-	for (u32 i = 0; i < ::size32(ctr->cpu_array_bits); i++)
-	{
-		for (u64 bits = (UseCopy ? ctr->cpu_copy_bits[i] : ctr->cpu_array_bits[i].load()); bits; bits &= bits - 1)
-		{
-			const u64 index = i * 64 + std::countr_zero(bits);
-
-			if (cpu_thread* cpu = ctr->cpu_array[index].load())
+	template <typename F>
+	u64 for_all_cpu(/*mutable*/ u64 copy, F func) noexcept
+	{
+		for (u64 bits = copy; bits; bits &= bits - 1)
+		{
+			const u32 index = std::countr_zero(bits);
+
+			if (cpu_thread* cpu = s_cpu_list[index].load())
 			{
-				if constexpr (std::is_invocable_v<F, cpu_thread*, u64>)
+				if constexpr (std::is_invocable_v<F, cpu_thread*, u32>)
 				{
-					func(cpu, index);
+					if (!func(cpu, index))
+						copy &= ~(1ull << index);
 					continue;
 				}
 
 				if constexpr (std::is_invocable_v<F, cpu_thread*>)
 				{
-					func(cpu);
+					if (!func(cpu))
+						copy &= ~(1ull << index);
 					continue;
 				}
-
-				sys_log.fatal("cpu_counter::for_all_cpu: bad callback");
 			}
+			else
+			{
+				copy &= ~(1ull << index);
+			}
 		}
-	}
-}
+
+		return copy;
+	}
+}
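A note on the iteration idiom above: for_all_cpu() walks only the set bits of the mask, since `bits &= bits - 1` clears the lowest set bit on each pass and std::countr_zero() gives that bit's index, so a pass costs one iteration per registered thread rather than one per slot. It also returns the mask with bits cleared wherever the callback returned false (or the slot was empty), which the suspend logic below feeds back in to poll only threads that have not yet acknowledged. A toy standalone version of the same shape (hypothetical names, not the commit's code):

#include <bit>
#include <cstdint>

// Visit the index of each set bit in `mask`, lowest to highest;
// clear the bit in the returned copy when `func` returns false.
template <typename F>
std::uint64_t for_each_set_bit(std::uint64_t mask, F func)
{
	for (std::uint64_t bits = mask; bits; bits &= bits - 1) // drop the lowest set bit each pass
	{
		const int index = std::countr_zero(bits);

		if (!func(index))
		{
			mask &= ~(1ull << index); // prune: the next pass skips this bit
		}
	}

	return mask;
}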
@@ -424,7 +407,7 @@ void cpu_thread::operator()()
 		}
 	}
 
-	while (!g_fxo->get<cpu_counter>() && !g_fxo->get<cpu_profiler>())
+	while (!g_fxo->get<cpu_profiler>())
 	{
 		// Can we have a little race, right? First thread is started concurrently with g_fxo->init()
 		std::this_thread::sleep_for(1ms);
@@ -450,12 +433,7 @@ void cpu_thread::operator()()
 	}
 
 	// Register thread in g_cpu_array
-	s_tls_thread_slot = g_fxo->get<cpu_counter>()->add(this);
-
-	if (s_tls_thread_slot == umax)
-	{
-		return;
-	}
+	s_cpu_counter++;
 
 	atomic_wait_engine::set_notify_callback([](const void*, u64 progress)
 	{
@@ -521,9 +499,12 @@ void cpu_thread::operator()()
 	g_tls_log_control = [](const char*, u64){};
 
-	g_fxo->get<cpu_counter>()->remove(_this, s_tls_thread_slot);
+	if (s_tls_thread_slot != umax)
+	{
+		cpu_counter::remove(_this);
+	}
 
-	s_tls_thread_slot = -1;
+	s_cpu_counter--;
 
 	g_tls_current_cpu_thread = nullptr;
@@ -686,11 +667,10 @@ bool cpu_thread::check_state() noexcept
 		if (escape)
 		{
-			if (s_tls_thread_slot == umax)
+			if (s_tls_thread_slot == umax && !retval)
 			{
 				// Restore thread in the suspend list
-				std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
-				s_tls_thread_slot = g_fxo->get<cpu_counter>()->add(this, true);
+				cpu_counter::add(this);
 			}
 
 			verify(HERE), cpu_can_stop || !retval;
@@ -705,9 +685,7 @@ bool cpu_thread::check_state() noexcept
 			if (s_tls_thread_slot != umax)
 			{
 				// Exclude inactive threads from the suspend list (optimization)
-				std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
-				g_fxo->get<cpu_counter>()->remove(this);
-				s_tls_thread_slot = -1;
+				cpu_counter::remove(this);
 			}
 
 			continue;
@@ -841,16 +819,10 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 	// Can't allow pre-set wait bit (it'd be a problem)
 	verify(HERE), !_this || !(_this->state & cpu_flag::wait);
 
-	// cpu_counter object
-	const auto ctr = g_fxo->get<cpu_counter>();
-
-	// Try to push workload
-	auto& queue = ctr->cpu_suspend_work;
-
 	do
 	{
 		// Load current head
-		next = queue.load();
+		next = s_pushed.load();
 
 		if (!next && cancel_if_not_suspended) [[unlikely]]
 		{
@@ -861,11 +833,11 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 		if (!_this && next)
 		{
 			// If _this == nullptr, it only works if this is the first workload pushed
-			ctr->cpu_suspend_lock.lock_unlock();
+			s_cpu_lock.lock_unlock();
 			continue;
 		}
 	}
-	while (!queue.compare_and_swap_test(next, this));
+	while (!s_pushed.compare_and_swap_test(next, this));
 
 	if (!next)
 	{
@@ -873,54 +845,53 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 		perf_meter<"SUSPEND"_u64> perf0;
 
 		// First thread to push the work to the workload list pauses all threads and processes it
-		std::lock_guard lock(ctr->cpu_suspend_lock);
+		std::lock_guard lock(s_cpu_lock);
+
+		u64 copy = s_cpu_bits.load();
 
 		// Try to prefetch cpu->state earlier
-		for_all_cpu([&](cpu_thread* cpu)
+		copy = cpu_counter::for_all_cpu(copy, [&](cpu_thread* cpu)
 		{
 			if (cpu != _this)
 			{
 				_m_prefetchw(&cpu->state);
+				return true;
 			}
+
+			return false;
 		});
 
 		// Initialization (first increment)
 		g_suspend_counter += 2;
 
-		// Copy of thread bits
-		decltype(ctr->cpu_copy_bits) copy2{};
-
-		for (u32 i = 0; i < ::size32(ctr->cpu_copy_bits); i++)
-		{
-			copy2[i] = ctr->cpu_copy_bits[i] = ctr->cpu_array_bits[i].load();
-		}
-
-		for_all_cpu([&](cpu_thread* cpu, u64 index)
-		{
-			if (cpu == _this || cpu->state.fetch_add(cpu_flag::pause) & cpu_flag::wait)
-			{
-				// Clear bits as long as wait flag is set
-				ctr->cpu_copy_bits[index / 64] &= ~(1ull << (index % 64));
-			}
-
-			if (cpu == _this)
-			{
-				copy2[index / 64] &= ~(1ull << (index % 64));
-			}
-		});
+		// Copy snapshot for finalization
+		u64 copy2 = copy;
+
+		copy = cpu_counter::for_all_cpu(copy, [&](cpu_thread* cpu, u32 index)
+		{
+			if (cpu->state.fetch_add(cpu_flag::pause) & cpu_flag::wait)
+			{
+				// Clear bits as long as wait flag is set
+				return false;
+			}
+
+			return true;
+		});
 
-		while (true)
+		while (copy)
 		{
 			// Check only CPUs which haven't acknowledged their waiting state yet
-			for_all_cpu<true>([&](cpu_thread* cpu, u64 index)
+			copy = cpu_counter::for_all_cpu(copy, [&](cpu_thread* cpu, u32 index)
 			{
 				if (cpu->state & cpu_flag::wait)
 				{
-					ctr->cpu_copy_bits[index / 64] &= ~(1ull << (index % 64));
+					return false;
 				}
+
+				return true;
 			});
 
-			if (!std::accumulate(std::begin(ctr->cpu_copy_bits), std::end(ctr->cpu_copy_bits), u64{0}, std::bit_or()))
+			if (!copy)
 			{
 				break;
 			}
@@ -932,7 +903,7 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 		g_suspend_counter++;
 
 		// Extract queue and reverse element order (FILO to FIFO) (TODO: maybe leave order as is?)
-		auto* head = queue.exchange(nullptr);
+		auto* head = s_pushed.exchange(nullptr);
 
 		u8 min_prio = head->prio;
 		u8 max_prio = head->prio;
@@ -964,9 +935,10 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 			}
 		}
 
-		for_all_cpu<true>([&](cpu_thread* cpu)
+		cpu_counter::for_all_cpu(copy2, [&](cpu_thread* cpu)
 		{
 			_m_prefetchw(&cpu->state);
+			return true;
 		});
 
 		// Execute all stored workload
@@ -986,12 +958,10 @@ bool cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
 		// Finalization (last increment)
 		verify(HERE), g_suspend_counter++ & 1;
 
-		// Exact bitset for flag pause removal
-		std::memcpy(ctr->cpu_copy_bits, copy2, sizeof(copy2));
-
-		for_all_cpu<true>([&](cpu_thread* cpu)
+		cpu_counter::for_all_cpu(copy2, [&](cpu_thread* cpu)
 		{
 			cpu->state -= cpu_flag::pause;
+			return true;
 		});
 	}
 	else
@@ -1030,15 +1000,13 @@ void cpu_thread::stop_all() noexcept
 	sys_log.notice("All CPU threads have been signaled.");
 
-	while (g_fxo->get<cpu_counter>()->cpu_array_sema)
+	while (s_cpu_counter)
 	{
-		std::this_thread::sleep_for(10ms);
+		std::this_thread::sleep_for(1ms);
 	}
 
 	sys_log.notice("All CPU threads have been stopped. [+: %u]", +g_threads_created);
 
-	std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
-
 	g_threads_deleted -= g_threads_created.load();
 	g_threads_created = 0;
 }
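Finally, a sketch of the workload-queue idiom that s_pushed keeps from the old cpu_suspend_work: a lock-free intrusive LIFO where push() CAS-inserts at the head, and the suspending thread takes the whole chain with exchange(nullptr) and reverses the links to recover FIFO order (the "FILO to FIFO" step above). A self-contained model with a hypothetical work type, not the commit's own code:

#include <atomic>

struct work
{
	work* next = nullptr;
	int payload = 0;
};

static std::atomic<work*> g_pushed{nullptr};

// Lock-free LIFO push: link to the current head, then CAS the head to us.
void push(work* w)
{
	w->next = g_pushed.load();

	while (!g_pushed.compare_exchange_weak(w->next, w))
	{
		// CAS failure refreshed w->next with the current head; retry
	}
}

// Take the whole chain at once, then reverse it (FILO -> FIFO).
work* drain_fifo()
{
	work* head = g_pushed.exchange(nullptr);
	work* fifo = nullptr;

	while (head)
	{
		work* next = head->next;
		head->next = fifo;
		fifo = head;
		head = next;
	}

	return fifo;
}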