diff --git a/Source/Core/AudioCommon/CubebStream.cpp b/Source/Core/AudioCommon/CubebStream.cpp index f4566cb57e..92ab886580 100644 --- a/Source/Core/AudioCommon/CubebStream.cpp +++ b/Source/Core/AudioCommon/CubebStream.cpp @@ -39,7 +39,7 @@ void CubebStream::StateCallback(cubeb_stream* stream, void* user_data, cubeb_sta CubebStream::CubebStream() #ifdef _WIN32 - : m_work_queue([](const std::function& func) { func(); }) + : m_work_queue("Cubeb Worker", [](const std::function& func) { func(); }) { Common::Event sync_event; m_work_queue.EmplaceItem([this, &sync_event] { diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index 6039f7b3b9..be3c3198d4 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -3,12 +3,14 @@ #pragma once +#include +#include #include #include +#include +#include #include -#include "Common/Event.h" -#include "Common/Flag.h" #include "Common/Thread.h" // A thread that executes the given function for every item placed into its queue. @@ -20,89 +22,151 @@ class WorkQueueThread { public: WorkQueueThread() = default; - WorkQueueThread(std::function function) { Reset(std::move(function)); } + WorkQueueThread(const std::string_view name, std::function function) + { + Reset(name, std::move(function)); + } ~WorkQueueThread() { Shutdown(); } - void Reset(std::function function) + + // Shuts the current work thread down (if any) and starts a new thread with the given function + // Note: Some consumers of this API push items to the queue before starting the thread. + void Reset(const std::string_view name, std::function function) { Shutdown(); - m_shutdown.Clear(); - m_cancelled.Clear(); + std::lock_guard lg(m_lock); + m_thread_name = name; + m_shutdown = false; m_function = std::move(function); m_thread = std::thread(&WorkQueueThread::ThreadLoop, this); } + // Adds an item to the work queue template void EmplaceItem(Args&&... args) { - if (!m_cancelled.IsSet()) - { - std::lock_guard lg(m_lock); - m_items.emplace(std::forward(args)...); - } - m_wakeup.Set(); + std::lock_guard lg(m_lock); + if (m_shutdown) + return; + + m_items.emplace(std::forward(args)...); + m_idle = false; + m_worker_cond_var.notify_one(); } - void Clear() + // Adds an item to the work queue + void Push(T&& item) { - { - std::lock_guard lg(m_lock); - m_items = std::queue(); - } - m_wakeup.Set(); + std::lock_guard lg(m_lock); + if (m_shutdown) + return; + + m_items.push(std::move(item)); + m_idle = false; + m_worker_cond_var.notify_one(); } + // Adds an item to the work queue + void Push(const T& item) + { + std::lock_guard lg(m_lock); + if (m_shutdown) + return; + + m_items.push(item); + m_idle = false; + m_worker_cond_var.notify_one(); + } + + // Empties the queue + // If the worker polls IsCanceling(), it can abort it's work when Cancelling void Cancel() { - m_cancelled.Set(); - Clear(); - Shutdown(); + std::unique_lock lg(m_lock); + if (m_shutdown) + return; + + m_cancelling = true; + m_items = std::queue(); + m_worker_cond_var.notify_one(); } - bool IsCancelled() const { return m_cancelled.IsSet(); } - - void Shutdown() + // Tells the worker to shut down when it's queue is empty + // Blocks until the worker thread exits. + // If cancel is true, will Cancel before before telling the worker to exit + // Otherwise, all currently queued items will complete before the worker exits + void Shutdown(bool cancel = false) { - if (m_thread.joinable()) { - m_shutdown.Set(); - m_wakeup.Set(); - m_thread.join(); + std::unique_lock lg(m_lock); + if (m_shutdown || !m_thread.joinable()) + return; + + if (cancel) + { + m_cancelling = true; + m_items = std::queue(); + } + + m_shutdown = true; + m_worker_cond_var.notify_one(); } + + m_thread.join(); } + // Blocks until all items in the queue have been processed (or cancelled) + void WaitForCompletion() + { + std::unique_lock lg(m_lock); + // don't check m_shutdown, because it gets set to request a shutdown, and we want to wait until + // after the shutdown completes. + // We also check m_cancelling, because we want to ensure the worker acknowledges our cancel. + if (m_idle && !m_cancelling.load()) + return; + + m_wait_cond_var.wait(lg, [&] { return m_idle && m_cancelling.load(); }); + } + + // If the worker polls IsCanceling(), it can abort its work when Cancelling + bool IsCancelling() const { return m_cancelling.load(); } + private: void ThreadLoop() { - Common::SetCurrentThreadName("WorkQueueThread"); + Common::SetCurrentThreadName(m_thread_name.c_str()); while (true) { - m_wakeup.Wait(); - - while (true) + std::unique_lock lg(m_lock); + while (m_items.empty()) { - std::unique_lock lg(m_lock); - if (m_items.empty()) - break; - T item{std::move(m_items.front())}; - m_items.pop(); - lg.unlock(); + m_idle = true; + m_cancelling = false; + m_wait_cond_var.notify_all(); + if (m_shutdown) + return; - m_function(std::move(item)); + m_worker_cond_var.wait( + lg, [&] { return !m_items.empty() || m_shutdown || m_cancelling.load(); }); } + T item{std::move(m_items.front())}; + m_items.pop(); + lg.unlock(); - if (m_shutdown.IsSet()) - break; + m_function(std::move(item)); } } std::function m_function; + std::string m_thread_name; std::thread m_thread; - Common::Event m_wakeup; - Common::Flag m_shutdown; - Common::Flag m_cancelled; std::mutex m_lock; std::queue m_items; + std::condition_variable m_wait_cond_var; + std::condition_variable m_worker_cond_var; + std::atomic m_cancelling = false; + bool m_idle = true; + bool m_shutdown = false; }; } // namespace Common diff --git a/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp b/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp index 31de1ddb5a..e88cd6162c 100644 --- a/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp +++ b/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp @@ -200,7 +200,7 @@ CEXIMic::CEXIMic(int index) : slot(index) #ifdef _WIN32 , - m_work_queue([](const std::function& func) { func(); }) + m_work_queue("Mic Worker", [](const std::function& func) { func(); }) #endif { m_position = 0; diff --git a/Source/Core/Core/IOS/Network/IP/Top.cpp b/Source/Core/Core/IOS/Network/IP/Top.cpp index ac512be37c..27487918d0 100644 --- a/Source/Core/Core/IOS/Network/IP/Top.cpp +++ b/Source/Core/Core/IOS/Network/IP/Top.cpp @@ -65,7 +65,7 @@ enum SOResultCode : s32 NetIPTopDevice::NetIPTopDevice(Kernel& ios, const std::string& device_name) : Device(ios, device_name) { - m_work_queue.Reset([this](AsyncTask task) { + m_work_queue.Reset("Network Worker", [this](AsyncTask task) { const IPCReply reply = task.handler(); { std::lock_guard lg(m_async_reply_lock); diff --git a/Source/Core/Core/IOS/Network/KD/NetKDRequest.cpp b/Source/Core/Core/IOS/Network/KD/NetKDRequest.cpp index 1e8063a60d..8c5f80b6d4 100644 --- a/Source/Core/Core/IOS/Network/KD/NetKDRequest.cpp +++ b/Source/Core/Core/IOS/Network/KD/NetKDRequest.cpp @@ -150,7 +150,7 @@ s32 NWC24MakeUserID(u64* nwc24_id, u32 hollywood_id, u16 id_ctr, HardwareModel h NetKDRequestDevice::NetKDRequestDevice(Kernel& ios, const std::string& device_name) : Device(ios, device_name), config{ios.GetFS()}, m_dl_list{ios.GetFS()} { - m_work_queue.Reset([this](AsyncTask task) { + m_work_queue.Reset("WiiConnect24 Worker", [this](AsyncTask task) { const IPCReply reply = task.handler(); { std::lock_guard lg(m_async_reply_lock); diff --git a/Source/Core/Core/State.cpp b/Source/Core/Core/State.cpp index 1329dbe070..bbe80370f0 100644 --- a/Source/Core/Core/State.cpp +++ b/Source/Core/Core/State.cpp @@ -724,7 +724,7 @@ void Init() if (lzo_init() != LZO_E_OK) PanicAlertFmtT("Internal LZO Error - lzo_init() failed"); - s_save_thread.Reset([](CompressAndDumpState_args args) { + s_save_thread.Reset("Savestate Worker", [](CompressAndDumpState_args args) { CompressAndDumpState(args); { diff --git a/Source/Core/DolphinQt/GameList/GameTracker.cpp b/Source/Core/DolphinQt/GameList/GameTracker.cpp index 7ee0c04e3a..9428729997 100644 --- a/Source/Core/DolphinQt/GameList/GameTracker.cpp +++ b/Source/Core/DolphinQt/GameList/GameTracker.cpp @@ -37,7 +37,7 @@ GameTracker::GameTracker(QObject* parent) : QFileSystemWatcher(parent) connect(qApp, &QApplication::aboutToQuit, this, [this] { m_processing_halted = true; - m_load_thread.Cancel(); + m_load_thread.Shutdown(true); }); connect(this, &QFileSystemWatcher::directoryChanged, this, &GameTracker::UpdateDirectory); connect(this, &QFileSystemWatcher::fileChanged, this, &GameTracker::UpdateFile); @@ -55,7 +55,7 @@ GameTracker::GameTracker(QObject* parent) : QFileSystemWatcher(parent) m_load_thread.EmplaceItem(Command{CommandType::UpdateMetadata, {}}); }); - m_load_thread.Reset([this](Command command) { + m_load_thread.Reset("GameList Tracker", [this](Command command) { switch (command.type) { case CommandType::LoadCache: @@ -203,7 +203,7 @@ void GameTracker::RemoveDirectory(const QString& dir) void GameTracker::RefreshAll() { m_processing_halted = true; - m_load_thread.Clear(); + m_load_thread.Cancel(); m_load_thread.EmplaceItem(Command{CommandType::ResumeProcessing, {}}); if (m_needs_purge)