From 75357caeaf12f235aa027595871f9fbf7e424c48 Mon Sep 17 00:00:00 2001 From: Joel Linn Date: Sat, 5 Mar 2022 11:45:08 +0100 Subject: [PATCH] [Base] Add TimerQueue - Cross platform functionality similar to Windows' `CreateTimerQueue` with `WT_EXECUTEINTIMERTHREAD` --- src/xenia/base/threading_timer_queue.cc | 211 ++++++++++++++++++++++++ src/xenia/base/threading_timer_queue.h | 79 +++++++++ 2 files changed, 290 insertions(+) create mode 100644 src/xenia/base/threading_timer_queue.cc create mode 100644 src/xenia/base/threading_timer_queue.h diff --git a/src/xenia/base/threading_timer_queue.cc b/src/xenia/base/threading_timer_queue.cc new file mode 100644 index 000000000..ee4e6d751 --- /dev/null +++ b/src/xenia/base/threading_timer_queue.cc @@ -0,0 +1,211 @@ +/** + ****************************************************************************** + * Xenia : Xbox 360 Emulator Research Project * + ****************************************************************************** + * Copyright 2022 Ben Vanik. All rights reserved. * + * Released under the BSD license - see LICENSE in the root for more details. * + ****************************************************************************** + */ + +#include +#include + +#include "third_party/disruptorplus/include/disruptorplus/multi_threaded_claim_strategy.hpp" +#include "third_party/disruptorplus/include/disruptorplus/ring_buffer.hpp" +#include "third_party/disruptorplus/include/disruptorplus/sequence_barrier.hpp" +#include "third_party/disruptorplus/include/disruptorplus/spin_wait.hpp" +#include "third_party/disruptorplus/include/disruptorplus/spin_wait_strategy.hpp" + +#include "xenia/base/assert.h" +#include "xenia/base/threading.h" +#include "xenia/base/threading_timer_queue.h" + +namespace dp = disruptorplus; + +namespace xe { +namespace threading { + +using WaitItem = TimerQueueWaitItem; + +class TimerQueue { + public: + using clock = WaitItem::clock; + static_assert(clock::is_steady); + + public: + TimerQueue() + : buffer_(kWaitCount), + wait_strategy_(), + claim_strategy_(kWaitCount, wait_strategy_), + consumed_(wait_strategy_), + shutdown_(false) { + claim_strategy_.add_claim_barrier(consumed_); + dispatch_thread_ = std::thread(&TimerQueue::TimerThreadMain, this); + } + + ~TimerQueue() { + shutdown_.store(true, std::memory_order_release); + + // Kick dispatch thread to check shutdown flag + auto wait_item = std::make_shared(nullptr, nullptr, this, + clock::time_point::min(), + clock::duration::zero()); + wait_item->Disarm(); + QueueTimer(std::move(wait_item)); + + dispatch_thread_.join(); + } + + void TimerThreadMain() { + dp::sequence_t next_sequence = 0; + const auto comp = [](const std::shared_ptr& left, + const std::shared_ptr& right) { + return left->due_ < right->due_; + }; + + xe::threading::set_name("xe::threading::TimerQueue"); + + while (!shutdown_.load(std::memory_order_relaxed)) { + { + // Consume new wait items and add them to sorted wait queue + dp::sequence_t available = claim_strategy_.wait_until_published( + next_sequence, next_sequence - 1, + wait_queue_.empty() ? clock::time_point::max() + : wait_queue_.front()->due_); + + // Check for timeout + if (available != next_sequence - 1) { + std::forward_list> wait_items; + do { + wait_items.push_front(std::move(buffer_[next_sequence])); + } while (next_sequence++ != available); + + consumed_.publish(available); + + wait_items.sort(comp); + wait_queue_.merge(wait_items, comp); + } + } + + { + // Check wait queue, invoke callbacks and reschedule + std::forward_list> wait_items; + while (!wait_queue_.empty() && + wait_queue_.front()->due_ <= clock::now()) { + auto wait_item = std::move(wait_queue_.front()); + wait_queue_.pop_front(); + + // Ensure that it isn't disarmed + auto state = WaitItem::State::kIdle; + if (wait_item->state_.compare_exchange_strong( + state, WaitItem::State::kInCallback, + std::memory_order_acq_rel)) { + // Possibility to dispatch to a thread pool here + assert_not_null(wait_item->callback_); + wait_item->callback_(wait_item->userdata_); + + if (wait_item->interval_ != clock::duration::zero() && + wait_item->state_.load(std::memory_order_acquire) != + WaitItem::State::kInCallbackSelfDisarmed) { + // Item is recurring and didn't self-disarm during callback: + wait_item->due_ += wait_item->interval_; + wait_item->state_.store(WaitItem::State::kIdle, + std::memory_order_release); + wait_items.push_front(std::move(wait_item)); + } else { + wait_item->state_.store(WaitItem::State::kDisarmed, + std::memory_order_release); + } + } else { + // Specifically, kInCallback is illegal here + assert_true(WaitItem::State::kDisarmed == state); + } + } + wait_items.sort(comp); + wait_queue_.merge(wait_items, comp); + } + } + } + + std::weak_ptr QueueTimer(std::shared_ptr wait_item) { + auto wait_item_weak = std::weak_ptr(wait_item); + + // Mitigate callback flooding + wait_item->due_ = + std::max(clock::now() - wait_item->interval_, wait_item->due_); + + auto sequence = claim_strategy_.claim_one(); + buffer_[sequence] = std::move(wait_item); + claim_strategy_.publish(sequence); + + return wait_item_weak; + } + + const std::thread& dispatch_thread() const { return dispatch_thread_; } + + private: + // This ring buffer will be used to introduce timers queued by the public API + static constexpr size_t kWaitCount = 512; + dp::ring_buffer> buffer_; + dp::spin_wait_strategy wait_strategy_; + dp::multi_threaded_claim_strategy claim_strategy_; + dp::sequence_barrier consumed_; + + // This is a _sorted_ (ascending due_) list of active timers managed by a + // dedicated thread + std::forward_list> wait_queue_; + std::atomic_bool shutdown_; + std::thread dispatch_thread_; +}; + +xe::threading::TimerQueue timer_queue_; + +void TimerQueueWaitItem::Disarm() { + State state; + + // Special case for calling from a callback itself + if (std::this_thread::get_id() == parent_queue_->dispatch_thread().get_id()) { + state = State::kInCallback; + if (state_.compare_exchange_strong(state, State::kInCallbackSelfDisarmed, + std::memory_order_acq_rel)) { + // If we are self disarming from the callback set this special state and + // exit + return; + } + // Normal case can handle the rest + } + + dp::spin_wait spinner; + state = State::kIdle; + // Classes which hold WaitItems will often call Disarm() to cancel them during + // destruction. This may lead to race conditions when the dispatch thread + // executes a callback which accesses memory that is freed simultaneously due + // to this. Therefore, we need to guarantee that no callbacks will be running + // once Disarm() has returned. + while (!state_.compare_exchange_weak(state, State::kDisarmed, + std::memory_order_acq_rel)) { + if (state == State::kInCallbackSelfDisarmed || state == State::kDisarmed) { + break; + } + state = State::kIdle; + spinner.spin_once(); + } +} + +std::weak_ptr QueueTimerOnce(std::function callback, + void* userdata, + WaitItem::clock::time_point due) { + return timer_queue_.QueueTimer( + std::make_shared(std::move(callback), userdata, &timer_queue_, + due, WaitItem::clock::duration::zero())); +} + +std::weak_ptr QueueTimerRecurring( + std::function callback, void* userdata, + WaitItem::clock::time_point due, WaitItem::clock::duration interval) { + return timer_queue_.QueueTimer(std::make_shared( + std::move(callback), userdata, &timer_queue_, due, interval)); +} + +} // namespace threading +} // namespace xe diff --git a/src/xenia/base/threading_timer_queue.h b/src/xenia/base/threading_timer_queue.h new file mode 100644 index 000000000..91301fa7c --- /dev/null +++ b/src/xenia/base/threading_timer_queue.h @@ -0,0 +1,79 @@ +/** + ****************************************************************************** + * Xenia : Xbox 360 Emulator Research Project * + ****************************************************************************** + * Copyright 2022 Ben Vanik. All rights reserved. * + * Released under the BSD license - see LICENSE in the root for more details. * + ****************************************************************************** + */ + +#ifndef XENIA_BASE_THREADING_TIMER_QUEUE_H_ +#define XENIA_BASE_THREADING_TIMER_QUEUE_H_ + +#include +#include +#include +#include + +// This is a platform independent implementation of a timer queue similar to +// Windows CreateTimerQueueTimer with WT_EXECUTEINTIMERTHREAD. + +namespace xe::threading { + +class TimerQueue; + +struct TimerQueueWaitItem { + using clock = std::chrono::steady_clock; + + TimerQueueWaitItem(std::function callback, void* userdata, + TimerQueue* parent_queue, clock::time_point due, + clock::duration interval) + : callback_(std::move(callback)), + userdata_(userdata), + parent_queue_(parent_queue), + due_(due), + interval_(interval), + state_(State::kIdle) {} + + // Cancel the pending wait item. No callbacks will be running after this call. + // The function blocks if a callback is running and returns only after the + // callback has finished (except when called from the corresponding callback + // itself, where it will mark the wait item for disarmament and return + // immediately). Deadlocks are possible when a lock is held during disamament + // and the corresponding callback is running concurrently, trying to acquire + // said lock. + void Disarm(); + + friend TimerQueue; + + private: + enum class State : uint_least8_t { + kIdle = 0, // Waiting for the due time + kInCallback, // Callback is being executed + kInCallbackSelfDisarmed, // Callback is being executed and disarmed itself + kDisarmed // Disarmed, waiting for destruction + }; + static_assert(std::atomic::is_always_lock_free); + + std::function callback_; + void* userdata_; + TimerQueue* parent_queue_; + clock::time_point due_; + clock::duration interval_; // zero if not recurring + std::atomic state_; +}; + +std::weak_ptr QueueTimerOnce( + std::function callback, void* userdata, + TimerQueueWaitItem::clock::time_point due); + +// Callback is first executed at due, then again repeatedly after interval +// passes (unless interval == 0). The first callback will be scheduled at +// `max(now() - interval, due)` to mitigate callback flooding. +std::weak_ptr QueueTimerRecurring( + std::function callback, void* userdata, + TimerQueueWaitItem::clock::time_point due, + TimerQueueWaitItem::clock::duration interval); +} // namespace xe::threading + +#endif