#pragma once

#include "types.h"
#include "Atomic.h"

#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <emmintrin.h>

//! Simple sizeless array base for concurrent access. Cannot shrink, only grows automatically.
//! There is no way to know the current size. The smaller the index, the faster it is accessed.
//!
//! T is the type of elements. Currently, the default constructor of T shall be constexpr.
//! N is the initial element count, available without any memory allocation; only this first block is stored contiguously.
template <typename T, std::size_t N>
class lf_array
{
    // Data (default-initialized)
    T m_data[N]{};

    // Next array block
    atomic_t<lf_array*> m_next{};

public:
    constexpr lf_array() = default;

    ~lf_array()
    {
        // Delete the chain of blocks iteratively
        for (auto ptr = m_next.raw(); UNLIKELY(ptr);)
        {
            delete std::exchange(ptr, std::exchange(ptr->m_next.raw(), nullptr));
        }
    }

    T& operator [](std::size_t index)
    {
        if (LIKELY(index < N))
        {
            return m_data[index];
        }
        else if (UNLIKELY(!m_next))
        {
            // Create new array block. It's not a full-fledged once-synchronization, unlikely needed.
            for (auto _new = new lf_array, ptr = this; UNLIKELY(ptr);)
            {
                // Install the pointer. If failed, go deeper.
                ptr = ptr->m_next.compare_and_swap(nullptr, _new);
            }
        }

        // Access recursively
        return (*m_next)[index - N];
    }
};

//! Simple lock-free FIFO queue base. Based on lf_array<T, N> itself. Currently uses 32-bit counters.
//! There is no "push_end" or "pop_begin" provided; the queue element must signal its state on its own.
template <typename T, std::size_t N>
class lf_fifo : public lf_array<T, N>
{
    struct alignas(8) ctrl_t
    {
        u32 push;
        u32 pop;
    };

    atomic_t<ctrl_t> m_ctrl{};

public:
    constexpr lf_fifo() = default;

    // Get current "push" position
    u32 size() const
    {
        return reinterpret_cast<const atomic_t<u32>&>(m_ctrl).load(); // Hack: reads the leading "push" member only
    }

    // Acquire the place for one or more elements.
    u32 push_begin(u32 count = 1)
    {
        return reinterpret_cast<atomic_t<u32>&>(m_ctrl).fetch_add(count); // Hack: increments the leading "push" member only
    }

    // Get current "pop" position
    u32 peek() const
    {
        return m_ctrl.load().pop;
    }

    // Acknowledge processed element, return number of the next one.
    // Perform clear if possible, zero is returned in this case.
    u32 pop_end(u32 count = 1)
    {
        return m_ctrl.atomic_op([&](ctrl_t& ctrl)
        {
            ctrl.pop += count;

            if (ctrl.pop == ctrl.push)
            {
                // Clean if possible
                ctrl.push = 0;
                ctrl.pop = 0;
            }

            return ctrl.pop;
        });
    }
};
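
// Usage sketch for lf_fifo (illustration only, kept in a comment): the queue manages only the
// push/pop counters, so the element type itself must publish its readiness, e.g. through an
// atomic flag written after the payload. store()/load() below are assumed to be the usual
// atomic_t accessors from Atomic.h; the 'job' type is hypothetical.
//
//   struct job
//   {
//       atomic_t<bool> ready{}; // default-constructed as "not ready"
//       int payload{};
//   };
//
//   lf_fifo<job, 16> queue;
//
//   // Producer: reserve a slot, fill it, then mark it ready.
//   const u32 pos = queue.push_begin();
//   queue[pos].payload = 42;
//   queue[pos].ready.store(true);
//
//   // Consumer: wait until the head element is ready, consume it, then acknowledge it.
//   while (!queue[queue.peek()].ready.load()) { /* spin or yield */ }
//   const int value = queue[queue.peek()].payload;
//   queue.pop_end();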

//! Simple lock-free map. Based on lf_array<>. All elements are accessible, implicitly initialized.
template <typename K, typename T, typename Hash = std::hash<K>, std::size_t Size = 256>
class lf_hashmap
{
    struct pair_t
    {
        // Default-constructed key means "no key"
        atomic_t<K> key{};

        T value{};
    };

    // Array of pairs, grows on demand
    lf_array<pair_t, Size> m_data{};

    // Value for default-constructed key
    T m_default_key_data{};

public:
    constexpr lf_hashmap() = default;

    // Access element (added implicitly)
    T& operator [](const K& key)
    {
        if (UNLIKELY(key == K{}))
        {
            return m_default_key_data;
        }

        // Calculate hash and array position
        for (std::size_t pos = Hash{}(key) % Size;; pos += Size)
        {
            // Access the array
            auto& pair = m_data[pos];

            // Check the key value (optimistic), otherwise try to claim the slot if it's empty.
            // Re-check the key after a failed CAS in case another thread installed the same key concurrently.
            if (LIKELY(pair.key == key) || pair.key.compare_and_swap_test(K{}, key) || pair.key == key)
            {
                return pair.value;
            }
        }
    }
};

// Fixed-size single-producer single-consumer queue
template <typename T, std::uint32_t N>
class lf_spsc
{
    // If N is a power of 2, m_push/m_pop can safely overflow and the algorithm is simplified
    static_assert(N && (1u << 31) % N == 0, "lf_spsc<> error: size must be a power of 2");

protected:
    volatile std::uint32_t m_push{0};

    volatile std::uint32_t m_pop{0};

    T m_data[N]{};

public:
    constexpr lf_spsc() = default;

    // Try to push (producer only)
    template <typename T2>
    bool try_push(T2&& data)
    {
        const std::uint32_t pos = m_push;

        if (pos - m_pop >= N)
        {
            // Queue is full
            return false;
        }

        _mm_lfence();
        m_data[pos % N] = std::forward<T2>(data);
        _mm_sfence();
        m_push = pos + 1;
        return true;
    }

    // Try to get push pointer (producer only)
    operator T*()
    {
        const std::uint32_t pos = m_push;

        if (pos - m_pop >= N)
        {
            // Queue is full
            return nullptr;
        }

        _mm_lfence();
        return m_data + (pos % N);
    }

    // Increment push counter (producer only)
    void end_push()
    {
        const std::uint32_t pos = m_push;

        if (pos - m_pop < N)
        {
            _mm_sfence();
            m_push = pos + 1;
        }
    }

    // Unsafe access
    T& get_push(std::size_t i)
    {
        _mm_lfence();
        return m_data[(m_push + i) % N];
    }

    // Try to pop (consumer only)
    template <typename T2>
    bool try_pop(T2& out)
    {
        const std::uint32_t pos = m_pop;

        if (m_push - pos == 0)
        {
            // Queue is empty
            return false;
        }

        _mm_lfence();
        out = std::move(m_data[pos % N]);
        _mm_sfence();
        m_pop = pos + 1;
        return true;
    }

    // Increment pop counter (consumer only)
    void end_pop()
    {
        const std::uint32_t pos = m_pop;

        if (m_push - pos > 0)
        {
            _mm_sfence();
            m_pop = pos + 1;
        }
    }

    // Get size (consumer only)
    std::uint32_t size() const
    {
        return m_push - m_pop;
    }

    // Direct access (consumer only)
    T& operator [](std::size_t i)
    {
        _mm_lfence();
        return m_data[(m_pop + i) % N];
    }
};

// Fixed-size multi-producer single-consumer queue
template <typename T, std::uint32_t N>
class lf_mpsc : lf_spsc<T, N>
{
protected:
    using lf_spsc<T, N>::m_push;
    using lf_spsc<T, N>::m_pop;
    using lf_spsc<T, N>::m_data;

    // Producer lock: low 32 bits count acquired slots, high 32 bits count released slots
    enum : std::uint64_t
    {
        c_ack = 1ull << 0,
        c_rel = 1ull << 32,
    };

    atomic_t<std::uint64_t> m_lock{0};

    void release(std::uint64_t value)
    {
        // Push all pending elements at once when possible (every acquired slot has been released)
        if (value && value % c_rel == value / c_rel)
        {
            _mm_sfence();
            m_push += value % c_rel;
            m_lock.compare_and_swap_test(value, 0);
        }
    }

public:
    constexpr lf_mpsc() = default;

    // Try to get push pointer
    operator T*()
    {
        const std::uint64_t old = m_lock.fetch_add(c_ack);
        const std::uint32_t pos = m_push;

        // Fail if too many producers have already acquired slots, or the remaining capacity is exhausted
        if (old % c_rel >= N || pos - m_pop >= N - (old % c_rel))
        {
            release(m_lock.sub_fetch(c_ack));
            return nullptr;
        }

        return m_data + ((pos + old) % N);
    }

    // Increment push counter (producer only)
    void end_push()
    {
        release(m_lock.add_fetch(c_rel));
    }

    // Try to push
    template <typename T2>
    bool try_push(T2&& data)
    {
        if (T* ptr = *this)
        {
            *ptr = std::forward<T2>(data);
            end_push();
            return true;
        }

        return false;
    }

    // Enable consumer methods
    using lf_spsc<T, N>::try_pop;
    using lf_spsc<T, N>::end_pop;
    using lf_spsc<T, N>::size;
    using lf_spsc<T, N>::operator [];
};
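
// Usage sketch for the remaining containers (illustration only, kept in a comment; the types
// and values below are hypothetical). lf_hashmap inserts a key implicitly on first access and
// reserves the default-constructed key for a dedicated value; lf_spsc/lf_mpsc are fixed-size
// ring buffers, so try_push() can fail and the caller must retry or drop the element.
//
//   lf_hashmap<u32, int> map;
//   map[42] = 1;                    // slot created implicitly on first access
//   map[0] = 2;                     // key 0 == u32{} maps to the default-key value
//
//   lf_spsc<int, 64> spsc;          // single producer, single consumer
//   if (!spsc.try_push(1)) { /* queue full */ }
//
//   int out;
//   if (spsc.try_pop(out)) { /* out == 1 */ }
//
//   lf_mpsc<int, 64> mpsc;          // any thread may push, exactly one thread pops
//   mpsc.try_push(2);
//   if (mpsc.try_pop(out)) { /* out == 2 */ }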