task.cpp:

- EXPERIMENTAL: Revert task.cpp and pthreads.c to what they were back in r5538, but change scond_wait() to explicitly unlock the mutex before calling WaitForSingleObject().
This commit is contained in:
rogerman 2016-08-24 21:17:39 +00:00
parent 66bc2d1d71
commit 212c23f30e
2 changed files with 100 additions and 159 deletions

View File

@ -81,9 +81,6 @@ struct scond
{ {
#ifdef USE_WIN32_THREADS #ifdef USE_WIN32_THREADS
HANDLE event; HANDLE event;
volatile int waiters;
volatile bool waiting_ack;
HANDLE ack;
#else #else
pthread_cond_t cond; pthread_cond_t cond;
#endif #endif
@ -313,13 +310,7 @@ scond_t *scond_new(void)
return NULL; return NULL;
#ifdef USE_WIN32_THREADS #ifdef USE_WIN32_THREADS
/* this is very complex because recreating condition variable semantics with win32 parts is not easy (or maybe it is and I just havent seen how) */ cond->event = CreateEvent(NULL, FALSE, FALSE, NULL);
/* the main problem is that a condition variable can be used to wake up a thread, but only if the thread is already waiting. */
/* whereas a win32 event will 'wake up' a thread in advance (the event will be set in advance, so a 'waiter' wont even have to wait on it) */
cond->event = CreateEvent(NULL, FALSE, FALSE, NULL);
cond->ack = CreateEvent(NULL, FALSE, FALSE, NULL);
cond->waiters = 0;
cond->waiting_ack = false;
event_created = !!cond->event; event_created = !!cond->event;
#else #else
event_created = (pthread_cond_init(&cond->cond, NULL) == 0); event_created = (pthread_cond_init(&cond->cond, NULL) == 0);
@ -348,7 +339,6 @@ void scond_free(scond_t *cond)
#ifdef USE_WIN32_THREADS #ifdef USE_WIN32_THREADS
CloseHandle(cond->event); CloseHandle(cond->event);
CloseHandle(cond->ack);
#else #else
pthread_cond_destroy(&cond->cond); pthread_cond_destroy(&cond->cond);
#endif #endif
@ -365,33 +355,9 @@ void scond_free(scond_t *cond)
void scond_wait(scond_t *cond, slock_t *lock) void scond_wait(scond_t *cond, slock_t *lock)
{ {
#ifdef USE_WIN32_THREADS #ifdef USE_WIN32_THREADS
/* remember: we currently have mutex so this will be safe */ slock_unlock(lock);
cond->waiters++;
if(cond->waiting_ack)
WaitForSingleObject(cond->ack,INFINITE);
ReleaseMutex(lock->lock);
/* wait for a signaller */
WaitForSingleObject(cond->event, INFINITE); WaitForSingleObject(cond->event, INFINITE);
slock_lock(lock);
/* the algorithm hinges on this doing this stuff outside of the mutex */
/* suppose several people signal right now. Actually, only one of them can. He'll be waiting on an ack signal! *inside the mutex* */
/* we need to clear waiting_ack before we release him, otherwise it might race to set it to true and beat us */
/* also: suppose several people are waiting right now (in the above wait on `event`). */
/* well, only one of them is going to get freed by a signal; it must have been us */
/* notice that both of the waits for ack are inside the mutex; this guarantees only one of them can be waiting at a time */
/* that's essential for making this safe */
//if(cond->waiting_ack)
{
cond->waiting_ack = false;
SetEvent(cond->ack);
}
/* reacquire mutex and finish up */
WaitForSingleObject(lock->lock, INFINITE);
cond->waiters--;
#else #else
pthread_cond_wait(&cond->cond, &lock->lock); pthread_cond_wait(&cond->cond, &lock->lock);
#endif #endif
@ -426,22 +392,7 @@ int scond_broadcast(scond_t *cond)
void scond_signal(scond_t *cond) void scond_signal(scond_t *cond)
{ {
#ifdef USE_WIN32_THREADS #ifdef USE_WIN32_THREADS
/* remember: we currently have mutex */
if(cond->waiters == 0) return;
/* OK, someone is waiting for a signal */
/* if we're waiting for an ack, we can't proceed until we receive an ack (signifies that the event is freed up from the waiter destined to be waked by it) */
if(cond->waiting_ack)
WaitForSingleObject(cond->ack,INFINITE);
/* before any further waits or signals, we'll need to wait for a waiter to wake up */
cond->waiting_ack = true;
/* the main wakeup event. the winning waiter definitely won't wake up this moment since we're in a mutex. */
SetEvent(cond->event); SetEvent(cond->event);
#else #else
pthread_cond_signal(&cond->cond); pthread_cond_signal(&cond->cond);
#endif #endif

View File

@ -1,5 +1,5 @@
/* /*
Copyright (C) 2009-2016 DeSmuME team Copyright (C) 2009-2015 DeSmuME team
This file is free software: you can redistribute it and/or modify This file is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -16,7 +16,6 @@
*/ */
#include <stdio.h> #include <stdio.h>
#include <assert.h>
#include "types.h" #include "types.h"
#include "task.h" #include "task.h"
@ -52,14 +51,11 @@ int getOnlineCores (void)
#endif #endif
} }
static void thunkTaskProc(void *arg);
class Task::Impl { class Task::Impl {
private: private:
sthread_t* thread; sthread_t* _thread;
friend void thunkTaskProc(void* arg); bool _isThreadRunning;
void taskProc();
public: public:
Impl(); Impl();
~Impl(); ~Impl();
@ -68,141 +64,135 @@ public:
void execute(const TWork &work, void *param); void execute(const TWork &work, void *param);
void* finish(); void* finish();
void shutdown(); void shutdown();
void initialize();
slock_t *mutex; slock_t *mutex;
scond_t *workCond; scond_t *condWork;
volatile bool workFlag, finishFlag, exitFlag; TWork workFunc;
volatile TWork workFunc; void *workFuncParam;
void * volatile workFuncParam; void *ret;
void * volatile ret; bool exitThread;
bool started;
}; };
static void thunkTaskProc(void *arg) static void taskProc(void *arg)
{ {
Task::Impl *ctx = (Task::Impl *)arg; Task::Impl *ctx = (Task::Impl *)arg;
ctx->taskProc();
}
void Task::Impl::taskProc() do {
{ slock_lock(ctx->mutex);
for(;;)
{
slock_lock(mutex);
if(!workFlag) while (ctx->workFunc == NULL && !ctx->exitThread) {
scond_wait(workCond, mutex); scond_wait(ctx->condWork, ctx->mutex);
workFlag = false; }
ret = workFunc(workFuncParam); if (ctx->workFunc != NULL) {
ctx->ret = ctx->workFunc(ctx->workFuncParam);
} else {
ctx->ret = NULL;
}
finishFlag = true; ctx->workFunc = NULL;
scond_signal(workCond); scond_signal(ctx->condWork);
slock_unlock(mutex); slock_unlock(ctx->mutex);
if(exitFlag) } while(!ctx->exitThread);
break;
}
}
static void* killTask(void* task)
{
((Task::Impl*)task)->exitFlag = true;
return NULL;
} }
Task::Impl::Impl() Task::Impl::Impl()
: started(false)
{ {
_isThreadRunning = false;
workFunc = NULL;
workFuncParam = NULL;
ret = NULL;
exitThread = false;
mutex = slock_new();
condWork = scond_new();
} }
Task::Impl::~Impl() Task::Impl::~Impl()
{ {
shutdown(); shutdown();
} slock_free(mutex);
scond_free(condWork);
void Task::Impl::initialize()
{
thread = NULL;
workFunc = NULL;
workCond = NULL;
workFunc = NULL;
workFuncParam = NULL;
workFlag = finishFlag = exitFlag = false;
ret = NULL;
started = false;
} }
void Task::Impl::start(bool spinlock) void Task::Impl::start(bool spinlock)
{ {
//check user error slock_lock(this->mutex);
assert(!started);
if(started) shutdown(); if (this->_isThreadRunning) {
slock_unlock(this->mutex);
return;
}
initialize(); this->workFunc = NULL;
mutex = slock_new(); this->workFuncParam = NULL;
workCond = scond_new(); this->ret = NULL;
this->exitThread = false;
slock_lock(mutex); this->_thread = sthread_create(&taskProc,this);
this->_isThreadRunning = true;
thread = sthread_create(&thunkTaskProc,this);
started = true;
slock_unlock(mutex);
}
void Task::Impl::shutdown()
{
if(!started) return;
//nobody should shutdown while a task is still running;
//it would imply that we're in some kind of shutdown process, and datastructures might be getting freed while a worker is still working on it.
//nonetheless, _troublingly_, it seems like we do that now, so for now let's try to let that work finish instead of blowing up when it isn't finished.
//assert(!workFunc);
finish();
//a new task which sets the kill flag
execute(killTask,this);
finish();
started = false;
sthread_join(thread);
scond_free(workCond);
slock_free(mutex);
}
void* Task::Impl::finish()
{
//no work running; nothing to do
if(!workFunc)
return NULL;
slock_lock(mutex);
if(!finishFlag)
scond_wait(workCond, mutex);
finishFlag = false;
slock_unlock(this->mutex); slock_unlock(this->mutex);
workFunc = NULL;
return ret;
} }
void Task::Impl::execute(const TWork &work, void *param) void Task::Impl::execute(const TWork &work, void *param)
{ {
slock_lock(this->mutex); slock_lock(this->mutex);
workFunc = work; if (work == NULL || !this->_isThreadRunning) {
workFuncParam = param; slock_unlock(this->mutex);
workFlag = true; return;
scond_signal(workCond); }
this->workFunc = work;
this->workFuncParam = param;
scond_signal(this->condWork);
slock_unlock(this->mutex);
}
void* Task::Impl::finish()
{
void *returnValue = NULL;
slock_lock(this->mutex);
if (!this->_isThreadRunning) {
slock_unlock(this->mutex);
return returnValue;
}
while (this->workFunc != NULL) {
scond_wait(this->condWork, this->mutex);
}
returnValue = this->ret;
slock_unlock(this->mutex);
return returnValue;
}
void Task::Impl::shutdown()
{
slock_lock(this->mutex);
if (!this->_isThreadRunning) {
slock_unlock(this->mutex);
return;
}
this->workFunc = NULL;
this->exitThread = true;
scond_signal(this->condWork);
slock_unlock(this->mutex);
sthread_join(this->_thread);
slock_lock(this->mutex);
this->_isThreadRunning = false;
slock_unlock(this->mutex); slock_unlock(this->mutex);
} }