diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index c3315ac26f..f7c70edee8 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -3,13 +3,13 @@ #pragma once +#include +#include #include #include #include #include -#include "Common/Event.h" -#include "Common/Flag.h" #include "Common/Thread.h" // A thread that executes the given function for every item placed into its queue. @@ -26,120 +26,104 @@ public: Reset(std::move(function)); } ~WorkQueueThread() { Shutdown(); } + + // Shuts the current work thread down (if any) and starts a new thread with the given function + // Note: Some consumers of this API push items to the queue before starting the thread. void Reset(std::function function) { Shutdown(); std::lock_guard lg(m_lock); - m_cancelled = false; + m_shutdown = false; m_function = std::move(function); m_thread = std::thread(&WorkQueueThread::ThreadLoop, this); } + // Adds an item to the work queue template void EmplaceItem(Args&&... args) { std::lock_guard lg(m_lock); if (m_shutdown) - { return; - } + m_items.emplace(std::forward(args)...); m_idle = false; m_worker_cond_var.notify_one(); } + // Adds an item to the work queue void Push(T&& item) { std::lock_guard lg(m_lock); - if (m_cancelled) - { + if (m_shutdown) return; - } + m_items.push(item); m_idle = false; m_worker_cond_var.notify_one(); } + // Adds an item to the work queue void Push(const T& item) { - std::lock_guard lg(m_lock); - if (m_cancelled) - { - return; - } - m_items.push(item); - m_idle = false; - m_worker_cond_var.notify_one(); + std::lock_guard lg(m_lock); + if (m_shutdown) + return; + + m_items.push(item); + m_idle = false; + m_worker_cond_var.notify_one(); } - void Clear() + // Empties the queue + // If the worker polls IsCanceling(), it can abort it's work when Cancelling + void Cancel() { - std::lock_guard lg(m_lock); + std::unique_lock lg(m_lock); + if (m_shutdown) + return; + + m_cancelling = true; m_items = std::queue(); m_worker_cond_var.notify_one(); } - void Cancel() + // Tells the worker to shut down when it's queue is empty + // Blocks until the worker thread exits. + // If cancel is true, will Cancel before before telling the worker to exit + void Shutdown(bool cancel = false) { - if (!m_thread.joinable()) - { - return; - } - { std::unique_lock lg(m_lock); - m_items = std::queue(); - m_cancelled = true; + if (m_shutdown || !m_thread.joinable()) + return; + + if (cancel) + { + m_cancelling = true; + m_items = std::queue(); + } + m_shutdown = true; m_worker_cond_var.notify_one(); } + m_thread.join(); } - void Shutdown() + // Blocks until all items in the queue have been processed (or cancelled) + void WaitForCompletion() { - if (!m_thread.joinable()) - { - return; - } - - { - std::unique_lock lg(m_lock); - m_shutdown = true; - m_worker_cond_var.notify_one(); - } - m_thread.join(); - } - - // Doesn't return until the most recent function invocation has finished. - void ClearAndFlush() - { - if (!m_thread.joinable()) - { - return; - } - std::unique_lock lg(m_lock); - m_items = std::queue(); - m_wait_cond_var.wait(lg, [&] { - return m_idle; - }); - } - - // Doesn't return until the most recent function invocation has finished. - void Flush() - { - if (!m_thread.joinable()) - { + if (m_idle) // Only check m_idle, we want this to work even another thread called Shutdown return; - } - std::unique_lock lg(m_lock); - m_wait_cond_var.wait(lg, [&] { - return m_idle; - }); + m_wait_cond_var.wait(lg, [&] { return m_idle; }); } + // For the worker to check if it should abort it's work early. + bool IsCancelling() const { return m_cancelling.load(); } + private: void ThreadLoop() { @@ -151,14 +135,13 @@ private: if (m_items.empty()) { m_idle = true; + m_cancelling = false; m_wait_cond_var.notify_all(); - m_worker_cond_var.wait(lg, [&] { - return m_shutdown || !m_items.empty(); - }); + m_worker_cond_var.wait( + lg, [&] { return m_shutdown || m_cancelling.load() || !m_items.empty(); }); + if (m_shutdown) - { break; - } continue; } T item{std::move(m_items.front())}; @@ -176,9 +159,9 @@ private: std::queue m_items; std::condition_variable m_wait_cond_var; std::condition_variable m_worker_cond_var; + std::atomic m_cancelling = false; bool m_idle = true; bool m_shutdown = false; - bool m_cancelled = false; }; } // namespace Common diff --git a/Source/Core/DolphinQt/GameList/GameTracker.cpp b/Source/Core/DolphinQt/GameList/GameTracker.cpp index 7ee0c04e3a..6298bbdca1 100644 --- a/Source/Core/DolphinQt/GameList/GameTracker.cpp +++ b/Source/Core/DolphinQt/GameList/GameTracker.cpp @@ -37,7 +37,7 @@ GameTracker::GameTracker(QObject* parent) : QFileSystemWatcher(parent) connect(qApp, &QApplication::aboutToQuit, this, [this] { m_processing_halted = true; - m_load_thread.Cancel(); + m_load_thread.Shutdown(true); }); connect(this, &QFileSystemWatcher::directoryChanged, this, &GameTracker::UpdateDirectory); connect(this, &QFileSystemWatcher::fileChanged, this, &GameTracker::UpdateFile); @@ -203,7 +203,7 @@ void GameTracker::RemoveDirectory(const QString& dir) void GameTracker::RefreshAll() { m_processing_halted = true; - m_load_thread.Clear(); + m_load_thread.Cancel(); m_load_thread.EmplaceItem(Command{CommandType::ResumeProcessing, {}}); if (m_needs_purge)