From c2f6f6a4dc9d85016145519f296fe8d01703dee8 Mon Sep 17 00:00:00 2001 From: Jordan Woyak Date: Sun, 16 Mar 2025 17:53:56 -0500 Subject: [PATCH 1/2] Common: Add thread-safe CircularQueue template. --- Source/Core/Common/CMakeLists.txt | 1 + Source/Core/Common/CircularQueue.h | 258 ++++++++++++++++++ Source/Core/DolphinLib.props | 1 + Source/UnitTests/Common/CMakeLists.txt | 1 + Source/UnitTests/Common/CircularQueueTest.cpp | 128 +++++++++ Source/UnitTests/UnitTests.vcxproj | 1 + 6 files changed, 390 insertions(+) create mode 100644 Source/Core/Common/CircularQueue.h create mode 100644 Source/UnitTests/Common/CircularQueueTest.cpp diff --git a/Source/Core/Common/CMakeLists.txt b/Source/Core/Common/CMakeLists.txt index aca142b9cd..a9006e69ff 100644 --- a/Source/Core/Common/CMakeLists.txt +++ b/Source/Core/Common/CMakeLists.txt @@ -20,6 +20,7 @@ add_library(common BlockingLoop.h Buffer.h ChunkFile.h + CircularQueue.h CodeBlock.h ColorUtil.cpp ColorUtil.h diff --git a/Source/Core/Common/CircularQueue.h b/Source/Core/Common/CircularQueue.h new file mode 100644 index 0000000000..a4cc305f85 --- /dev/null +++ b/Source/Core/Common/CircularQueue.h @@ -0,0 +1,258 @@ +// Copyright 2025 Dolphin Emulator Project +// SPDX-License-Identifier: GPL-2.0-or-later + +#pragma once + +#include +#include +#include +#include +#include + +// An array-backed (or other storage) circular queue. +// +// Push/Pop increases/decrease the Size() +// +// Capacity() gives size of the underlying buffer. +// +// Push'ing Size() beyond Capacity() is unchecked and legal. +// It will overwrite not-yet-read data. +// +// Pop'ing Size() below zero is unchecked and *not* allowed. +// +// Write/Read heads can be manually adjusted for your various weird needs. +// AdvanceWriteHead / AdvanceReadHead / RewindReadHead +// +// Thread-safe version: SPSCCircularQueue + +namespace Common +{ + +namespace detail +{ + +template +class CircularQueueBase +{ +public: + static_assert(std::is_trivially_copyable_v, + "Really only sane for trivially copyable types."); + + // Arguments are forwarded to the storage constructor. + template + explicit CircularQueueBase(Args&&... args) : m_storage(std::forward(args)...) + { + } + + // "Push" functions are only safe from the "producer" thread. + ElementType& GetWriteHead() { return m_storage[m_write_index]; } + + void Push(const ElementType& value) + { + GetWriteHead() = value; + AdvanceWriteHead(1); + } + + // Returns a span from the write-head to the end of the buffer. + // Elements may be written here before using AdvanceWriteHead. + std::span GetWriteSpan() + { + return std::span(&GetWriteHead(), m_storage.size() - m_write_index); + } + + // Push elements from range. + // Will overwrite existing data if Capacity is exceeded. + template + requires(std::is_convertible_v) + void PushRange(const R& range) + { + auto input_iter = range.begin(); + const std::size_t range_size = std::size(range); + std::size_t remaining = range_size; + auto output_iter = m_storage.begin() + m_write_index; + + while (remaining != 0) + { + const std::size_t count = std::min(m_storage.end() - output_iter, remaining); + input_iter = std::ranges::copy_n(input_iter, count, output_iter).in; + + remaining -= count; + output_iter = m_storage.begin(); + } + + AdvanceWriteHead(range_size); + } + + // May be used by the "producer" thread. + // Moves the write position and increases the size without writing any data. + void AdvanceWriteHead(std::size_t count, std::memory_order order = std::memory_order_release) + { + m_write_index = (m_write_index + count) % Capacity(); + AdjustSize(count, order); + } + + // "Pop" functions are only safe from the "consumer" thread. + const ElementType& GetReadHead() const { return m_storage[m_read_index]; } + + ElementType Pop() + { + assert(Size() != 0); + + ElementType result = GetReadHead(); + AdvanceReadHead(1); + return result; + } + + // Returns a span from the read-head to the end of the buffer. + // Not Size() checked. Caller must verify elements have been written. + std::span GetReadSpan() const + { + return std::span(&GetReadHead(), m_storage.size() - m_read_index); + } + + // Pop elements to fill the provided range. + // Does not check first if this many elements are readable. + template + requires(std::is_convertible_v) + void PopRange(R* range) + { + auto output_iter = range->begin(); + const std::size_t range_size = std::size(*range); + std::size_t remaining = range_size; + auto input_iter = m_storage.begin() + m_read_index; + + while (remaining != 0) + { + const std::size_t count = std::min(m_storage.end() - input_iter, remaining); + output_iter = std::ranges::copy_n(input_iter, count, output_iter).out; + + remaining -= count; + input_iter = m_storage.begin(); + } + + AdvanceReadHead(range_size); + } + + // Used by the "consumer" thread to move the read position without reading any data. + // Also reduces size by the given amount. + // Use RewindReadHead for negative values. + void AdvanceReadHead(std::size_t count) + { + m_read_index = (m_read_index + count) % Capacity(); + AdjustSize(0 - count, std::memory_order_relaxed); + } + + // Used by the "consumer" to move the read position backwards. + // Also increases the size by the given amount. + void RewindReadHead(std::size_t count) + { + const auto capacity = Capacity(); + m_read_index = (m_read_index + capacity - count % capacity) % capacity; + AdjustSize(count, std::memory_order_relaxed); + } + + // Only safe from the "consumer". + void Clear() { AdvanceReadHead(Size(std::memory_order_relaxed)); } + + // The following are safe from any thread. + [[nodiscard]] bool Empty(std::memory_order order = std::memory_order_acquire) const + { + return Size(order) == 0; + } + + // The size of the underlying storage (the size of the ring buffer). + [[nodiscard]] std::size_t Capacity() const { return m_storage.size(); } + + // The number of Push'd elements yet to be Pop'd. + // Exceeding the Capacity which will cause data to be overwritten and read multiple times. + [[nodiscard]] std::size_t Size(std::memory_order order = std::memory_order_acquire) const + { + if constexpr (UseAtomicSize) + return m_size.load(order); + else + return m_size; + } + + // Number of elements that can be written before reaching Capacity. + [[nodiscard]] std::size_t Writable() const + { + const auto capacity = Capacity(); + return capacity - std::min(capacity, Size(std::memory_order_relaxed)); + } + + [[nodiscard]] std::size_t Readable() const { return Size(); } + + // The following are "safe" from any thread, + // but e.g. using WaitForData from the producer would just block indefinitely. + + // Blocks until size is something other than the provided value. + void WaitForSizeChange(std::size_t old_size, + std::memory_order order = std::memory_order_acquire) const + requires(IncludeWaitFunctionality) + { + m_size.wait(old_size, order); + } + + // Blocks until (Size() + count) <= Capacity(). + void WaitForWritable(std::size_t count = 1) const requires(IncludeWaitFunctionality) + { + assert(count <= Capacity()); + + std::size_t old_size = 0; + while ((old_size = Size(std::memory_order_relaxed)) > (Capacity() - count)) + WaitForSizeChange(old_size, std::memory_order_relaxed); + } + + // Blocks until Size() == 0. + void WaitForEmpty() const requires(IncludeWaitFunctionality) { WaitForWritable(Capacity()); } + + // Blocks until Size() >= count. + void WaitForReadable(std::size_t count = 1) const requires(IncludeWaitFunctionality) + { + std::size_t old_size = 0; + while ((old_size = Size()) < count) + WaitForSizeChange(old_size); + } + +private: + void AdjustSize(std::size_t count, std::memory_order order) + { + if constexpr (UseAtomicSize) + m_size.fetch_add(count, order); + else + m_size += count; + + if constexpr (IncludeWaitFunctionality) + m_size.notify_one(); + } + + StorageType m_storage; + + std::conditional_t, std::size_t> m_size = 0; + + std::size_t m_read_index = 0; + std::size_t m_write_index = 0; +}; + +} // namespace detail + +template +using CircularQueue = detail::CircularQueueBase; + +template +using CircularArray = CircularQueue>; + +template +using SPSCCircularQueue = detail::CircularQueueBase; + +template +using SPSCCircularArray = SPSCCircularQueue>; + +template +using WaitableSPSCCircularQueue = detail::CircularQueueBase; + +template +using WaitableSPSCCircularArray = WaitableSPSCCircularQueue>; + +} // namespace Common diff --git a/Source/Core/DolphinLib.props b/Source/Core/DolphinLib.props index 57d0465959..f1a417c956 100644 --- a/Source/Core/DolphinLib.props +++ b/Source/Core/DolphinLib.props @@ -28,6 +28,7 @@ + diff --git a/Source/UnitTests/Common/CMakeLists.txt b/Source/UnitTests/Common/CMakeLists.txt index 3b0ffe79e6..c1e1036c07 100644 --- a/Source/UnitTests/Common/CMakeLists.txt +++ b/Source/UnitTests/Common/CMakeLists.txt @@ -5,6 +5,7 @@ add_dolphin_test(BitUtilsTest BitUtilsTest.cpp) add_dolphin_test(BlockingLoopTest BlockingLoopTest.cpp) add_dolphin_test(BusyLoopTest BusyLoopTest.cpp) add_dolphin_test(CommonFuncsTest CommonFuncsTest.cpp) +add_dolphin_test(CircularQueueTest CircularQueueTest.cpp) add_dolphin_test(CryptoEcTest Crypto/EcTest.cpp) add_dolphin_test(CryptoSHA1Test Crypto/SHA1Test.cpp) add_dolphin_test(EnumFormatterTest EnumFormatterTest.cpp) diff --git a/Source/UnitTests/Common/CircularQueueTest.cpp b/Source/UnitTests/Common/CircularQueueTest.cpp new file mode 100644 index 0000000000..6f0791d06c --- /dev/null +++ b/Source/UnitTests/Common/CircularQueueTest.cpp @@ -0,0 +1,128 @@ +// Copyright 2025 Dolphin Emulator Project +// SPDX-License-Identifier: GPL-2.0-or-later + +#include +#include + +#include "Common/CircularQueue.h" + +TEST(CircularBuffer, Simple) +{ + Common::CircularArray buffer; + + EXPECT_EQ(buffer.Empty(), true); + + { + Common::CircularQueue> vec_buffer{444}; + EXPECT_EQ(vec_buffer.Size(), 0); + EXPECT_EQ(vec_buffer.Capacity(), 444); + } + + constexpr int reps = 13; + constexpr int range_size = 11; + constexpr int pushpop_count = 2; + + constexpr int magic_value = 7; + + EXPECT_EQ(buffer.Writable(), buffer.Capacity()); + + for (int i = 0; i != int(buffer.Capacity()); ++i) + buffer.Push(0); + + EXPECT_EQ(buffer.Writable(), 0); + + // Pushing beyond capacity wraps around. + for (int i = 0; i != int(buffer.Capacity()); ++i) + buffer.Push(magic_value); + + EXPECT_EQ(buffer.Writable(), 0); + + for (int i = 0; i != 31; ++i) + buffer.RewindReadHead(11); + for (int i = 0; i != 31; ++i) + buffer.AdvanceReadHead(11); + + EXPECT_EQ(buffer.Readable(), buffer.Capacity() * 2); + + for (int i = 0; i != int(buffer.Capacity()) * 2; ++i) + { + EXPECT_EQ(buffer.GetReadHead(), magic_value); + buffer.AdvanceReadHead(1); + } + + EXPECT_EQ(buffer.Empty(), true); + EXPECT_EQ(buffer.Readable(), 0); + + std::array fixed_data{}; + std::ranges::copy_n(std::views::iota(0).begin(), range_size, fixed_data.begin()); + + for (int r = 0; r != reps; ++r) + { + for (int i = 0; i != pushpop_count; ++i) + { + buffer.PushRange(fixed_data); + EXPECT_EQ(buffer.Size(), (i + 1) * range_size); + } + + for (int i = 0; i != pushpop_count; ++i) + { + std::array data{}; + buffer.PopRange(&data); + for (int c = 0; c != range_size; ++c) + EXPECT_EQ(data[c], fixed_data[c]); + } + + EXPECT_EQ(buffer.Size(), 0); + } +} + +TEST(CircularBuffer, MultiThreaded) +{ + Common::WaitableSPSCCircularArray buffer; + + constexpr int reps = 13; + constexpr int pushpop_count = 97; + + auto inserter = [&]() { + for (int r = 0; r != reps; ++r) + { + for (int i = 0; i != pushpop_count; ++i) + { + buffer.WaitForWritable(1); + buffer.GetWriteHead() = i; + buffer.AdvanceWriteHead(1); + + buffer.WaitForWritable(1); + buffer.Push(i); + + buffer.WaitForWritable(1); + buffer.GetWriteSpan().front() = i; + buffer.AdvanceWriteHead(1); + } + } + }; + + auto popper = [&]() { + for (int r = 0; r != reps; ++r) + { + for (int i = 0; i != pushpop_count; ++i) + { + buffer.WaitForReadable(3); + // Pop works + EXPECT_EQ(buffer.Pop(), i); + // Manual read head works. + EXPECT_EQ(buffer.GetReadHead(), i); + buffer.AdvanceReadHead(1); + EXPECT_EQ(buffer.GetReadSpan().front(), i); + buffer.AdvanceReadHead(1); + } + } + EXPECT_EQ(buffer.Empty(), true); + }; + + std::thread popper_thread(popper); + std::thread inserter_thread(inserter); + + popper_thread.join(); + inserter_thread.join(); +} diff --git a/Source/UnitTests/UnitTests.vcxproj b/Source/UnitTests/UnitTests.vcxproj index b5e5256915..f7ad5621a0 100644 --- a/Source/UnitTests/UnitTests.vcxproj +++ b/Source/UnitTests/UnitTests.vcxproj @@ -44,6 +44,7 @@ + From e04e98f3f126857ea76c13aa13149c86379f8f1c Mon Sep 17 00:00:00 2001 From: Jordan Woyak Date: Tue, 6 May 2025 01:59:47 -0500 Subject: [PATCH 2/2] AudioCommon: Make Mixer use Common::SPSCCircularArray. --- Source/Core/AudioCommon/Mixer.cpp | 35 +++++++++++++------------------ Source/Core/AudioCommon/Mixer.h | 9 +++----- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/Source/Core/AudioCommon/Mixer.cpp b/Source/Core/AudioCommon/Mixer.cpp index b52371d994..386666330b 100644 --- a/Source/Core/AudioCommon/Mixer.cpp +++ b/Source/Core/AudioCommon/Mixer.cpp @@ -494,11 +494,8 @@ void Mixer::MixerFifo::Enqueue() 0.0002984010f, 0.0002102045f, 0.0001443499f, 0.0000961509f, 0.0000616906f, 0.0000377350f, 0.0000216492f, 0.0000113187f, 0.0000050749f, 0.0000016272f}; - const std::size_t head = m_queue_head.load(std::memory_order_acquire); - // Check if we run out of space in the circular queue. (rare) - std::size_t next_head = (head + 1) & GRANULE_QUEUE_MASK; - if (next_head == m_queue_tail.load(std::memory_order_acquire)) + if (m_queue.Writable() == 0) { WARN_LOG_FMT(AUDIO, "Granule Queue has completely filled and audio samples are being dropped. " @@ -509,30 +506,29 @@ void Mixer::MixerFifo::Enqueue() // By preconstructing the granule window, we have the best chance of // the compiler optimizing this loop using SIMD instructions. const std::size_t start_index = m_next_buffer_index; - for (std::size_t i = 0; i < GRANULE_SIZE; ++i) - m_queue[head][i] = m_next_buffer[(i + start_index) & GRANULE_MASK] * GRANULE_WINDOW[i]; + auto& write_head = m_queue.GetWriteHead(); - m_queue_head.store(next_head, std::memory_order_release); + for (std::size_t i = 0; i < GRANULE_SIZE; ++i) + write_head[i] = m_next_buffer[(i + start_index) & GRANULE_MASK] * GRANULE_WINDOW[i]; + + m_queue.AdvanceWriteHead(1); m_queue_looping.store(false, std::memory_order_relaxed); } void Mixer::MixerFifo::Dequeue(Granule* granule) { const std::size_t granule_queue_size = m_granule_queue_size.load(std::memory_order_relaxed); - const std::size_t head = m_queue_head.load(std::memory_order_acquire); - std::size_t tail = m_queue_tail.load(std::memory_order_acquire); // Checks to see if the queue has gotten too long. - if (granule_queue_size < ((head - tail) & GRANULE_QUEUE_MASK)) + const auto size = m_queue.Size(); + if (size > granule_queue_size) { // Jump the playhead to half the queue size behind the head. - const std::size_t gap = (granule_queue_size >> 1) + 1; - tail = (head - gap) & GRANULE_QUEUE_MASK; + const std::size_t desired_size = granule_queue_size / 2; + m_queue.AdvanceReadHead(size - desired_size); } - // Checks to see if the queue is empty. - std::size_t next_tail = (tail + 1) & GRANULE_QUEUE_MASK; - if (next_tail == head) + else if (size == 0) { // Only fill gaps when running to prevent stutter on pause. const bool is_running = Core::GetState(Core::System::GetInstance()) == Core::State::Running; @@ -540,18 +536,17 @@ void Mixer::MixerFifo::Dequeue(Granule* granule) { // Jump the playhead to half the queue size behind the head. // This provides smoother audio playback than suddenly stopping. - const std::size_t gap = std::max(2, granule_queue_size >> 1) - 1; - next_tail = (head - gap) & GRANULE_QUEUE_MASK; + const std::size_t desired_size = std::max(1, granule_queue_size / 2); + m_queue.RewindReadHead(desired_size); m_queue_looping.store(true, std::memory_order_relaxed); } else { - std::fill(granule->begin(), granule->end(), StereoPair{0.0f, 0.0f}); + std::ranges::fill(*granule, StereoPair{0.0f, 0.0f}); m_queue_looping.store(false, std::memory_order_relaxed); return; } } - *granule = m_queue[tail]; - m_queue_tail.store(next_tail, std::memory_order_release); + *granule = m_queue.Pop(); } diff --git a/Source/Core/AudioCommon/Mixer.h b/Source/Core/AudioCommon/Mixer.h index ba74c3d71b..3fd5629441 100644 --- a/Source/Core/AudioCommon/Mixer.h +++ b/Source/Core/AudioCommon/Mixer.h @@ -3,14 +3,13 @@ #pragma once -#include #include #include #include -#include #include "AudioCommon/SurroundDecoder.h" #include "AudioCommon/WaveFile.h" +#include "Common/CircularQueue.h" #include "Common/CommonTypes.h" #include "Common/Config/Config.h" @@ -61,7 +60,6 @@ private: class MixerFifo final { static constexpr std::size_t MAX_GRANULE_QUEUE_SIZE = 256; - static constexpr std::size_t GRANULE_QUEUE_MASK = MAX_GRANULE_QUEUE_SIZE - 1; struct StereoPair final { @@ -123,9 +121,8 @@ private: Granule m_front, m_back; std::atomic m_granule_queue_size{20}; - std::array m_queue; - std::atomic m_queue_head{0}; - std::atomic m_queue_tail{0}; + Common::SPSCCircularArray m_queue; + std::atomic m_queue_looping{false}; float m_fade_volume = 1.0;