mirror of https://github.com/RPCS3/rpcs3.git
lf_queue: implement wait() method
Synchronization takes no additional space: it reuses the least significant bit (LSB) of the queue head.
This commit is contained in:
parent
ac775cd75e
commit
81c50bad69
|
@ -1,5 +1,6 @@
|
||||||
#include "cond.h"
|
#include "cond.h"
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
|
#include "lockless.h"
|
||||||
|
|
||||||
#include <limits.h>
|
#include <limits.h>
|
||||||
|
|
||||||
|
@ -267,3 +268,27 @@ void cond_x16::imp_notify() noexcept
|
||||||
|
|
||||||
balanced_awaken(m_cvx16, utils::popcnt16(wait_mask));
|
balanced_awaken(m_cvx16, utils::popcnt16(wait_mask));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Waits on the head word through balanced_wait_until, forwarding the timeout unchanged.
// The predicate below reports +1 for any head value other than 1; for the value 1 it
// either leaves the word alone (result 0) or resets it to 0 (result -1), depending on
// whether balanced_wait_until invoked it with an extra argument.
// NOTE(review): nothing visible in this chunk stores the value 1 into m_head before
// waiting - confirm where that mark is supposed to be set.
bool lf_queue_base::wait(u64 _timeout)
{
	return balanced_wait_until(m_head, _timeout, [](std::uintptr_t& word, auto... retire) -> int
	{
		if (word == 1)
		{
			if constexpr (sizeof...(retire) != 0)
			{
				// Invoked with an extra argument: clear the word and report a negative result
				word = 0;
				return -1;
			}
			else
			{
				// Invoked without an extra argument: keep the word as it is
				return 0;
			}
		}

		// Any other head value yields a positive result
		return +1;
	});
}
|
||||||
|
|
||||||
|
void lf_queue_base::imp_notify()
|
||||||
|
{
|
||||||
|
balanced_awaken(m_head, 1);
|
||||||
|
}
|
||||||
|
|
|
@ -311,17 +311,28 @@ public:
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class lf_queue_base
|
||||||
|
{
|
||||||
|
protected:
|
||||||
|
atomic_t<std::uintptr_t> m_head = 0;
|
||||||
|
|
||||||
|
void imp_notify();
|
||||||
|
|
||||||
|
public:
|
||||||
|
// Wait for new elements pushed, no other thread shall call wait() or pop_all() simultaneously
|
||||||
|
bool wait(u64 usec_timeout = -1);
|
||||||
|
};
|
||||||
|
|
||||||
// Linked list-based multi-producer queue (the consumer drains the whole queue at once)
|
// Linked list-based multi-producer queue (the consumer drains the whole queue at once)
|
||||||
template <typename T>
|
template <typename T>
|
||||||
class lf_queue
|
class lf_queue : public lf_queue_base
|
||||||
{
|
{
|
||||||
// Elements are added by replacing m_head
|
using lf_queue_base::m_head;
|
||||||
atomic_t<lf_queue_item<T>*> m_head = nullptr;
|
|
||||||
|
|
||||||
// Extract all elements and reverse element order (FILO to FIFO)
|
// Extract all elements and reverse element order (FILO to FIFO)
|
||||||
lf_queue_item<T>* reverse() noexcept
|
lf_queue_item<T>* reverse() noexcept
|
||||||
{
|
{
|
||||||
if (auto* head = m_head.load() ? m_head.exchange(nullptr) : nullptr)
|
if (auto* head = m_head.load() ? reinterpret_cast<lf_queue_item<T>*>(m_head.exchange(0)) : nullptr)
|
||||||
{
|
{
|
||||||
if (auto* prev = head->m_link)
|
if (auto* prev = head->m_link)
|
||||||
{
|
{
|
||||||
|
@ -347,18 +358,23 @@ public:
|
||||||
|
|
||||||
// Releases the item currently referenced by the head word, if there is one.
~lf_queue()
{
	const std::uintptr_t head = m_head.load();

	// push() treats a head word with the lowest bit set as "no item", so there is nothing to free then
	if (!(head & 1))
	{
		// The head word holds a lf_queue_item<T>* (that is what push() stores),
		// so it must be deleted through that type rather than through T*
		delete reinterpret_cast<lf_queue_item<T>*>(head);
	}
}
|
||||||
|
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
void push(Args&&... args)
|
void push(Args&&... args)
|
||||||
{
|
{
|
||||||
auto* old = m_head.load();
|
auto _old = m_head.load();
|
||||||
auto* item = new lf_queue_item<T>(old, std::forward<Args>(args)...);
|
auto* item = new lf_queue_item<T>(_old & 1 ? nullptr : reinterpret_cast<lf_queue_item<T>*>(_old), std::forward<Args>(args)...);
|
||||||
|
|
||||||
while (!m_head.compare_exchange(old, item))
|
while (!m_head.compare_exchange(_old, reinterpret_cast<std::uint64_t>(item)))
|
||||||
{
|
{
|
||||||
item->m_link = old;
|
item->m_link = _old & 1 ? nullptr : reinterpret_cast<lf_queue_item<T>*>(_old);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_old & 1)
|
||||||
|
{
|
||||||
|
lf_queue_base::imp_notify();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -153,7 +153,7 @@ inline int futex(volatile void* uaddr, int futex_op, uint val, const timespec* t
|
||||||
template <typename T, typename Pred>
|
template <typename T, typename Pred>
|
||||||
bool balanced_wait_until(atomic_t<T>& var, u64 usec_timeout, Pred&& pred)
|
bool balanced_wait_until(atomic_t<T>& var, u64 usec_timeout, Pred&& pred)
|
||||||
{
|
{
|
||||||
static_assert(sizeof(T) == 4);
|
static_assert(sizeof(T) == 4 || sizeof(T) == 8);
|
||||||
|
|
||||||
const bool is_inf = usec_timeout > u64{UINT32_MAX / 1000} * 1000000;
|
const bool is_inf = usec_timeout > u64{UINT32_MAX / 1000} * 1000000;
|
||||||
|
|
||||||
|
@ -184,7 +184,7 @@ bool balanced_wait_until(atomic_t<T>& var, u64 usec_timeout, Pred&& pred)
|
||||||
{
|
{
|
||||||
while (!test_pred(value))
|
while (!test_pred(value))
|
||||||
{
|
{
|
||||||
if (OptWaitOnAddress(&var, &value, sizeof(u32), is_inf ? INFINITE : usec_timeout / 1000))
|
if (OptWaitOnAddress(&var, &value, sizeof(T), is_inf ? INFINITE : usec_timeout / 1000))
|
||||||
{
|
{
|
||||||
if (!test_pred(value) && !test_pred(value, nullptr))
|
if (!test_pred(value) && !test_pred(value, nullptr))
|
||||||
{
|
{
|
||||||
|
@ -260,7 +260,7 @@ bool balanced_wait_until(atomic_t<T>& var, u64 usec_timeout, Pred&& pred)
|
||||||
template <typename T>
|
template <typename T>
|
||||||
void balanced_awaken(atomic_t<T>& var, u32 weight)
|
void balanced_awaken(atomic_t<T>& var, u32 weight)
|
||||||
{
|
{
|
||||||
static_assert(sizeof(T) == 4);
|
static_assert(sizeof(T) == 4 || sizeof(T) == 8);
|
||||||
|
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
if (OptWaitOnAddress)
|
if (OptWaitOnAddress)
|
||||||
|
|
Loading…
Reference in New Issue