Hook and fix up EmulationWorker -> threading works, pick'n'pile is happy.

This commit is contained in:
Christian Speckner 2018-06-07 23:38:14 +02:00
parent ea94f5e795
commit 8edc597189
6 changed files with 155 additions and 93 deletions

View File

@ -52,6 +52,7 @@
"new": "cpp", "new": "cpp",
"typeinfo": "cpp", "typeinfo": "cpp",
"__mutex_base": "cpp", "__mutex_base": "cpp",
"mutex": "cpp" "mutex": "cpp",
"condition_variable": "cpp"
} }
} }

View File

@ -101,7 +101,7 @@ Int16* AudioQueue::enqueue(Int16* fragment)
if (mySize < capacity) mySize++; if (mySize < capacity) mySize++;
else { else {
myNextFragment = (myNextFragment + 1) % capacity; myNextFragment = (myNextFragment + 1) % capacity;
(cerr << "audio buffer overflow\n").flush(); //(cerr << "audio buffer overflow\n").flush();
} }
return newFragment; return newFragment;

View File

@ -24,7 +24,7 @@
using namespace std::chrono; using namespace std::chrono;
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
EmulationWorker::EmulationWorker(TIA& tia) : myTia(tia) EmulationWorker::EmulationWorker() : myPendingSignal(Signal::none), myState(State::initializing)
{ {
std::mutex mutex; std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
@ -34,7 +34,7 @@ EmulationWorker::EmulationWorker(TIA& tia) : myTia(tia)
&EmulationWorker::threadMain, this, &threadInitialized, &mutex &EmulationWorker::threadMain, this, &threadInitialized, &mutex
); );
// Wait until the thread has acquired myMutex and moved on // Wait until the thread has acquired myWakeupMutex and moved on
while (myState == State::initializing) threadInitialized.wait(lock); while (myState == State::initializing) threadInitialized.wait(lock);
} }
@ -43,11 +43,11 @@ EmulationWorker::~EmulationWorker()
{ {
// This has to run in a block in order to release the mutex before joining // This has to run in a block in order to release the mutex before joining
{ {
std::unique_lock<std::mutex> lock(myMutex); std::unique_lock<std::mutex> lock(myWakeupMutex);
if (myState != State::exception) { if (myState != State::exception) {
myPendingSignal = Signal::quit; signalQuit();
mySignalCondition.notify_one(); myWakeupCondition.notify_one();
} }
} }
@ -69,23 +69,22 @@ void EmulationWorker::handlePossibleException()
} }
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
void EmulationWorker::start(uInt32 cyclesPerSecond, uInt32 maxCycles, uInt32 minCycles, DispatchResult* dispatchResult) void EmulationWorker::start(uInt32 cyclesPerSecond, uInt32 maxCycles, uInt32 minCycles, DispatchResult* dispatchResult, TIA* tia)
{ {
// Optimization: run this in a block to unlock the mutex before notifying the thread; waitForSignalClear();
// otherwise, the thread would immediatelly block again until we have released the mutex,
// sacrificing a timeslice
{
// Aquire the mutex -> wait until the thread is suspended // Aquire the mutex -> wait until the thread is suspended
std::unique_lock<std::mutex> lock(myMutex); std::unique_lock<std::mutex> lock(myWakeupMutex);
// Pass on possible exceptions // Pass on possible exceptions
handlePossibleException(); handlePossibleException();
// NB: The thread does not suspend execution in State::initialized // NB: The thread does not suspend execution in State::initialized
if (myState != State::waitingForResume) if (myState != State::waitingForResume)
throw runtime_error("start called on running or dead worker"); fatal("start called on running or dead worker");
// Store the parameters for emulation // Store the parameters for emulation
myTia = tia;
myCyclesPerSecond = cyclesPerSecond; myCyclesPerSecond = cyclesPerSecond;
myMaxCycles = maxCycles; myMaxCycles = maxCycles;
myMinCycles = minCycles; myMinCycles = minCycles;
@ -93,51 +92,50 @@ void EmulationWorker::start(uInt32 cyclesPerSecond, uInt32 maxCycles, uInt32 min
// Set the signal... // Set the signal...
myPendingSignal = Signal::resume; myPendingSignal = Signal::resume;
}
// ... and wakeup the thread // ... and wakeup the thread
mySignalCondition.notify_one(); myWakeupCondition.notify_one();
} }
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
void EmulationWorker::stop() uInt64 EmulationWorker::stop()
{ {
// See EmulationWorker::start for the gory details waitForSignalClear();
{
std::unique_lock<std::mutex> lock(myMutex); std::unique_lock<std::mutex> lock(myWakeupMutex);
handlePossibleException(); handlePossibleException();
// If the worker has stopped on its own, we return // If the worker has stopped on its own, we return
if (myState == State::waitingForResume) return; if (myState == State::waitingForResume) return 0;
// NB: The thread does not suspend execution in State::initialized or State::running // NB: The thread does not suspend execution in State::initialized or State::running
if (myState != State::waitingForStop) if (myState != State::waitingForStop)
throw runtime_error("stop called on a dead worker"); fatal("stop called on a dead worker");
myPendingSignal = Signal::stop; myPendingSignal = Signal::stop;
}
mySignalCondition.notify_one(); myWakeupCondition.notify_one();
return myTotalCycles;
} }
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
void EmulationWorker::threadMain(std::condition_variable* initializedCondition, std::mutex* initializationMutex) void EmulationWorker::threadMain(std::condition_variable* initializedCondition, std::mutex* initializationMutex)
{ {
std::unique_lock<std::mutex> lock(myMutex); std::unique_lock<std::mutex> lock(myWakeupMutex);
try { try {
// Optimization: release the lock before notifying our parent -> saves a timeslice
{ {
// Wait until our parent releases the lock and sleeps // Wait until our parent releases the lock and sleeps
std::lock_guard<std::mutex> guard(*initializationMutex); std::lock_guard<std::mutex> guard(*initializationMutex);
// Update the state... // Update the state...
myState = State::initialized; myState = State::initialized;
}
// ... and wake up our parent to notifiy that we have initialized. From this point, the // ... and wake up our parent to notifiy that we have initialized. From this point, the
// parent can safely assume that we are running while the mutex is locked. // parent can safely assume that we are running while the mutex is locked.
initializedCondition->notify_one(); initializedCondition->notify_one();
}
while (myPendingSignal != Signal::quit) handleWakeup(lock); while (myPendingSignal != Signal::quit) handleWakeup(lock);
} }
@ -153,7 +151,7 @@ void EmulationWorker::handleWakeup(std::unique_lock<std::mutex>& lock)
switch (myState) { switch (myState) {
case State::initialized: case State::initialized:
myState = State::waitingForResume; myState = State::waitingForResume;
mySignalCondition.wait(lock); myWakeupCondition.wait(lock);
break; break;
case State::waitingForResume: case State::waitingForResume:
@ -165,7 +163,7 @@ void EmulationWorker::handleWakeup(std::unique_lock<std::mutex>& lock)
break; break;
default: default:
throw runtime_error("wakeup in invalid worker state"); fatal("wakeup in invalid worker state");
} }
} }
@ -174,20 +172,21 @@ void EmulationWorker::handleWakeupFromWaitingForResume(std::unique_lock<std::mut
{ {
switch (myPendingSignal) { switch (myPendingSignal) {
case Signal::resume: case Signal::resume:
myPendingSignal = Signal::none; clearSignal();
myVirtualTime = high_resolution_clock::now(); myVirtualTime = high_resolution_clock::now();
myTotalCycles = 0;
dispatchEmulation(lock); dispatchEmulation(lock);
break; break;
case Signal::none: case Signal::none:
mySignalCondition.wait(lock); myWakeupCondition.wait(lock);
break; break;
case Signal::quit: case Signal::quit:
break; break;
default: default:
throw runtime_error("invalid signal while waiting for resume"); fatal("invalid signal while waiting for resume");
} }
} }
@ -197,16 +196,16 @@ void EmulationWorker::handleWakeupFromWaitingForStop(std::unique_lock<std::mutex
switch (myPendingSignal) { switch (myPendingSignal) {
case Signal::stop: case Signal::stop:
myState = State::waitingForResume; myState = State::waitingForResume;
myPendingSignal = Signal::none; clearSignal();
mySignalCondition.wait(lock); myWakeupCondition.wait(lock);
break; break;
case Signal::none: case Signal::none:
if (myVirtualTime <= high_resolution_clock::now()) if (myVirtualTime <= high_resolution_clock::now())
dispatchEmulation(lock); dispatchEmulation(lock);
else else
mySignalCondition.wait_until(lock, myVirtualTime); myWakeupCondition.wait_until(lock, myVirtualTime);
break; break;
@ -214,7 +213,7 @@ void EmulationWorker::handleWakeupFromWaitingForStop(std::unique_lock<std::mutex
break; break;
default: default:
throw runtime_error("invalid signal while waiting for stop"); fatal("invalid signal while waiting for stop");
} }
} }
@ -226,29 +225,62 @@ void EmulationWorker::dispatchEmulation(std::unique_lock<std::mutex>& lock)
uInt64 totalCycles = 0; uInt64 totalCycles = 0;
do { do {
myTia.update(*myDispatchResult, totalCycles > 0 ? myMinCycles - totalCycles : myMaxCycles); myTia->update(*myDispatchResult, totalCycles > 0 ? myMinCycles - totalCycles : myMaxCycles);
totalCycles += myDispatchResult->getCycles();
} while (totalCycles < myMinCycles && myDispatchResult->getStatus() == DispatchResult::Status::ok); } while (totalCycles < myMinCycles && myDispatchResult->getStatus() == DispatchResult::Status::ok);
myTotalCycles += totalCycles;
bool continueEmulating = false;
if (myDispatchResult->getStatus() == DispatchResult::Status::ok) { if (myDispatchResult->getStatus() == DispatchResult::Status::ok) {
// If emulation finished successfully, we can go for another round // If emulation finished successfully, we can go for another round
duration<double> timesliceSeconds(static_cast<double>(totalCycles) / static_cast<double>(myCyclesPerSecond)); duration<double> timesliceSeconds(static_cast<double>(totalCycles) / static_cast<double>(myCyclesPerSecond));
myVirtualTime += duration_cast<high_resolution_clock::duration>(timesliceSeconds); myVirtualTime += duration_cast<high_resolution_clock::duration>(timesliceSeconds);
myState = State::waitingForStop; myState = State::waitingForStop;
continueEmulating = myVirtualTime > high_resolution_clock::now();
if (myVirtualTime > high_resolution_clock::now())
// If we can keep up with the emulation, we sleep
mySignalCondition.wait_until(lock, myVirtualTime);
else {
// If we are already lagging behind, we briefly relinquish control over the mutex
// and yield to scheduler, to make sure that the main thread has a chance to stop us
lock.release();
std::this_thread::yield();
lock.lock();
} }
if (continueEmulating) {
myState = State::waitingForStop;
myWakeupCondition.wait_until(lock, myVirtualTime);
} else { } else {
// If execution trapped, we stop immediatelly.
myState = State::waitingForResume; myState = State::waitingForResume;
mySignalCondition.wait(lock); myWakeupCondition.wait(lock);
} }
} }
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
void EmulationWorker::clearSignal()
{
std::unique_lock<std::mutex> lock(mySignalChangeMutex);
myPendingSignal = Signal::none;
mySignalChangeCondition.notify_one();
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
void EmulationWorker::signalQuit()
{
std::unique_lock<std::mutex> lock(mySignalChangeMutex);
myPendingSignal = Signal::quit;
mySignalChangeCondition.notify_one();
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
void EmulationWorker::waitForSignalClear()
{
std::unique_lock<std::mutex> lock(mySignalChangeMutex);
while (myPendingSignal != Signal::none && myPendingSignal != Signal::quit)
mySignalChangeCondition.wait(lock);
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
void EmulationWorker::fatal(string message)
{
(cerr << "FATAL in emulation worker: " << message << std::endl).flush();
throw runtime_error(message);
}

View File

@ -43,13 +43,13 @@ class EmulationWorker
public: public:
EmulationWorker(TIA& tia); EmulationWorker();
~EmulationWorker(); ~EmulationWorker();
void start(uInt32 cyclesPerSecond, uInt32 maxCycles, uInt32 minCycles, DispatchResult* dispatchResult); void start(uInt32 cyclesPerSecond, uInt32 maxCycles, uInt32 minCycles, DispatchResult* dispatchResult, TIA* tia);
void stop(); uInt64 stop();
private: private:
@ -58,6 +58,12 @@ class EmulationWorker
// Passing references into a thread is awkward and requires std::ref -> use pointers here // Passing references into a thread is awkward and requires std::ref -> use pointers here
void threadMain(std::condition_variable* initializedCondition, std::mutex* initializationMutex); void threadMain(std::condition_variable* initializedCondition, std::mutex* initializationMutex);
void clearSignal();
void signalQuit();
void waitForSignalClear();
void handleWakeup(std::unique_lock<std::mutex>& lock); void handleWakeup(std::unique_lock<std::mutex>& lock);
void handleWakeupFromWaitingForResume(std::unique_lock<std::mutex>& lock); void handleWakeupFromWaitingForResume(std::unique_lock<std::mutex>& lock);
@ -66,14 +72,20 @@ class EmulationWorker
void dispatchEmulation(std::unique_lock<std::mutex>& lock); void dispatchEmulation(std::unique_lock<std::mutex>& lock);
void fatal(string message);
private: private:
TIA& myTia; TIA* myTia;
std::thread myThread; std::thread myThread;
std::condition_variable mySignalCondition; std::condition_variable myWakeupCondition;
std::mutex myMutex; std::mutex myWakeupMutex;
std::condition_variable mySignalChangeCondition;
std::mutex mySignalChangeMutex;
std::exception_ptr myPendingException; std::exception_ptr myPendingException;
Signal myPendingSignal; Signal myPendingSignal;
// The initial access to myState is not synchronized -> make this atomic // The initial access to myState is not synchronized -> make this atomic
@ -84,7 +96,18 @@ class EmulationWorker
uInt32 myMinCycles; uInt32 myMinCycles;
DispatchResult* myDispatchResult; DispatchResult* myDispatchResult;
uInt64 myTotalCycles;
std::chrono::time_point<std::chrono::high_resolution_clock> myVirtualTime; std::chrono::time_point<std::chrono::high_resolution_clock> myVirtualTime;
private:
EmulationWorker(const EmulationWorker&) = delete;
EmulationWorker(EmulationWorker&&) = delete;
EmulationWorker& operator=(const EmulationWorker&) = delete;
EmulationWorker& operator=(EmulationWorker&&) = delete;
}; };
#endif // EMULATION_WORKER_HXX #endif // EMULATION_WORKER_HXX

View File

@ -54,6 +54,7 @@
#include "Version.hxx" #include "Version.hxx"
#include "TIA.hxx" #include "TIA.hxx"
#include "DispatchResult.hxx" #include "DispatchResult.hxx"
#include "EmulationWorker.hxx"
#include "OSystem.hxx" #include "OSystem.hxx"
@ -637,27 +638,35 @@ float OSystem::frameRate() const
} }
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
double OSystem::dispatchEmulation(uInt32 cyclesPerSecond) double OSystem::dispatchEmulation(EmulationWorker& emulationWorker)
{ {
if (!myConsole) return 0.; if (!myConsole) return 0.;
Int64 totalCycles = 0; TIA& tia(myConsole->tia());
const Int64 minCycles = myConsole->emulationTiming().minCyclesPerTimeslice(); EmulationTiming& timing(myConsole->emulationTiming());
const Int64 maxCycles = myConsole->emulationTiming().maxCyclesPerTimeslice();
DispatchResult dispatchResult; DispatchResult dispatchResult;
do { bool framePending = tia.newFramePending();
myConsole->tia().update(dispatchResult, totalCycles > 0 ? minCycles - totalCycles : maxCycles); if (framePending) tia.renderToFrameBuffer();
totalCycles += dispatchResult.getCycles(); emulationWorker.start(
} while (totalCycles < minCycles && dispatchResult.getStatus() == DispatchResult::Status::ok); timing.cyclesPerSecond(),
timing.maxCyclesPerTimeslice(),
timing.minCyclesPerTimeslice(),
&dispatchResult,
&tia
);
if (framePending) myFrameBuffer->updateInEmulationMode();
uInt64 totalCycles = emulationWorker.stop();
if (dispatchResult.getStatus() == DispatchResult::Status::debugger) myDebugger->start(); if (dispatchResult.getStatus() == DispatchResult::Status::debugger) myDebugger->start();
if (dispatchResult.getStatus() == DispatchResult::Status::ok && myEventHandler->frying()) if (dispatchResult.getStatus() == DispatchResult::Status::ok && myEventHandler->frying())
myConsole->fry(); myConsole->fry();
return static_cast<double>(totalCycles) / static_cast<double>(cyclesPerSecond); return static_cast<double>(totalCycles) / static_cast<double>(timing.cyclesPerSecond());
} }
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@ -666,6 +675,7 @@ void OSystem::mainLoop()
// Sleep-based wait: good for CPU, bad for graphical sync // Sleep-based wait: good for CPU, bad for graphical sync
bool busyWait = mySettings->getString("timing") != "sleep"; bool busyWait = mySettings->getString("timing") != "sleep";
time_point<high_resolution_clock> virtualTime = high_resolution_clock::now(); time_point<high_resolution_clock> virtualTime = high_resolution_clock::now();
EmulationWorker emulationWorker;
for(;;) for(;;)
{ {
@ -674,14 +684,9 @@ void OSystem::mainLoop()
double timesliceSeconds; double timesliceSeconds;
if (myEventHandler->state() == EventHandlerState::EMULATION) { if (myEventHandler->state() == EventHandlerState::EMULATION)
timesliceSeconds = dispatchEmulation(myConsole ? myConsole->emulationTiming().cyclesPerSecond() : 1); timesliceSeconds = dispatchEmulation(emulationWorker);
else {
if (myConsole && myConsole->tia().newFramePending()) {
myConsole->tia().renderToFrameBuffer();
myFrameBuffer->updateInEmulationMode();
}
} else {
timesliceSeconds = 1. / 30.; timesliceSeconds = 1. / 30.;
myFrameBuffer->update(); myFrameBuffer->update();
} }

View File

@ -38,6 +38,7 @@ class Settings;
class Sound; class Sound;
class StateManager; class StateManager;
class VideoDialog; class VideoDialog;
class EmulationWorker;
#include "FSNode.hxx" #include "FSNode.hxx"
#include "FrameBufferConstants.hxx" #include "FrameBufferConstants.hxx"
@ -563,7 +564,7 @@ class OSystem
void validatePath(string& path, const string& setting, void validatePath(string& path, const string& setting,
const string& defaultpath); const string& defaultpath);
double dispatchEmulation(uInt32 cyclesPerSecond); double dispatchEmulation(EmulationWorker& emulationWorker);
// Following constructors and assignment operators not supported // Following constructors and assignment operators not supported
OSystem(const OSystem&) = delete; OSystem(const OSystem&) = delete;