Jordan Woyak 2025-05-27 18:37:58 -05:00 committed by GitHub
commit c3748b8b0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 408 additions and 26 deletions

View File

@@ -494,11 +494,8 @@ void Mixer::MixerFifo::Enqueue()
0.0002984010f, 0.0002102045f, 0.0001443499f, 0.0000961509f, 0.0000616906f, 0.0000377350f,
0.0000216492f, 0.0000113187f, 0.0000050749f, 0.0000016272f};
const std::size_t head = m_queue_head.load(std::memory_order_acquire);
// Check if we run out of space in the circular queue. (rare)
std::size_t next_head = (head + 1) & GRANULE_QUEUE_MASK;
if (next_head == m_queue_tail.load(std::memory_order_acquire))
if (m_queue.Writable() == 0)
{
WARN_LOG_FMT(AUDIO,
"Granule Queue has completely filled and audio samples are being dropped. "
@@ -509,30 +506,29 @@ void Mixer::MixerFifo::Enqueue()
// By preconstructing the granule window, we have the best chance of
// the compiler optimizing this loop using SIMD instructions.
const std::size_t start_index = m_next_buffer_index;
for (std::size_t i = 0; i < GRANULE_SIZE; ++i)
m_queue[head][i] = m_next_buffer[(i + start_index) & GRANULE_MASK] * GRANULE_WINDOW[i];
auto& write_head = m_queue.GetWriteHead();
m_queue_head.store(next_head, std::memory_order_release);
for (std::size_t i = 0; i < GRANULE_SIZE; ++i)
write_head[i] = m_next_buffer[(i + start_index) & GRANULE_MASK] * GRANULE_WINDOW[i];
m_queue.AdvanceWriteHead(1);
m_queue_looping.store(false, std::memory_order_relaxed);
}
void Mixer::MixerFifo::Dequeue(Granule* granule)
{
const std::size_t granule_queue_size = m_granule_queue_size.load(std::memory_order_relaxed);
const std::size_t head = m_queue_head.load(std::memory_order_acquire);
std::size_t tail = m_queue_tail.load(std::memory_order_acquire);
// Checks to see if the queue has gotten too long.
if (granule_queue_size < ((head - tail) & GRANULE_QUEUE_MASK))
const auto size = m_queue.Size();
if (size > granule_queue_size)
{
// Jump the playhead to half the queue size behind the head.
const std::size_t gap = (granule_queue_size >> 1) + 1;
tail = (head - gap) & GRANULE_QUEUE_MASK;
const std::size_t desired_size = granule_queue_size / 2;
m_queue.AdvanceReadHead(size - desired_size);
}
// Checks to see if the queue is empty.
std::size_t next_tail = (tail + 1) & GRANULE_QUEUE_MASK;
if (next_tail == head)
else if (size == 0)
{
// Only fill gaps when running to prevent stutter on pause.
const bool is_running = Core::GetState(Core::System::GetInstance()) == Core::State::Running;
@@ -540,18 +536,17 @@ void Mixer::MixerFifo::Dequeue(Granule* granule)
{
// Jump the playhead to half the queue size behind the head.
// This provides smoother audio playback than suddenly stopping.
const std::size_t gap = std::max<std::size_t>(2, granule_queue_size >> 1) - 1;
next_tail = (head - gap) & GRANULE_QUEUE_MASK;
const std::size_t desired_size = std::max<std::size_t>(1, granule_queue_size / 2);
m_queue.RewindReadHead(desired_size);
m_queue_looping.store(true, std::memory_order_relaxed);
}
else
{
std::fill(granule->begin(), granule->end(), StereoPair{0.0f, 0.0f});
std::ranges::fill(*granule, StereoPair{0.0f, 0.0f});
m_queue_looping.store(false, std::memory_order_relaxed);
return;
}
}
*granule = m_queue[tail];
m_queue_tail.store(next_tail, std::memory_order_release);
*granule = m_queue.Pop();
}
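
The reworked Enqueue/Dequeue pair is a textbook single-producer/single-consumer exchange: the mixer thread fills the write head in place and publishes it with AdvanceWriteHead(), while the consumer drains with Pop(), skips ahead with AdvanceReadHead() when the backlog exceeds the target size, and rewinds with RewindReadHead() when the queue runs dry. A minimal standalone sketch of that pattern follows; the element type, capacity, and backlog threshold are illustrative stand-ins, not values from this commit.

// SPSC usage sketch mirroring the mixer's pattern; types and sizes are illustrative.
#include <array>
#include <cstddef>
#include <cstdio>

#include "Common/CircularQueue.h"

using Granule = std::array<float, 4>;  // stand-in for the mixer's Granule type

int main()
{
  Common::SPSCCircularArray<Granule, 8> queue;

  // Producer: construct each granule directly in the write head, then publish it.
  for (int i = 0; i < 6; ++i)
  {
    if (queue.Writable() == 0)
      break;  // full: the mixer warns and drops samples at this point
    queue.GetWriteHead().fill(float(i));
    queue.AdvanceWriteHead(1);
  }

  // Consumer: if the backlog grows past a target, jump the read head forward;
  // otherwise pop granules one at a time.
  constexpr std::size_t target_backlog = 4;  // illustrative threshold
  const std::size_t size = queue.Size();
  if (size > target_backlog)
    queue.AdvanceReadHead(size - target_backlog / 2);

  while (queue.Size() != 0)
    std::printf("granule starts with %.1f\n", queue.Pop()[0]);
}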

View File

@@ -3,14 +3,13 @@
#pragma once
#include <algorithm>
#include <array>
#include <atomic>
#include <bit>
#include <cmath>
#include "AudioCommon/SurroundDecoder.h"
#include "AudioCommon/WaveFile.h"
#include "Common/CircularQueue.h"
#include "Common/CommonTypes.h"
#include "Common/Config/Config.h"
@@ -61,7 +60,6 @@ private:
class MixerFifo final
{
static constexpr std::size_t MAX_GRANULE_QUEUE_SIZE = 256;
static constexpr std::size_t GRANULE_QUEUE_MASK = MAX_GRANULE_QUEUE_SIZE - 1;
struct StereoPair final
{
@@ -123,9 +121,8 @@ private:
Granule m_front, m_back;
std::atomic<std::size_t> m_granule_queue_size{20};
std::array<Granule, MAX_GRANULE_QUEUE_SIZE> m_queue;
std::atomic<std::size_t> m_queue_head{0};
std::atomic<std::size_t> m_queue_tail{0};
Common::SPSCCircularArray<Granule, MAX_GRANULE_QUEUE_SIZE> m_queue;
std::atomic<bool> m_queue_looping{false};
float m_fade_volume = 1.0;

View File

@@ -20,6 +20,7 @@ add_library(common
BlockingLoop.h
Buffer.h
ChunkFile.h
CircularQueue.h
CodeBlock.h
ColorUtil.cpp
ColorUtil.h

View File

@@ -0,0 +1,258 @@
// Copyright 2025 Dolphin Emulator Project
// SPDX-License-Identifier: GPL-2.0-or-later
#pragma once
#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <ranges>
#include <span>
#include <type_traits>
#include <utility>
// An array-backed (or other storage) circular queue.
//
// Push/Pop increases/decreases the Size().
//
// Capacity() gives size of the underlying buffer.
//
// Push'ing Size() beyond Capacity() is unchecked and legal.
// It will overwrite not-yet-read data.
//
// Pop'ing Size() below zero is unchecked and *not* allowed.
//
// Write/Read heads can be manually adjusted for your various weird needs.
// AdvanceWriteHead / AdvanceReadHead / RewindReadHead
//
// Thread-safe version: SPSCCircularQueue
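//
// A minimal usage sketch of the semantics above (illustrative values only):
//
//   Common::CircularArray<int, 4> q;   // Capacity() == 4
//   q.Push(1);
//   q.Push(2);                         // Size() == 2, Writable() == 2
//   int a = q.Pop();                   // a == 1, Size() == 1
//   q.PushRange(std::array{3, 4, 5, 6});
//                                      // Size() == 5 exceeds Capacity():
//                                      // the oldest unread element (2) has
//                                      // been overwritten.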
namespace Common
{
namespace detail
{
template <typename ElementType, typename StorageType, bool UseAtomicSize,
bool IncludeWaitFunctionality>
class CircularQueueBase
{
public:
static_assert(std::is_trivially_copyable_v<ElementType>,
"Really only sane for trivially copyable types.");
// Arguments are forwarded to the storage constructor.
template <typename... Args>
explicit CircularQueueBase(Args&&... args) : m_storage(std::forward<Args>(args)...)
{
}
// "Push" functions are only safe from the "producer" thread.
ElementType& GetWriteHead() { return m_storage[m_write_index]; }
void Push(const ElementType& value)
{
GetWriteHead() = value;
AdvanceWriteHead(1);
}
// Returns a span from the write-head to the end of the buffer.
// Elements may be written here before using AdvanceWriteHead.
std::span<ElementType> GetWriteSpan()
{
return std::span(&GetWriteHead(), m_storage.size() - m_write_index);
}
// Push elements from range.
// Will overwrite existing data if Capacity is exceeded.
template <std::ranges::range R>
requires(std::is_convertible_v<typename R::value_type, ElementType>)
void PushRange(const R& range)
{
auto input_iter = range.begin();
const std::size_t range_size = std::size(range);
std::size_t remaining = range_size;
auto output_iter = m_storage.begin() + m_write_index;
while (remaining != 0)
{
const std::size_t count = std::min<std::size_t>(m_storage.end() - output_iter, remaining);
input_iter = std::ranges::copy_n(input_iter, count, output_iter).in;
remaining -= count;
output_iter = m_storage.begin();
}
AdvanceWriteHead(range_size);
}
// May be used by the "producer" thread.
// Moves the write position and increases the size without writing any data.
void AdvanceWriteHead(std::size_t count, std::memory_order order = std::memory_order_release)
{
m_write_index = (m_write_index + count) % Capacity();
AdjustSize(count, order);
}
// "Pop" functions are only safe from the "consumer" thread.
const ElementType& GetReadHead() const { return m_storage[m_read_index]; }
ElementType Pop()
{
assert(Size() != 0);
ElementType result = GetReadHead();
AdvanceReadHead(1);
return result;
}
// Returns a span from the read-head to the end of the buffer.
// Not Size() checked. Caller must verify elements have been written.
std::span<const ElementType> GetReadSpan() const
{
return std::span(&GetReadHead(), m_storage.size() - m_read_index);
}
// Pop elements to fill the provided range.
// Does not check first if this many elements are readable.
template <std::ranges::range R>
requires(std::is_convertible_v<ElementType, typename R::value_type>)
void PopRange(R* range)
{
auto output_iter = range->begin();
const std::size_t range_size = std::size(*range);
std::size_t remaining = range_size;
auto input_iter = m_storage.begin() + m_read_index;
while (remaining != 0)
{
const std::size_t count = std::min<std::size_t>(m_storage.end() - input_iter, remaining);
output_iter = std::ranges::copy_n(input_iter, count, output_iter).out;
remaining -= count;
input_iter = m_storage.begin();
}
AdvanceReadHead(range_size);
}
// Used by the "consumer" thread to move the read position without reading any data.
// Also reduces size by the given amount.
// Use RewindReadHead for negative values.
void AdvanceReadHead(std::size_t count)
{
m_read_index = (m_read_index + count) % Capacity();
AdjustSize(0 - count, std::memory_order_relaxed);
}
// Used by the "consumer" to move the read position backwards.
// Also increases the size by the given amount.
void RewindReadHead(std::size_t count)
{
const auto capacity = Capacity();
m_read_index = (m_read_index + capacity - count % capacity) % capacity;
AdjustSize(count, std::memory_order_relaxed);
}
// Only safe from the "consumer".
void Clear() { AdvanceReadHead(Size(std::memory_order_relaxed)); }
// The following are safe from any thread.
[[nodiscard]] bool Empty(std::memory_order order = std::memory_order_acquire) const
{
return Size(order) == 0;
}
// The size of the underlying storage (the size of the ring buffer).
[[nodiscard]] std::size_t Capacity() const { return m_storage.size(); }
// The number of Push'd elements yet to be Pop'd.
// This can exceed the Capacity, which will cause data to be overwritten and read multiple times.
[[nodiscard]] std::size_t Size(std::memory_order order = std::memory_order_acquire) const
{
if constexpr (UseAtomicSize)
return m_size.load(order);
else
return m_size;
}
// Number of elements that can be written before reaching Capacity.
[[nodiscard]] std::size_t Writable() const
{
const auto capacity = Capacity();
return capacity - std::min(capacity, Size(std::memory_order_relaxed));
}
[[nodiscard]] std::size_t Readable() const { return Size(); }
// The following are "safe" from any thread,
// but e.g. calling WaitForReadable from the producer would just block indefinitely.
// Blocks until size is something other than the provided value.
void WaitForSizeChange(std::size_t old_size,
std::memory_order order = std::memory_order_acquire) const
requires(IncludeWaitFunctionality)
{
m_size.wait(old_size, order);
}
// Blocks until (Size() + count) <= Capacity().
void WaitForWritable(std::size_t count = 1) const requires(IncludeWaitFunctionality)
{
assert(count <= Capacity());
std::size_t old_size = 0;
while ((old_size = Size(std::memory_order_relaxed)) > (Capacity() - count))
WaitForSizeChange(old_size, std::memory_order_relaxed);
}
// Blocks until Size() == 0.
void WaitForEmpty() const requires(IncludeWaitFunctionality) { WaitForWritable(Capacity()); }
// Blocks until Size() >= count.
void WaitForReadable(std::size_t count = 1) const requires(IncludeWaitFunctionality)
{
std::size_t old_size = 0;
while ((old_size = Size()) < count)
WaitForSizeChange(old_size);
}
private:
void AdjustSize(std::size_t count, std::memory_order order)
{
if constexpr (UseAtomicSize)
m_size.fetch_add(count, order);
else
m_size += count;
if constexpr (IncludeWaitFunctionality)
m_size.notify_one();
}
StorageType m_storage;
std::conditional_t<UseAtomicSize, std::atomic<std::size_t>, std::size_t> m_size = 0;
std::size_t m_read_index = 0;
std::size_t m_write_index = 0;
};
} // namespace detail
template <typename T, typename StorageType>
using CircularQueue = detail::CircularQueueBase<T, StorageType, false, false>;
template <typename T, std::size_t Capacity>
using CircularArray = CircularQueue<T, std::array<T, Capacity>>;
template <typename T, typename StorageType>
using SPSCCircularQueue = detail::CircularQueueBase<T, StorageType, true, false>;
template <typename T, std::size_t Capacity>
using SPSCCircularArray = SPSCCircularQueue<T, std::array<T, Capacity>>;
template <typename T, typename StorageType>
using WaitableSPSCCircularQueue = detail::CircularQueueBase<T, StorageType, true, true>;
template <typename T, std::size_t Capacity>
using WaitableSPSCCircularArray = WaitableSPSCCircularQueue<T, std::array<T, Capacity>>;
} // namespace Common
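
The Waitable aliases turn the same ring into a blocking SPSC channel: the atomic size doubles as the wait word for std::atomic::wait/notify, so a producer can sleep in WaitForWritable() until the consumer frees a slot, and a consumer can sleep in WaitForReadable() until data arrives. A small two-thread sketch under those assumptions (element type, capacity, and item count are arbitrary choices):

// Blocking producer/consumer sketch with WaitableSPSCCircularArray.
#include <cstdio>
#include <thread>

#include "Common/CircularQueue.h"

int main()
{
  Common::WaitableSPSCCircularArray<int, 16> queue;
  constexpr int item_count = 1000;

  std::thread producer([&] {
    for (int i = 0; i < item_count; ++i)
    {
      queue.WaitForWritable(1);  // sleep until at least one slot is free
      queue.Push(i);
    }
  });

  std::thread consumer([&] {
    long long sum = 0;
    for (int i = 0; i < item_count; ++i)
    {
      queue.WaitForReadable(1);  // sleep until at least one element is queued
      sum += queue.Pop();
    }
    std::printf("sum = %lld\n", sum);  // 0 + 1 + ... + 999
  });

  producer.join();
  consumer.join();
}

Tracking the size in a separate counter, rather than deriving it from the head indices, is what lets the full Capacity() be used without a reserved slot, and is also what makes deliberate over-pushing representable.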

View File

@@ -31,6 +31,7 @@
<ClInclude Include="Common\BlockingLoop.h" />
<ClInclude Include="Common\Buffer.h" />
<ClInclude Include="Common\ChunkFile.h" />
<ClInclude Include="Common\CircularQueue.h" />
<ClInclude Include="Common\CodeBlock.h" />
<ClInclude Include="Common\ColorUtil.h" />
<ClInclude Include="Common\Common.h" />

View File

@@ -5,6 +5,7 @@ add_dolphin_test(BitUtilsTest BitUtilsTest.cpp)
add_dolphin_test(BlockingLoopTest BlockingLoopTest.cpp)
add_dolphin_test(BusyLoopTest BusyLoopTest.cpp)
add_dolphin_test(CommonFuncsTest CommonFuncsTest.cpp)
add_dolphin_test(CircularQueueTest CircularQueueTest.cpp)
add_dolphin_test(CryptoEcTest Crypto/EcTest.cpp)
add_dolphin_test(CryptoSHA1Test Crypto/SHA1Test.cpp)
add_dolphin_test(EnumFormatterTest EnumFormatterTest.cpp)

View File

@@ -0,0 +1,128 @@
// Copyright 2025 Dolphin Emulator Project
// SPDX-License-Identifier: GPL-2.0-or-later
#include <gtest/gtest.h>
#include <array>
#include <ranges>
#include <thread>
#include "Common/CircularQueue.h"
TEST(CircularBuffer, Simple)
{
Common::CircularArray<int, 333> buffer;
EXPECT_EQ(buffer.Empty(), true);
{
Common::CircularQueue<int, std::vector<int>> vec_buffer{444};
EXPECT_EQ(vec_buffer.Size(), 0);
EXPECT_EQ(vec_buffer.Capacity(), 444);
}
constexpr int reps = 13;
constexpr int range_size = 11;
constexpr int pushpop_count = 2;
constexpr int magic_value = 7;
EXPECT_EQ(buffer.Writable(), buffer.Capacity());
for (int i = 0; i != int(buffer.Capacity()); ++i)
buffer.Push(0);
EXPECT_EQ(buffer.Writable(), 0);
// Pushing beyond capacity wraps around.
for (int i = 0; i != int(buffer.Capacity()); ++i)
buffer.Push(magic_value);
EXPECT_EQ(buffer.Writable(), 0);
for (int i = 0; i != 31; ++i)
buffer.RewindReadHead(11);
for (int i = 0; i != 31; ++i)
buffer.AdvanceReadHead(11);
EXPECT_EQ(buffer.Readable(), buffer.Capacity() * 2);
for (int i = 0; i != int(buffer.Capacity()) * 2; ++i)
{
EXPECT_EQ(buffer.GetReadHead(), magic_value);
buffer.AdvanceReadHead(1);
}
EXPECT_EQ(buffer.Empty(), true);
EXPECT_EQ(buffer.Readable(), 0);
std::array<int, range_size> fixed_data{};
std::ranges::copy_n(std::views::iota(0).begin(), range_size, fixed_data.begin());
for (int r = 0; r != reps; ++r)
{
for (int i = 0; i != pushpop_count; ++i)
{
buffer.PushRange(fixed_data);
EXPECT_EQ(buffer.Size(), (i + 1) * range_size);
}
for (int i = 0; i != pushpop_count; ++i)
{
std::array<int, range_size> data{};
buffer.PopRange(&data);
for (int c = 0; c != range_size; ++c)
EXPECT_EQ(data[c], fixed_data[c]);
}
EXPECT_EQ(buffer.Size(), 0);
}
}
TEST(CircularBuffer, MultiThreaded)
{
Common::WaitableSPSCCircularArray<int, 29> buffer;
constexpr int reps = 13;
constexpr int pushpop_count = 97;
auto inserter = [&]() {
for (int r = 0; r != reps; ++r)
{
for (int i = 0; i != pushpop_count; ++i)
{
buffer.WaitForWritable(1);
buffer.GetWriteHead() = i;
buffer.AdvanceWriteHead(1);
buffer.WaitForWritable(1);
buffer.Push(i);
buffer.WaitForWritable(1);
buffer.GetWriteSpan().front() = i;
buffer.AdvanceWriteHead(1);
}
}
};
auto popper = [&]() {
for (int r = 0; r != reps; ++r)
{
for (int i = 0; i != pushpop_count; ++i)
{
buffer.WaitForReadable(3);
// Pop works
EXPECT_EQ(buffer.Pop(), i);
// Manual read head works.
EXPECT_EQ(buffer.GetReadHead(), i);
buffer.AdvanceReadHead(1);
EXPECT_EQ(buffer.GetReadSpan().front(), i);
buffer.AdvanceReadHead(1);
}
}
EXPECT_EQ(buffer.Empty(), true);
};
std::thread popper_thread(popper);
std::thread inserter_thread(inserter);
popper_thread.join();
inserter_thread.join();
}
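
Beyond the element-at-a-time operations exercised above, the span and range helpers are the natural fit for block transfers such as audio callbacks: GetWriteSpan()/GetReadSpan() expose the contiguous run up to the end of the backing array, while PushRange()/PopRange() take care of the wrap-around. A hedged sketch of a block copy using the range helpers (buffer sizes and the sample type are arbitrary):

// Block-transfer sketch using PushRange/PopRange; sizes are illustrative.
#include <array>
#include <cstdio>

#include "Common/CircularQueue.h"

int main()
{
  Common::CircularArray<float, 64> fifo;

  // Producer: push a whole block; PushRange splits the copy at the wrap point.
  std::array<float, 48> input{};
  input.fill(0.25f);
  fifo.PushRange(input);

  // Consumer: pop a block of the same size once enough samples are queued.
  std::array<float, 48> output{};
  if (fifo.Readable() >= output.size())
    fifo.PopRange(&output);

  std::printf("first sample: %.2f, remaining: %zu\n", output[0], fifo.Readable());
}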

View File

@@ -44,6 +44,7 @@
<ClCompile Include="Common\BlockingLoopTest.cpp" />
<ClCompile Include="Common\BusyLoopTest.cpp" />
<ClCompile Include="Common\CommonFuncsTest.cpp" />
<ClCompile Include="Common\CircularQueueTest.cpp" />
<ClCompile Include="Common\Crypto\EcTest.cpp" />
<ClCompile Include="Common\Crypto\SHA1Test.cpp" />
<ClCompile Include="Common\EnumFormatterTest.cpp" />