[Base] Add TimerQueue

- Cross platform functionality similar to Windows' `CreateTimerQueue`
  with `WT_EXECUTEINTIMERTHREAD`
This commit is contained in:
Joel Linn 2022-03-05 11:45:08 +01:00 committed by Rick Gibbed
parent a85fc25040
commit 75357caeaf
2 changed files with 290 additions and 0 deletions

View File

@ -0,0 +1,211 @@
/**
******************************************************************************
* Xenia : Xbox 360 Emulator Research Project *
******************************************************************************
* Copyright 2022 Ben Vanik. All rights reserved. *
* Released under the BSD license - see LICENSE in the root for more details. *
******************************************************************************
*/
#include <algorithm>
#include <forward_list>
#include "third_party/disruptorplus/include/disruptorplus/multi_threaded_claim_strategy.hpp"
#include "third_party/disruptorplus/include/disruptorplus/ring_buffer.hpp"
#include "third_party/disruptorplus/include/disruptorplus/sequence_barrier.hpp"
#include "third_party/disruptorplus/include/disruptorplus/spin_wait.hpp"
#include "third_party/disruptorplus/include/disruptorplus/spin_wait_strategy.hpp"
#include "xenia/base/assert.h"
#include "xenia/base/threading.h"
#include "xenia/base/threading_timer_queue.h"
namespace dp = disruptorplus;
namespace xe {
namespace threading {
using WaitItem = TimerQueueWaitItem;
class TimerQueue {
public:
using clock = WaitItem::clock;
static_assert(clock::is_steady);
public:
TimerQueue()
: buffer_(kWaitCount),
wait_strategy_(),
claim_strategy_(kWaitCount, wait_strategy_),
consumed_(wait_strategy_),
shutdown_(false) {
claim_strategy_.add_claim_barrier(consumed_);
dispatch_thread_ = std::thread(&TimerQueue::TimerThreadMain, this);
}
~TimerQueue() {
shutdown_.store(true, std::memory_order_release);
// Kick dispatch thread to check shutdown flag
auto wait_item = std::make_shared<WaitItem>(nullptr, nullptr, this,
clock::time_point::min(),
clock::duration::zero());
wait_item->Disarm();
QueueTimer(std::move(wait_item));
dispatch_thread_.join();
}
void TimerThreadMain() {
dp::sequence_t next_sequence = 0;
const auto comp = [](const std::shared_ptr<WaitItem>& left,
const std::shared_ptr<WaitItem>& right) {
return left->due_ < right->due_;
};
xe::threading::set_name("xe::threading::TimerQueue");
while (!shutdown_.load(std::memory_order_relaxed)) {
{
// Consume new wait items and add them to sorted wait queue
dp::sequence_t available = claim_strategy_.wait_until_published(
next_sequence, next_sequence - 1,
wait_queue_.empty() ? clock::time_point::max()
: wait_queue_.front()->due_);
// Check for timeout
if (available != next_sequence - 1) {
std::forward_list<std::shared_ptr<WaitItem>> wait_items;
do {
wait_items.push_front(std::move(buffer_[next_sequence]));
} while (next_sequence++ != available);
consumed_.publish(available);
wait_items.sort(comp);
wait_queue_.merge(wait_items, comp);
}
}
{
// Check wait queue, invoke callbacks and reschedule
std::forward_list<std::shared_ptr<WaitItem>> wait_items;
while (!wait_queue_.empty() &&
wait_queue_.front()->due_ <= clock::now()) {
auto wait_item = std::move(wait_queue_.front());
wait_queue_.pop_front();
// Ensure that it isn't disarmed
auto state = WaitItem::State::kIdle;
if (wait_item->state_.compare_exchange_strong(
state, WaitItem::State::kInCallback,
std::memory_order_acq_rel)) {
// Possibility to dispatch to a thread pool here
assert_not_null(wait_item->callback_);
wait_item->callback_(wait_item->userdata_);
if (wait_item->interval_ != clock::duration::zero() &&
wait_item->state_.load(std::memory_order_acquire) !=
WaitItem::State::kInCallbackSelfDisarmed) {
// Item is recurring and didn't self-disarm during callback:
wait_item->due_ += wait_item->interval_;
wait_item->state_.store(WaitItem::State::kIdle,
std::memory_order_release);
wait_items.push_front(std::move(wait_item));
} else {
wait_item->state_.store(WaitItem::State::kDisarmed,
std::memory_order_release);
}
} else {
// Specifically, kInCallback is illegal here
assert_true(WaitItem::State::kDisarmed == state);
}
}
wait_items.sort(comp);
wait_queue_.merge(wait_items, comp);
}
}
}
std::weak_ptr<WaitItem> QueueTimer(std::shared_ptr<WaitItem> wait_item) {
auto wait_item_weak = std::weak_ptr<WaitItem>(wait_item);
// Mitigate callback flooding
wait_item->due_ =
std::max(clock::now() - wait_item->interval_, wait_item->due_);
auto sequence = claim_strategy_.claim_one();
buffer_[sequence] = std::move(wait_item);
claim_strategy_.publish(sequence);
return wait_item_weak;
}
const std::thread& dispatch_thread() const { return dispatch_thread_; }
private:
// This ring buffer will be used to introduce timers queued by the public API
static constexpr size_t kWaitCount = 512;
dp::ring_buffer<std::shared_ptr<WaitItem>> buffer_;
dp::spin_wait_strategy wait_strategy_;
dp::multi_threaded_claim_strategy<dp::spin_wait_strategy> claim_strategy_;
dp::sequence_barrier<dp::spin_wait_strategy> consumed_;
// This is a _sorted_ (ascending due_) list of active timers managed by a
// dedicated thread
std::forward_list<std::shared_ptr<WaitItem>> wait_queue_;
std::atomic_bool shutdown_;
std::thread dispatch_thread_;
};
xe::threading::TimerQueue timer_queue_;
void TimerQueueWaitItem::Disarm() {
State state;
// Special case for calling from a callback itself
if (std::this_thread::get_id() == parent_queue_->dispatch_thread().get_id()) {
state = State::kInCallback;
if (state_.compare_exchange_strong(state, State::kInCallbackSelfDisarmed,
std::memory_order_acq_rel)) {
// If we are self disarming from the callback set this special state and
// exit
return;
}
// Normal case can handle the rest
}
dp::spin_wait spinner;
state = State::kIdle;
// Classes which hold WaitItems will often call Disarm() to cancel them during
// destruction. This may lead to race conditions when the dispatch thread
// executes a callback which accesses memory that is freed simultaneously due
// to this. Therefore, we need to guarantee that no callbacks will be running
// once Disarm() has returned.
while (!state_.compare_exchange_weak(state, State::kDisarmed,
std::memory_order_acq_rel)) {
if (state == State::kInCallbackSelfDisarmed || state == State::kDisarmed) {
break;
}
state = State::kIdle;
spinner.spin_once();
}
}
std::weak_ptr<WaitItem> QueueTimerOnce(std::function<void(void*)> callback,
void* userdata,
WaitItem::clock::time_point due) {
return timer_queue_.QueueTimer(
std::make_shared<WaitItem>(std::move(callback), userdata, &timer_queue_,
due, WaitItem::clock::duration::zero()));
}
std::weak_ptr<WaitItem> QueueTimerRecurring(
std::function<void(void*)> callback, void* userdata,
WaitItem::clock::time_point due, WaitItem::clock::duration interval) {
return timer_queue_.QueueTimer(std::make_shared<WaitItem>(
std::move(callback), userdata, &timer_queue_, due, interval));
}
} // namespace threading
} // namespace xe

View File

@ -0,0 +1,79 @@
/**
******************************************************************************
* Xenia : Xbox 360 Emulator Research Project *
******************************************************************************
* Copyright 2022 Ben Vanik. All rights reserved. *
* Released under the BSD license - see LICENSE in the root for more details. *
******************************************************************************
*/
#ifndef XENIA_BASE_THREADING_TIMER_QUEUE_H_
#define XENIA_BASE_THREADING_TIMER_QUEUE_H_
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
// This is a platform independent implementation of a timer queue similar to
// Windows CreateTimerQueueTimer with WT_EXECUTEINTIMERTHREAD.
namespace xe::threading {
class TimerQueue;
struct TimerQueueWaitItem {
using clock = std::chrono::steady_clock;
TimerQueueWaitItem(std::function<void(void*)> callback, void* userdata,
TimerQueue* parent_queue, clock::time_point due,
clock::duration interval)
: callback_(std::move(callback)),
userdata_(userdata),
parent_queue_(parent_queue),
due_(due),
interval_(interval),
state_(State::kIdle) {}
// Cancel the pending wait item. No callbacks will be running after this call.
// The function blocks if a callback is running and returns only after the
// callback has finished (except when called from the corresponding callback
// itself, where it will mark the wait item for disarmament and return
// immediately). Deadlocks are possible when a lock is held during disamament
// and the corresponding callback is running concurrently, trying to acquire
// said lock.
void Disarm();
friend TimerQueue;
private:
enum class State : uint_least8_t {
kIdle = 0, // Waiting for the due time
kInCallback, // Callback is being executed
kInCallbackSelfDisarmed, // Callback is being executed and disarmed itself
kDisarmed // Disarmed, waiting for destruction
};
static_assert(std::atomic<State>::is_always_lock_free);
std::function<void(void*)> callback_;
void* userdata_;
TimerQueue* parent_queue_;
clock::time_point due_;
clock::duration interval_; // zero if not recurring
std::atomic<State> state_;
};
std::weak_ptr<TimerQueueWaitItem> QueueTimerOnce(
std::function<void(void*)> callback, void* userdata,
TimerQueueWaitItem::clock::time_point due);
// Callback is first executed at due, then again repeatedly after interval
// passes (unless interval == 0). The first callback will be scheduled at
// `max(now() - interval, due)` to mitigate callback flooding.
std::weak_ptr<TimerQueueWaitItem> QueueTimerRecurring(
std::function<void(void*)> callback, void* userdata,
TimerQueueWaitItem::clock::time_point due,
TimerQueueWaitItem::clock::duration interval);
} // namespace xe::threading
#endif