reimplement task to not be buggy

This commit is contained in:
zeromus 2016-08-23 21:13:29 +00:00
parent ae92918d27
commit fc77539bda
1 changed files with 93 additions and 90 deletions

View File

@ -53,8 +53,9 @@ int getOnlineCores (void)
class Task::Impl { class Task::Impl {
private: private:
sthread_t* _thread; sthread_t* thread;
bool _isThreadRunning; friend void thunkTaskProc(void* arg);
void taskProc();
public: public:
Impl(); Impl();
@ -64,142 +65,144 @@ public:
void execute(const TWork &work, void *param); void execute(const TWork &work, void *param);
void* finish(); void* finish();
void shutdown(); void shutdown();
void initialize();
slock_t *mutex; slock_t *mutex;
scond_t *condWork; scond_t *workCond;
bool workFlag, finishFlag;
TWork workFunc; TWork workFunc;
void *workFuncParam; void *workFuncParam;
void *ret; void *ret;
bool exitThread; bool exitThread;
bool started;
}; };
static void taskProc(void *arg) static void thunkTaskProc(void *arg)
{ {
Task::Impl *ctx = (Task::Impl *)arg; Task::Impl *ctx = (Task::Impl *)arg;
ctx->taskProc();
}
do { void Task::Impl::taskProc()
slock_lock(ctx->mutex); {
for(;;)
{
slock_lock(mutex);
while (ctx->workFunc == NULL && !ctx->exitThread) { if(!workFlag)
scond_wait(ctx->condWork, ctx->mutex); scond_wait(workCond, mutex);
} workFlag = false;
if (ctx->workFunc != NULL) { ret = workFunc(workFuncParam);
ctx->ret = ctx->workFunc(ctx->workFuncParam);
} else {
ctx->ret = NULL;
}
ctx->workFunc = NULL; finishFlag = true;
scond_signal(ctx->condWork); scond_signal(workCond);
slock_unlock(ctx->mutex); slock_unlock(mutex);
} while(!ctx->exitThread); if(exitThread)
break;
}
}
static void* killTask(void* task)
{
((Task::Impl*)task)->exitThread = true;
return 0;
} }
Task::Impl::Impl() Task::Impl::Impl()
: started(false)
{ {
_isThreadRunning = false;
workFunc = NULL;
workFuncParam = NULL;
ret = NULL;
exitThread = false;
mutex = slock_new();
condWork = scond_new();
} }
Task::Impl::~Impl() Task::Impl::~Impl()
{ {
shutdown(); shutdown();
slock_free(mutex); }
scond_free(condWork);
void Task::Impl::initialize()
{
thread = NULL;
workFunc = NULL;
workCond = NULL;
workFlag = finishFlag = false;
workFunc = NULL;
workFuncParam = NULL;
ret = NULL;
exitThread = false;
started = false;
} }
void Task::Impl::start(bool spinlock) void Task::Impl::start(bool spinlock)
{ {
slock_lock(this->mutex); initialize();
mutex = slock_new();
workCond = scond_new();
if (this->_isThreadRunning) { slock_lock(mutex);
slock_unlock(this->mutex);
return;
}
this->workFunc = NULL; thread = sthread_create(&thunkTaskProc,this);
this->workFuncParam = NULL; started = true;
this->ret = NULL;
this->exitThread = false; slock_unlock(mutex);
this->_thread = sthread_create(&taskProc,this); }
this->_isThreadRunning = true;
void Task::Impl::shutdown()
{
if(!started) return;
execute(killTask,this);
finish();
started = false;
sthread_join(thread);
slock_free(mutex);
scond_free(workCond);
}
void* Task::Impl::finish()
{
//no work running; nothing to do (it's kind of lame that we call this under the circumstances)
if(!workFunc)
return NULL;
slock_lock(mutex);
if(!finishFlag)
scond_wait(workCond, mutex);
finishFlag = false;
slock_unlock(this->mutex); slock_unlock(this->mutex);
workFunc = NULL;
return ret;
} }
void Task::Impl::execute(const TWork &work, void *param) void Task::Impl::execute(const TWork &work, void *param)
{ {
slock_lock(this->mutex); slock_lock(this->mutex);
if (work == NULL || !this->_isThreadRunning) { workFunc = work;
slock_unlock(this->mutex); workFuncParam = param;
return; workFlag = true;
} scond_signal(workCond);
this->workFunc = work;
this->workFuncParam = param;
scond_signal(this->condWork);
slock_unlock(this->mutex); slock_unlock(this->mutex);
} }
void* Task::Impl::finish()
{
void *returnValue = NULL;
slock_lock(this->mutex);
if (!this->_isThreadRunning) {
slock_unlock(this->mutex);
return returnValue;
}
while (this->workFunc != NULL) {
scond_wait(this->condWork, this->mutex);
}
returnValue = this->ret;
slock_unlock(this->mutex);
return returnValue;
}
void Task::Impl::shutdown()
{
slock_lock(this->mutex);
if (!this->_isThreadRunning) {
slock_unlock(this->mutex);
return;
}
this->workFunc = NULL;
this->exitThread = true;
scond_signal(this->condWork);
slock_unlock(this->mutex);
sthread_join(this->_thread);
slock_lock(this->mutex);
this->_isThreadRunning = false;
slock_unlock(this->mutex);
}
void Task::start(bool spinlock) { impl->start(spinlock); } void Task::start(bool spinlock) { impl->start(spinlock); }
void Task::shutdown() { impl->shutdown(); } void Task::shutdown() { impl->shutdown(); }
Task::Task() : impl(new Task::Impl()) {} Task::Task() : impl(new Task::Impl()) {}
Task::~Task() { delete impl; } Task::~Task()
{
delete impl;
}
void Task::execute(const TWork &work, void* param) { impl->execute(work,param); } void Task::execute(const TWork &work, void* param) { impl->execute(work,param); }
void* Task::finish() { return impl->finish(); } void* Task::finish() { return impl->finish(); }