Merge branch 'gsdx-boost-queue'

This commit is contained in:
Gregory Hainaut 2015-04-17 19:13:32 +02:00
commit 1d70865f09
15 changed files with 666 additions and 21 deletions

View File

@ -853,7 +853,11 @@ EXPORT_C GSgetTitleInfo2(char* dest, size_t length)
if(s_gs->m_GStitleInfoBuffer[0])
{
#ifdef _CX11_
std::lock_guard<std::mutex> lock(s_gs->m_pGSsetTitle_Crit);
#else
GSAutoLock lock(&s_gs->m_pGSsetTitle_Crit);
#endif
s = format("GSdx | %s", s_gs->m_GStitleInfoBuffer);
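
The same pattern recurs in GSCapture and GSRenderer below: a scoped lock that resolves to std::lock_guard on C++11 builds and to the legacy GSAutoLock otherwise. A minimal sketch of why the swap is drop-in; the GSCritSec/GSAutoLock stand-ins here are illustrative only (the real classes live in GSThread.h, which this hunk does not show).

#include <mutex>

// Illustrative stand-ins: the real GSCritSec wraps a CRITICAL_SECTION / recursive
// pthread mutex; what matters is that both branches give scope-bound lock/unlock.
struct GSCritSec {
    std::recursive_mutex m;
    void Lock() { m.lock(); }
    void Unlock() { m.unlock(); }
};

struct GSAutoLock {
    GSCritSec* cs;
    explicit GSAutoLock(GSCritSec* c) : cs(c) { cs->Lock(); }
    ~GSAutoLock() { cs->Unlock(); }
};

std::mutex s_std_lock;
GSCritSec  s_legacy_lock;

void touch_title_buffer()
{
#ifdef _CX11_
    std::lock_guard<std::mutex> lock(s_std_lock); // C++11 path added by this commit
#else
    GSAutoLock lock(&s_legacy_lock);              // pre-C++11 path kept for old MSVC
#endif
    // ... read/write the shared title buffer; both guards release at scope exit ...
}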

View File

@ -386,7 +386,11 @@ GSCapture::~GSCapture()
bool GSCapture::BeginCapture(float fps)
{
GSAutoLock lock(this);
#ifdef _CX11_
std::lock_guard<std::mutex> lock(m_lock);
#else
GSAutoLock lock(&m_lock);
#endif
ASSERT(fps != 0);
@ -481,7 +485,11 @@ bool GSCapture::BeginCapture(float fps)
bool GSCapture::DeliverFrame(const void* bits, int pitch, bool rgba)
{
GSAutoLock lock(this);
#ifdef _CX11_
std::lock_guard<std::mutex> lock(m_lock);
#else
GSAutoLock lock(&m_lock);
#endif
if(bits == NULL || pitch == 0)
{
@ -506,7 +514,11 @@ bool GSCapture::DeliverFrame(const void* bits, int pitch, bool rgba)
bool GSCapture::EndCapture()
{
GSAutoLock lock(this);
#ifdef _CX11_
std::lock_guard<std::mutex> lock(m_lock);
#else
GSAutoLock lock(&m_lock);
#endif
#ifdef _WINDOWS

View File

@ -22,14 +22,21 @@
#pragma once
#include "GSVector.h"
#ifndef _CX11_
#include "GSThread.h"
#endif
#ifdef _WINDOWS
#include "GSCaptureDlg.h"
#endif
class GSCapture : protected GSCritSec
class GSCapture
{
#ifdef _CX11_
std::mutex m_lock;
#else
GSCritSec m_lock;
#endif
bool m_capturing;
GSVector2i m_size;

View File

@ -165,12 +165,12 @@ bool RunLinuxDialog()
GtkWidget *fsaa_combo_box, *render_combo_box, *filter_combo_box;
GtkWidget *shader, *shader_conf, *shader_label, *shader_conf_label;
GtkWidget *shadeboost_check, *paltex_check, *fba_check, *aa_check, *native_res_check, *stretch_hack_check, *fxaa_check, *shaderfx_check, *align_sprite_check;
GtkWidget *shadeboost_check, *paltex_check, *fba_check, *aa_check, *native_res_check, *fxaa_check, *shaderfx_check, *spin_thread_check;
GtkWidget *sb_contrast, *sb_brightness, *sb_saturation;
GtkWidget *resx_spin, *resy_spin;
GtkWidget *hack_table, *hack_skipdraw_label, *hack_box, *hack_frame;
GtkWidget *hack_alpha_check, *hack_date_check, *hack_offset_check, *hack_skipdraw_spin, *hack_sprite_check, * hack_wild_check, *hack_enble_check, *hack_logz_check;
GtkWidget *hack_alpha_check, *hack_date_check, *hack_offset_check, *hack_skipdraw_spin, *hack_sprite_check, * hack_wild_check, *hack_enble_check, *hack_logz_check, *align_sprite_check, *stretch_hack_check;
GtkWidget *hack_tco_label, *hack_tco_entry;
GtkWidget *gl_box, *gl_frame, *gl_table;
@ -352,6 +352,7 @@ bool RunLinuxDialog()
paltex_check = gtk_check_button_new_with_label("Allow 8 bits textures");
fba_check = gtk_check_button_new_with_label("Alpha correction (FBA)");
aa_check = gtk_check_button_new_with_label("Edge anti-aliasing (AA1)");
spin_thread_check= gtk_check_button_new_with_label("Disable thread sleeping (6+ cores CPU)");
fxaa_check = gtk_check_button_new_with_label("Fxaa shader");
shaderfx_check = gtk_check_button_new_with_label("External shader");
@ -360,6 +361,7 @@ bool RunLinuxDialog()
gtk_toggle_button_set_active(GTK_TOGGLE_BUTTON(paltex_check), theApp.GetConfig("paltex", 0));
gtk_toggle_button_set_active(GTK_TOGGLE_BUTTON(fba_check), theApp.GetConfig("fba", 1));
gtk_toggle_button_set_active(GTK_TOGGLE_BUTTON(aa_check), theApp.GetConfig("aa1", 0));
gtk_toggle_button_set_active(GTK_TOGGLE_BUTTON(spin_thread_check), theApp.GetConfig("spin_thread", 0));
gtk_toggle_button_set_active(GTK_TOGGLE_BUTTON(fxaa_check), theApp.GetConfig("fxaa", 0));
gtk_toggle_button_set_active(GTK_TOGGLE_BUTTON(shaderfx_check), theApp.GetConfig("shaderfx", 0));
gtk_toggle_button_set_active(GTK_TOGGLE_BUTTON(native_res_check), theApp.GetConfig("nativeres", 0));
@ -414,6 +416,7 @@ bool RunLinuxDialog()
gtk_container_add(GTK_CONTAINER(sw_box), threads_box);
gtk_container_add(GTK_CONTAINER(sw_box), aa_check);
gtk_container_add(GTK_CONTAINER(sw_box), spin_thread_check);
// Tables are strange. The numbers are for their position: left, right, top, bottom.
gtk_table_attach_defaults(GTK_TABLE(shader_table), fxaa_check, 0, 1, 0, 1);
@ -544,6 +547,7 @@ override_GL_ARB_shading_language_420pack = -1
theApp.SetConfig("paltex", (int)gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(paltex_check)));
theApp.SetConfig("fba", (int)gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(fba_check)));
theApp.SetConfig("aa1", (int)gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(aa_check)));
theApp.SetConfig("spin_thread", (int)gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(spin_thread_check)));
theApp.SetConfig("fxaa", (int)gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(fxaa_check)));
theApp.SetConfig("shaderfx", (int)gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(shaderfx_check)));
theApp.SetConfig("nativeres", (int)gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(native_res_check)));

View File

@ -26,7 +26,9 @@
#include "GSVector.h"
#include "GSBlock.h"
#include "GSClut.h"
#ifndef _CX11_
#include "GSThread.h"
#endif
class GSOffset : public GSAlignedClass<32>
{

View File

@ -104,7 +104,7 @@ int GSRasterizer::FindMyNextScanline(int top) const
return top;
}
void GSRasterizer::Queue(shared_ptr<GSRasterizerData> data)
void GSRasterizer::Queue(const shared_ptr<GSRasterizerData>& data)
{
Draw(data.get());
}
@ -1147,7 +1147,7 @@ GSRasterizerList::GSRasterizerList(int threads, GSPerfMon* perfmon)
GSRasterizerList::~GSRasterizerList()
{
for(vector<GSWorker*>::iterator i = m_workers.begin(); i != m_workers.end(); i++)
for(auto i = m_workers.begin(); i != m_workers.end(); i++)
{
delete *i;
}
@ -1155,7 +1155,7 @@ GSRasterizerList::~GSRasterizerList()
_aligned_free(m_scanline);
}
void GSRasterizerList::Queue(shared_ptr<GSRasterizerData> data)
void GSRasterizerList::Queue(const shared_ptr<GSRasterizerData>& data)
{
GSVector4i r = data->bbox.rintersect(data->scissor);
@ -1210,13 +1210,13 @@ int GSRasterizerList::GetPixels(bool reset)
// GSRasterizerList::GSWorker
GSRasterizerList::GSWorker::GSWorker(GSRasterizer* r)
GSRasterizerList::GSWorker::GSWorker(GSRasterizer* r)
: GSJobQueue<shared_ptr<GSRasterizerData> >()
, m_r(r)
{
}
GSRasterizerList::GSWorker::~GSWorker()
GSRasterizerList::GSWorker::~GSWorker()
{
Wait();
@ -1228,7 +1228,33 @@ int GSRasterizerList::GSWorker::GetPixels(bool reset)
return m_r->GetPixels(reset);
}
void GSRasterizerList::GSWorker::Process(shared_ptr<GSRasterizerData>& item)
void GSRasterizerList::GSWorker::Process(shared_ptr<GSRasterizerData>& item)
{
m_r->Draw(item.get());
}
// GSRasterizerList::GSWorkerSpin
#ifdef ENABLE_BOOST
GSRasterizerList::GSWorkerSpin::GSWorkerSpin(GSRasterizer* r)
: GSJobQueueSpin<shared_ptr<GSRasterizerData> >()
, m_r(r)
{
}
GSRasterizerList::GSWorkerSpin::~GSWorkerSpin()
{
Wait();
delete m_r;
}
int GSRasterizerList::GSWorkerSpin::GetPixels(bool reset)
{
return m_r->GetPixels(reset);
}
void GSRasterizerList::GSWorkerSpin::Process(shared_ptr<GSRasterizerData>& item)
{
m_r->Draw(item.get());
}
#endif

View File

@ -24,9 +24,13 @@
#include "GS.h"
#include "GSVertexSW.h"
#include "GSFunctionMap.h"
#include "GSThread.h"
#include "GSAlignedClass.h"
#include "GSPerfMon.h"
#ifdef ENABLE_BOOST
#include "GSThread_CXX11.h"
#else
#include "GSThread.h"
#endif
__aligned(class, 32) GSRasterizerData : public GSAlignedClass<32>
{
@ -115,7 +119,7 @@ class IRasterizer : public GSAlignedClass<32>
public:
virtual ~IRasterizer() {}
virtual void Queue(shared_ptr<GSRasterizerData> data) = 0;
virtual void Queue(const shared_ptr<GSRasterizerData>& data) = 0;
virtual void Sync() = 0;
virtual bool IsSynced() const = 0;
virtual int GetPixels(bool reset = true) = 0;
@ -170,7 +174,7 @@ public:
// IRasterizer
void Queue(shared_ptr<GSRasterizerData> data);
void Queue(const shared_ptr<GSRasterizerData>& data);
void Sync() {}
bool IsSynced() const {return true;}
int GetPixels(bool reset);
@ -195,8 +199,29 @@ protected:
void Process(shared_ptr<GSRasterizerData>& item);
};
#ifdef ENABLE_BOOST
class GSWorkerSpin : public GSJobQueueSpin<shared_ptr<GSRasterizerData> >
{
GSRasterizer* m_r;
public:
GSWorkerSpin(GSRasterizer* r);
virtual ~GSWorkerSpin();
int GetPixels(bool reset);
// GSJobQueue
void Process(shared_ptr<GSRasterizerData>& item);
};
#endif
GSPerfMon* m_perfmon;
#ifdef ENABLE_BOOST
vector<IGSJobQueue<shared_ptr<GSRasterizerData> > *> m_workers;
#else
vector<GSWorker*> m_workers;
#endif
uint8* m_scanline;
GSRasterizerList(int threads, GSPerfMon* perfmon);
@ -204,7 +229,7 @@ protected:
public:
virtual ~GSRasterizerList();
template<class DS> static IRasterizer* Create(int threads, GSPerfMon* perfmon)
template<class DS> static IRasterizer* Create(int threads, GSPerfMon* perfmon, bool spin_thread = false)
{
threads = std::max<int>(threads, 0);
@ -218,7 +243,14 @@ public:
for(int i = 0; i < threads; i++)
{
#ifdef ENABLE_BOOST
if (spin_thread)
rl->m_workers.push_back(new GSWorkerSpin(new GSRasterizer(new DS(), i, threads, perfmon)));
else
rl->m_workers.push_back(new GSWorker(new GSRasterizer(new DS(), i, threads, perfmon)));
#else
rl->m_workers.push_back(new GSWorker(new GSRasterizer(new DS(), i, threads, perfmon)));
#endif
}
return rl;
@ -227,7 +259,7 @@ public:
// IRasterizer
void Queue(shared_ptr<GSRasterizerData> data);
void Queue(const shared_ptr<GSRasterizerData>& data);
void Sync();
bool IsSynced() const;
int GetPixels(bool reset);
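
Besides the spin-worker plumbing, this header changes Queue to take the shared_ptr by const reference. A small self-contained sketch of what that saves per call: a by-value shared_ptr parameter copy-constructs and destroys the pointer, i.e. one atomic refcount increment plus one decrement on every Queue call, which adds up when data is queued for every draw. The stub type below is hypothetical.

#include <memory>

struct GSRasterizerDataStub {}; // stand-in for GSRasterizerData

// Old signature: the argument is copied, so each call bumps the atomic refcount up
// and back down even though the callee only reads through the pointer.
void QueueByValue(std::shared_ptr<GSRasterizerDataStub> data) { (void)data; }

// New signature: no copy, no refcount traffic at the call boundary; the callee only
// keeps the object alive if it explicitly stores its own shared_ptr (as the worker
// queues do when they push the item).
void QueueByConstRef(const std::shared_ptr<GSRasterizerDataStub>& data) { (void)data; }

int main()
{
    auto data = std::make_shared<GSRasterizerDataStub>();
    QueueByValue(data);    // refcount 1 -> 2 -> 1
    QueueByConstRef(data); // refcount stays at 1
    return 0;
}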

View File

@ -406,7 +406,11 @@ void GSRenderer::VSync(int field)
// be noticeable). Besides, these locks are extremely short -- overhead of conditional
// is way more expensive than just waiting for the CriticalSection in 1 of 10,000,000 tries. --air
#ifdef _CX11_
std::lock_guard<std::mutex> lock(m_pGSsetTitle_Crit);
#else
GSAutoLock lock(&m_pGSsetTitle_Crit);
#endif
strncpy(m_GStitleInfoBuffer, s.c_str(), countof(m_GStitleInfoBuffer) - 1);

View File

@ -78,7 +78,11 @@ public:
virtual void EndCapture();
public:
#ifdef _CX11_
std::mutex m_pGSsetTitle_Crit;
#else
GSCritSec m_pGSsetTitle_Crit;
#endif
char m_GStitleInfoBuffer[128];
};

View File

@ -41,7 +41,8 @@ GSRendererSW::GSRendererSW(int threads)
memset(m_texture, 0, sizeof(m_texture));
m_rl = GSRasterizerList::Create<GSDrawScanline>(threads, &m_perfmon);
bool spin_thread = !!theApp.GetConfig("spin_thread", 0);
m_rl = GSRasterizerList::Create<GSDrawScanline>(threads, &m_perfmon, spin_thread);
m_output = (uint8*)_aligned_malloc(1024 * 1024 * sizeof(uint32), 32);

View File

@ -20,10 +20,15 @@
*/
#include "stdafx.h"
#ifdef ENABLE_BOOST
#include "GSThread_CXX11.h"
#else
#include "GSThread.h"
#endif
#ifdef _WINDOWS
#ifndef ENABLE_BOOST
InitializeConditionVariablePtr pInitializeConditionVariable;
WakeConditionVariablePtr pWakeConditionVariable;
WakeAllConditionVariablePtr pWakeAllConditionVariable;
@ -65,6 +70,7 @@ public:
};
static InitCondVar s_icv;
#endif
#endif

View File

@ -152,9 +152,6 @@ public:
#include <pthread.h>
#endif
#include <mutex>
#include <condition_variable>
class GSThread : public IGSThread
{
#ifdef _STD_THREAD_

View File

@ -0,0 +1,355 @@
/*
* Copyright (C) 2007-2009 Gabest
* http://www.gabest.org
*
* This Program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2, or (at your option)
* any later version.
*
* This Program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with GNU Make; see the file COPYING. If not, write to
* the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
*/
#pragma once
#include "GSdx.h"
#define BOOST_STAND_ALONE
#ifdef BOOST_STAND_ALONE
#include "boost_spsc_queue.hpp"
#else
#include <boost/lockfree/spsc_queue.hpp>
#endif
class IGSThread
{
protected:
virtual void ThreadProc() = 0;
};
// Let us use std::thread for now; comment out the definition to go back to pthread.
// There are currently some bugs/limitations with std::thread (see various comments).
// For the moment let's keep pthread but use the new std objects (mutex, cond_var).
//#define _STD_THREAD_
#ifdef _WINDOWS
class GSThread : public IGSThread
{
DWORD m_ThreadId;
HANDLE m_hThread;
static DWORD WINAPI StaticThreadProc(void* lpParam);
protected:
void CreateThread();
void CloseThread();
public:
GSThread();
virtual ~GSThread();
};
#else
#ifdef _STD_THREAD_
#include <thread>
#else
#include <pthread.h>
#endif
class GSThread : public IGSThread
{
#ifdef _STD_THREAD_
std::thread *t;
#else
pthread_attr_t m_thread_attr;
pthread_t m_thread;
#endif
static void* StaticThreadProc(void* param);
protected:
void CreateThread();
void CloseThread();
public:
GSThread();
virtual ~GSThread();
};
#endif
// Interface that allows switching between queue implementations dynamically
template<class T> class IGSJobQueue : public GSThread
{
public:
IGSJobQueue() {}
virtual ~IGSJobQueue() {}
virtual bool IsEmpty() const = 0;
virtual void Push(const T& item) = 0;
virtual void Wait() = 0;
virtual void Process(T& item) = 0;
virtual int GetPixels(bool reset) = 0;
};
// This queue doesn't reserve any thread, which makes it a better fit for 2-core/4-core CPUs.
// pros: no hard limit on the number of threads
// cons: less performance per thread
template<class T> class GSJobQueue : public IGSJobQueue<T>
{
protected:
std::atomic<int16_t> m_count;
std::atomic<bool> m_exit;
#ifdef BOOST_STAND_ALONE
ringbuffer_base<T, 256> m_queue;
#else
boost::lockfree::spsc_queue<T, boost::lockfree::capacity<255> > m_queue;
#endif
std::mutex m_lock;
std::condition_variable m_empty;
std::condition_variable m_notempty;
void ThreadProc() {
std::unique_lock<std::mutex> l(m_lock);
while (true) {
while (m_count == 0) {
if (m_exit.load(memory_order_acquire)) return;
m_notempty.wait(l);
}
l.unlock();
int16_t consumed = 0;
for (int16_t nb = m_count; nb >= 0; nb--) {
if (m_queue.consume_one(*this))
consumed++;
}
l.lock();
m_count -= consumed;
if (m_count <= 0)
m_empty.notify_one();
}
}
public:
GSJobQueue() :
m_count(0),
m_exit(false)
{
this->CreateThread();
}
virtual ~GSJobQueue() {
m_exit.store(true, memory_order_release);
m_notempty.notify_one();
this->CloseThread();
}
bool IsEmpty() const {
ASSERT(m_count >= 0);
return m_count == 0;
}
void Push(const T& item) {
while(!m_queue.push(item))
std::this_thread::yield();
std::unique_lock<std::mutex> l(m_lock);
m_count++;
l.unlock();
m_notempty.notify_one();
}
void Wait() {
if (m_count > 0) {
std::unique_lock<std::mutex> l(m_lock);
while (m_count > 0) {
m_empty.wait(l);
}
}
ASSERT(m_count == 0);
}
void operator() (T& item) {
this->Process(item);
}
};
// This queue reserves 'only' the RENDERING threads; it gives mostly the same performance as the
// no-reservation queue if the CPU is fast enough.
// pros: nearly the best fps per thread
// cons: requires (1 + eThreads) cores for GS emulation alone! Reserve it for 6/8-core CPUs.
// Note: I'm not sure of the source of the speedup.
// 1/ It could be related to less MT logic (lock, cond var).
// 2/ But I highly suspect that waking up a thread is rather slow. My guess is that low-power
// features (like C-states) increase the latency. In this case the gain will be smaller if
// PCSX2 is running or on a limited-core CPU (<=4).
template<class T> class GSJobQueueSpin : public IGSJobQueue<T>
{
protected:
std::atomic<int16_t> m_count;
std::atomic<bool> m_exit;
#ifdef BOOST_STAND_ALONE
ringbuffer_base<T, 256> m_queue;
#else
boost::lockfree::spsc_queue<T, boost::lockfree::capacity<255> > m_queue;
#endif
std::mutex m_lock;
std::condition_variable m_empty;
void ThreadProc() {
std::unique_lock<std::mutex> l(m_lock, defer_lock);
while (true) {
while (m_count == 0) {
if (m_exit.load(memory_order_acquire)) return;
std::this_thread::yield();
}
int16_t consumed = 0;
for (int16_t nb = m_count; nb >= 0; nb--) {
if (m_queue.consume_one(*this))
consumed++;
}
l.lock();
m_count -= consumed;
l.unlock();
if (m_count <= 0)
m_empty.notify_one();
}
}
public:
GSJobQueueSpin() :
m_count(0),
m_exit(false)
{
this->CreateThread();
};
virtual ~GSJobQueueSpin() {
m_exit.store(true, memory_order_release);
this->CloseThread();
}
bool IsEmpty() const {
ASSERT(m_count >= 0);
return m_count == 0;
}
void Push(const T& item) {
while(!m_queue.push(item))
std::this_thread::yield();
m_count++;
}
void Wait() {
if (m_count > 0) {
std::unique_lock<std::mutex> l(m_lock);
while (m_count > 0) {
m_empty.wait(l);
}
}
ASSERT(m_count == 0);
}
void operator() (T& item) {
this->Process(item);
}
};
// This queue reserves the RENDERING threads + the GS thread onto dedicated CPUs.
// pros: best fps per thread
// cons: requires (1 + eThreads) cores for GS emulation alone! Reserve it for 8-core CPUs.
#if 0
template<class T> class GSJobQueue : public IGSJobQueue<T>
{
protected:
std::atomic<int16_t> m_count;
std::atomic<bool> m_exit;
boost::lockfree::spsc_queue<T, boost::lockfree::capacity<255> > m_queue;
void ThreadProc() {
while (true) {
while (m_count == 0) {
if (m_exit.load(memory_order_acquire)) return;
std::this_thread::yield();
}
m_count -= m_queue.consume_all(*this);
}
}
public:
GSJobQueue() :
m_count(0),
m_exit(false)
{
CreateThread();
};
virtual ~GSJobQueue() {
m_exit = true;
CloseThread();
}
bool IsEmpty() const {
ASSERT(m_count >= 0);
return m_count == 0;
}
void Push(const T& item) {
m_count++;
while(!m_queue.push(item))
std::this_thread::yield();
}
void Wait() {
while (m_count > 0)
std::this_thread::yield();
ASSERT(m_count == 0);
}
virtual void Process(T& item) = 0;
void operator() (T& item) {
this->Process(item);
}
};
#endif
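
The active queues above share one contract: the producer calls Push (spinning with a yield when the ring buffer is full), the worker thread drains items through Process via consume_one, and Wait blocks the producer until the outstanding count drops back to zero. A self-contained sketch of that contract in plain C++11, using std::queue instead of the lock-free ring buffer; it illustrates the Push/Wait/Process flow, not the actual lock-free implementation.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>

// Minimal sketch of the GSJobQueue contract: Push() from one producer thread,
// Process() on one worker thread, Wait() to synchronize. Not the real class.
template<class T> class JobQueueSketch {
    std::thread m_thread;
    std::mutex m_lock;
    std::condition_variable m_notempty, m_empty;
    std::queue<T> m_queue;
    int m_count = 0;   // items pushed but not yet fully processed
    bool m_exit = false;

    void ThreadProc() {
        std::unique_lock<std::mutex> l(m_lock);
        while (true) {
            while (m_count == 0) {
                if (m_exit) return;
                m_notempty.wait(l);
            }
            T item = m_queue.front();
            m_queue.pop();
            l.unlock();
            Process(item);                        // the work happens outside the lock
            l.lock();
            if (--m_count == 0) m_empty.notify_one();
        }
    }

protected:
    virtual void Process(T& item) = 0;

public:
    JobQueueSketch() { m_thread = std::thread([this] { ThreadProc(); }); }
    virtual ~JobQueueSketch() {
        { std::lock_guard<std::mutex> l(m_lock); m_exit = true; }
        m_notempty.notify_one();
        m_thread.join();
    }
    void Push(const T& item) {
        { std::lock_guard<std::mutex> l(m_lock); m_queue.push(item); m_count++; }
        m_notempty.notify_one();
    }
    void Wait() {
        std::unique_lock<std::mutex> l(m_lock);
        while (m_count > 0) m_empty.wait(l);
    }
};

// Usage mirroring GSWorker: Process() does the actual work for each queued item.
struct PrintWorker : JobQueueSketch<int> {
    void Process(int& item) override { std::printf("processed %d\n", item); }
};

int main()
{
    PrintWorker w;
    for (int i = 0; i < 4; i++) w.Push(i);
    w.Wait(); // returns once the worker has drained everything
    return 0;
}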

View File

@ -0,0 +1,177 @@
// boost_spsc_queue.hpp: a stripped-down version of boost/lockfree/spsc_queue.hpp
// Rationale
// * Performance is better on Linux than the standard std::queue
// * Performance is the same on Windows
// => 100-200MB of dependencies feels rather unfriendly
// Potential optimizations
// * Plug the condition variable into the queue directly to avoid the redundant m_count
// * Restore the boost optimization
// => unlikely; or replace it with a % (if size is 2^n)
// lock-free single-producer/single-consumer ringbuffer
// this algorithm is implemented in various projects (linux kernel)
//
// Copyright (C) 2009-2013 Tim Blechmann
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// Boost Software License - Version 1.0 - August 17th, 2003
//
// Permission is hereby granted, free of charge, to any person or organization
// obtaining a copy of the software and accompanying documentation covered by
// this license (the "Software") to use, reproduce, display, distribute,
// execute, and transmit the Software, and to prepare derivative works of the
// Software, and to permit third-parties to whom the Software is furnished to
// do so, all subject to the following:
//
// The copyright notices in the Software and this entire statement, including
// the above license grant, this restriction and the following disclaimer,
// must be included in all copies of the Software, in whole or in part, and
// all derivative works of the Software, unless such copies or derivative
// works are solely in the form of machine-executable object code generated by
// a source language processor.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
// SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
// FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
template <typename T, size_t max_size>
class ringbuffer_base
{
static const int padding_size = 64 - sizeof(size_t);
atomic<size_t> write_index_;
char padding1[padding_size]; /* force read_index and write_index to different cache lines */
atomic<size_t> read_index_;
T *buffer;
ringbuffer_base(ringbuffer_base const &) = delete;
ringbuffer_base(ringbuffer_base &&) = delete;
const ringbuffer_base& operator=( const ringbuffer_base& ) = delete;
public:
ringbuffer_base(void):
write_index_(0), read_index_(0)
{
// Use dynamic allocation here with no dependency on T objects.
// Otherwise the ringbuffer_base destructor would call the destructor
// of T, which crashes if T is an (invalid) shared_ptr.
//
// Note: another solution would be to create a char buffer as a union of T.
buffer = (T*)_aligned_malloc(sizeof(T)*max_size, 32);
}
~ringbuffer_base(void) {
// destroy all remaining items
T out;
while (pop(out)) {};
_aligned_free(buffer);
}
static size_t next_index(size_t arg)
{
size_t ret = arg + 1;
#if 0
while (unlikely(ret >= max_size))
#else
while (ret >= max_size)
#endif
ret -= max_size;
return ret;
}
bool push(T const & t)
{
const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
const size_t next = next_index(write_index);
if (next == read_index_.load(memory_order_acquire))
return false; /* ringbuffer is full */
new (buffer + write_index) T(t); // copy-construct
write_index_.store(next, memory_order_release);
return true;
}
bool pop (T & ret)
{
const size_t write_index = write_index_.load(memory_order_acquire);
const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
if (empty(write_index, read_index))
return false;
ret = buffer[read_index];
buffer[read_index].~T();
size_t next = next_index(read_index);
read_index_.store(next, memory_order_release);
return true;
}
template <typename Functor>
bool consume_one(Functor & f)
{
const size_t write_index = write_index_.load(memory_order_acquire);
const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
if (empty(write_index, read_index))
return false;
f(buffer[read_index]);
buffer[read_index].~T();
size_t next = next_index(read_index);
read_index_.store(next, memory_order_release);
return true;
}
public:
/** reset the ringbuffer
*
* \note Not thread-safe
* */
void reset(void)
{
write_index_.store(0, memory_order_relaxed);
read_index_.store(0, memory_order_release);
}
/** Check if the ringbuffer is empty
*
* \return true, if the ringbuffer is empty, false otherwise
* \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
* */
bool empty(void)
{
return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
}
/**
* \return true, if implementation is lock-free.
*
* */
bool is_lock_free(void) const
{
return write_index_.is_lock_free() && read_index_.is_lock_free();
}
private:
bool empty(size_t write_index, size_t read_index)
{
return write_index == read_index;
}
};
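
The header comment at the top of this file mentions restoring boost's index optimization, or using a modulo when the capacity is a power of two. A hedged sketch of that alternative wrap; it is not part of the commit, and the names next_index_pow2 and ring_size are hypothetical.

#include <cstddef>

// The committed next_index() wraps by repeated subtraction, which works for any
// max_size. If the capacity is a power of two, the wrap can be a single AND:
static const std::size_t ring_size = 256; // matches ringbuffer_base<T, 256> above
static_assert((ring_size & (ring_size - 1)) == 0, "ring_size must be a power of two");

static std::size_t next_index_pow2(std::size_t arg)
{
    return (arg + 1) & (ring_size - 1); // branch-free wrap-around
}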

View File

@ -60,6 +60,12 @@
#endif
// Require at least Visual Studio 2012
#if defined(__linux__) || (defined(_MSC_VER) && (_MSC_VER >= 1700))
#define _CX11_
#define ENABLE_BOOST // queue is from boost but it doesn't require a full boost install
#endif
// put these into vc9/common7/ide/usertype.dat to have them highlighted
typedef unsigned char uint8;
@ -96,6 +102,14 @@ typedef uint32 uptr;
#include <set>
#include <queue>
#include <algorithm>
#ifdef _CX11_
#include <thread>
#include <atomic>
#endif
#if defined(__linux__) || defined(_CX11_)
#include <mutex>
#include <condition_variable>
#endif
using namespace std;
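
Putting the new switches together: on Linux, or on MSVC 2012 and newer, both _CX11_ and ENABLE_BOOST are defined and the new code paths in this commit are taken; older MSVC keeps the legacy GSThread.h paths. A comment-only recap of that matrix, added here as a reading aid rather than code from the commit.

// Effect of the two feature macros defined in GSdx.h above:
#ifdef _CX11_
    // * scoped locks  -> std::mutex + std::lock_guard (GS.cpp, GSCapture, GSRenderer)
    // * extra headers -> <thread>, <atomic>, <mutex>, <condition_variable>
#else
    // * scoped locks  -> GSCritSec + GSAutoLock from GSThread.h
#endif
#ifdef ENABLE_BOOST
    // * worker queues -> GSJobQueue / GSJobQueueSpin from GSThread_CXX11.h, backed by
    //                    the bundled boost_spsc_queue.hpp ring buffer (no full boost install)
#else
    // * worker queues -> the legacy job queue from GSThread.h
#endif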