diff --git a/.gdbinit b/.gdbinit new file mode 100644 index 000000000..09b4af30f --- /dev/null +++ b/.gdbinit @@ -0,0 +1,10 @@ +# Ignore HighResolutionTimer custom event +handle SIG34 nostop noprint +# Ignore PosixTimer custom event +handle SIG35 nostop noprint +# Ignore PosixThread exit event +handle SIG32 nostop noprint +# Ignore PosixThread suspend event +handle SIG36 nostop noprint +# Ignore PosixThread user callback event +handle SIG37 nostop noprint diff --git a/src/xenia/app/emulator_window.cc b/src/xenia/app/emulator_window.cc index beec76939..a312467b6 100644 --- a/src/xenia/app/emulator_window.cc +++ b/src/xenia/app/emulator_window.cc @@ -70,8 +70,8 @@ std::unique_ptr EmulatorWindow::Create(Emulator* emulator) { std::unique_ptr emulator_window(new EmulatorWindow(emulator)); emulator_window->loop()->PostSynchronous([&emulator_window]() { - xe::threading::set_name("Win32 Loop"); - xe::Profiler::ThreadEnter("Win32 Loop"); + xe::threading::set_name("Windowing Loop"); + xe::Profiler::ThreadEnter("Windowing Loop"); if (!emulator_window->Initialize()) { xe::FatalError("Failed to initialize main window"); diff --git a/src/xenia/apu/xma_decoder.cc b/src/xenia/apu/xma_decoder.cc index 52e0b61a6..ac79182c7 100644 --- a/src/xenia/apu/xma_decoder.cc +++ b/src/xenia/apu/xma_decoder.cc @@ -133,7 +133,7 @@ X_STATUS XmaDecoder::Setup(kernel::KernelState* kernel_state) { WorkerThreadMain(); return 0; })); - worker_thread_->set_name("XMA Decoder Worker"); + worker_thread_->set_name("XMA Decoder"); worker_thread_->set_can_debugger_suspend(true); worker_thread_->Create(); diff --git a/src/xenia/base/logging.cc b/src/xenia/base/logging.cc index 238969513..891e9d52c 100644 --- a/src/xenia/base/logging.cc +++ b/src/xenia/base/logging.cc @@ -72,7 +72,7 @@ class Logger { write_thread_ = xe::threading::Thread::Create({}, [this]() { WriteThread(); }); - write_thread_->set_name("xe::FileLogSink Writer"); + write_thread_->set_name("Logging Writer"); } ~Logger() { diff --git a/src/xenia/base/testing/threading_test.cc b/src/xenia/base/testing/threading_test.cc new file mode 100644 index 000000000..bfcabb9e4 --- /dev/null +++ b/src/xenia/base/testing/threading_test.cc @@ -0,0 +1,965 @@ +/** +****************************************************************************** +* Xenia : Xbox 360 Emulator Research Project * +****************************************************************************** +* Copyright 2018 Ben Vanik. All rights reserved. * +* Released under the BSD license - see LICENSE in the root for more details. * +****************************************************************************** +*/ + +#include + +#include "xenia/base/threading.h" + +#include "third_party/catch/include/catch.hpp" + +namespace xe { +namespace base { +namespace test { +using namespace threading; +using namespace std::chrono_literals; + +TEST_CASE("Fence") { + std::unique_ptr pFence; + std::unique_ptr pTimer; + + // Signal without wait + pFence = std::make_unique(); + pFence->Signal(); + + // Signal once and wait + pFence = std::make_unique(); + pFence->Signal(); + pFence->Wait(); + + // Signal twice and wait + pFence = std::make_unique(); + pFence->Signal(); + pFence->Signal(); + pFence->Wait(); + + // Test to synchronize multiple threads + std::atomic started(0); + std::atomic finished(0); + pFence = std::make_unique(); + auto func = [&pFence, &started, &finished] { + started.fetch_add(1); + pFence->Wait(); + finished.fetch_add(1); + }; + + auto threads = std::array({ + std::thread(func), + std::thread(func), + std::thread(func), + std::thread(func), + std::thread(func), + }); + + Sleep(10ms); + REQUIRE(finished.load() == 0); + + // TODO(bwrsandman): Check if this is correct behaviour: looping with Sleep + // is the only way to get fence to signal all threads on windows + for (int i = 0; i < threads.size(); ++i) { + Sleep(1ms); + pFence->Signal(); + } + REQUIRE(started.load() == threads.size()); + + for (auto& t : threads) t.join(); + REQUIRE(finished.load() == threads.size()); +} // namespace test + +TEST_CASE("Get number of logical processors") { + auto count = std::thread::hardware_concurrency(); + REQUIRE(logical_processor_count() == count); + REQUIRE(logical_processor_count() == count); + REQUIRE(logical_processor_count() == count); +} + +TEST_CASE("Enable process to set thread affinity") { + EnableAffinityConfiguration(); +} + +TEST_CASE("Yield Current Thread", "MaybeYield") { + // Run to see if there are any errors + MaybeYield(); +} + +TEST_CASE("Sync with Memory Barrier", "SyncMemory") { + // Run to see if there are any errors + SyncMemory(); +} + +TEST_CASE("Sleep Current Thread", "Sleep") { + auto wait_time = 5ms; + auto start = std::chrono::steady_clock::now(); + Sleep(wait_time); + auto duration = std::chrono::steady_clock::now() - start; + REQUIRE(duration >= wait_time); +} + +TEST_CASE("Sleep Current Thread in Alertable State", "Sleep") { + auto wait_time = 5ms; + auto start = std::chrono::steady_clock::now(); + auto result = threading::AlertableSleep(wait_time); + auto duration = std::chrono::steady_clock::now() - start; + REQUIRE(duration >= wait_time); + REQUIRE(result == threading::SleepResult::kSuccess); + + // TODO(bwrsandman): Test a Thread to return kAlerted. + // Need callback to call extended I/O function (ReadFileEx or WriteFileEx) +} + +TEST_CASE("TlsHandle") { + // Test Allocate + auto handle = threading::AllocateTlsHandle(); + + // Test Free + REQUIRE(threading::FreeTlsHandle(handle)); + REQUIRE(!threading::FreeTlsHandle(handle)); + REQUIRE(!threading::FreeTlsHandle(threading::kInvalidTlsHandle)); + + // Test setting values + handle = threading::AllocateTlsHandle(); + REQUIRE(threading::GetTlsValue(handle) == 0); + uint32_t value = 0xDEADBEEF; + threading::SetTlsValue(handle, reinterpret_cast(&value)); + auto p_received_value = threading::GetTlsValue(handle); + REQUIRE(threading::GetTlsValue(handle) != 0); + auto received_value = *reinterpret_cast(p_received_value); + REQUIRE(received_value == value); + + uintptr_t non_thread_local_value = 0; + auto thread = Thread::Create({}, [&non_thread_local_value, &handle] { + non_thread_local_value = threading::GetTlsValue(handle); + }); + + auto result = Wait(thread.get(), false, 5ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(non_thread_local_value == 0); + + // Cleanup + REQUIRE(threading::FreeTlsHandle(handle)); +} + +TEST_CASE("HighResolutionTimer") { + // The wait time is 500ms with an interval of 50ms + // Smaller values are not as precise and fail the test + const auto wait_time = 50ms; + + // Time the actual sleep duration + { + const auto interval = 5ms; + uint64_t counter = 0; + auto start = std::chrono::steady_clock::now(); + auto cb = [&counter] { ++counter; }; + auto pTimer = HighResolutionTimer::CreateRepeating(interval, cb); + Sleep(wait_time); + pTimer.reset(); + auto duration = std::chrono::steady_clock::now() - start; + + // Should have run as many times as wait_time / timer_interval plus or + // minus 1 due to imprecision of Sleep + REQUIRE(duration.count() >= wait_time.count()); + auto ratio = static_cast(duration / interval); + REQUIRE(counter >= ratio - 1); + REQUIRE(counter <= ratio + 1); + } + + // Test concurrent timers + { + const auto interval1 = 10ms; + const auto interval2 = 20ms; + uint64_t counter1 = 0; + uint64_t counter2 = 0; + auto start = std::chrono::steady_clock::now(); + auto cb1 = [&counter1] { ++counter1; }; + auto cb2 = [&counter2] { ++counter2; }; + auto pTimer1 = HighResolutionTimer::CreateRepeating(interval1, cb1); + auto pTimer2 = HighResolutionTimer::CreateRepeating(interval2, cb2); + Sleep(wait_time); + pTimer1.reset(); + pTimer2.reset(); + auto duration = std::chrono::steady_clock::now() - start; + + // Should have run as many times as wait_time / timer_interval plus or + // minus 1 due to imprecision of Sleep + REQUIRE(duration.count() >= wait_time.count()); + auto ratio1 = static_cast(duration / interval1); + auto ratio2 = static_cast(duration / interval2); + REQUIRE(counter1 >= ratio1 - 1); + REQUIRE(counter1 <= ratio1 + 1); + REQUIRE(counter2 >= ratio2 - 1); + REQUIRE(counter2 <= ratio2 + 1); + } + + // TODO(bwrsandman): Check on which thread callbacks are executed when + // spawned from differing threads +} + +TEST_CASE("Wait on Multiple Handles", "Wait") { + auto mutant = Mutant::Create(true); + auto semaphore = Semaphore::Create(10, 10); + auto event_ = Event::CreateManualResetEvent(false); + auto thread = Thread::Create({}, [&mutant, &semaphore, &event_] { + event_->Set(); + Wait(mutant.get(), false, 2ms); + semaphore->Release(1, nullptr); + Wait(mutant.get(), false, 2ms); + mutant->Release(); + }); + + std::vector handles = { + mutant.get(), + semaphore.get(), + event_.get(), + thread.get(), + }; + + auto any_result = WaitAny(handles, false, 10ms); + REQUIRE(any_result.first == WaitResult::kSuccess); + REQUIRE(any_result.second == 0); + + auto all_result = WaitAll(handles, false, 10ms); + REQUIRE(all_result == WaitResult::kSuccess); +} + +TEST_CASE("Signal and Wait") { + WaitResult result; + auto mutant = Mutant::Create(true); + auto event_ = Event::CreateAutoResetEvent(false); + auto thread = Thread::Create({}, [&mutant, &event_] { + Wait(mutant.get(), false); + event_->Set(); + }); + result = Wait(event_.get(), false, 5ms); + REQUIRE(result == WaitResult::kTimeout); + result = SignalAndWait(mutant.get(), event_.get(), false, 5ms); + REQUIRE(result == WaitResult::kSuccess); + result = Wait(thread.get(), false, 5ms); + REQUIRE(result == WaitResult::kSuccess); +} + +TEST_CASE("Wait on Event", "Event") { + auto evt = Event::CreateAutoResetEvent(false); + WaitResult result; + + // Call wait on unset Event + result = Wait(evt.get(), false, 5ms); + REQUIRE(result == WaitResult::kTimeout); + + // Call wait on set Event + evt->Set(); + result = Wait(evt.get(), false, 5ms); + REQUIRE(result == WaitResult::kSuccess); + + // Call wait on now consumed Event + result = Wait(evt.get(), false, 5ms); + REQUIRE(result == WaitResult::kTimeout); +} + +TEST_CASE("Reset Event", "Event") { + auto evt = Event::CreateAutoResetEvent(false); + WaitResult result; + + // Call wait on reset Event + evt->Set(); + evt->Reset(); + result = Wait(evt.get(), false, 5ms); + REQUIRE(result == WaitResult::kTimeout); + + // Test resetting the unset event + evt->Reset(); + result = Wait(evt.get(), false, 5ms); + REQUIRE(result == WaitResult::kTimeout); + + // Test setting the reset event + evt->Set(); + result = Wait(evt.get(), false, 5ms); + REQUIRE(result == WaitResult::kSuccess); +} + +TEST_CASE("Wait on Multiple Events", "Event") { + auto events = std::array, 4>{ + Event::CreateAutoResetEvent(false), + Event::CreateAutoResetEvent(false), + Event::CreateAutoResetEvent(false), + Event::CreateManualResetEvent(false), + }; + + std::array order = {0}; + std::atomic_uint index(0); + auto sign_in = [&order, &index](uint32_t id) { + auto i = index.fetch_add(1, std::memory_order::memory_order_relaxed); + order[i] = static_cast('0' + id); + }; + + auto threads = std::array{ + std::thread([&events, &sign_in] { + auto res = WaitAll({events[1].get(), events[3].get()}, false, 10ms); + if (res == WaitResult::kSuccess) { + sign_in(1); + } + }), + std::thread([&events, &sign_in] { + auto res = WaitAny({events[0].get(), events[2].get()}, false, 10ms); + if (res.first == WaitResult::kSuccess) { + sign_in(2); + } + }), + std::thread([&events, &sign_in] { + auto res = WaitAll({events[0].get(), events[2].get(), events[3].get()}, + false, 10ms); + if (res == WaitResult::kSuccess) { + sign_in(3); + } + }), + std::thread([&events, &sign_in] { + auto res = WaitAny({events[1].get(), events[3].get()}, false, 10ms); + if (res.first == WaitResult::kSuccess) { + sign_in(4); + } + }), + }; + + Sleep(1ms); + events[3]->Set(); // Signals thread id=4 and stays on for 1 and 3 + Sleep(1ms); + events[1]->Set(); // Signals thread id=1 + Sleep(1ms); + events[0]->Set(); // Signals thread id=2 + Sleep(1ms); + events[2]->Set(); // Partial signals thread id=3 + events[0]->Set(); // Signals thread id=3 + + for (auto& t : threads) { + t.join(); + } + + INFO(order.data()); + REQUIRE(order[0] == '4'); + // TODO(bwrsandman): Order is not always maintained on linux + // REQUIRE(order[1] == '1'); + // REQUIRE(order[2] == '2'); + // REQUIRE(order[3] == '3'); +} + +TEST_CASE("Wait on Semaphore", "Semaphore") { + WaitResult result; + std::unique_ptr sem; + int previous_count = 0; + + // Wait on semaphore with no room + sem = Semaphore::Create(0, 5); + result = Wait(sem.get(), false, 1ms); + REQUIRE(result == WaitResult::kTimeout); + + // Add room in semaphore + REQUIRE(sem->Release(2, &previous_count)); + REQUIRE(previous_count == 0); + REQUIRE(sem->Release(1, &previous_count)); + REQUIRE(previous_count == 2); + result = Wait(sem.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(sem->Release(1, &previous_count)); + REQUIRE(previous_count == 2); + + // Set semaphore over maximum_count + sem = Semaphore::Create(5, 5); + previous_count = -1; + REQUIRE_FALSE(sem->Release(1, &previous_count)); + REQUIRE(previous_count == -1); + REQUIRE_FALSE(sem->Release(10, &previous_count)); + REQUIRE(previous_count == -1); + sem = Semaphore::Create(0, 5); + REQUIRE_FALSE(sem->Release(10, &previous_count)); + REQUIRE(previous_count == -1); + REQUIRE_FALSE(sem->Release(10, &previous_count)); + REQUIRE(previous_count == -1); + + // Test invalid Release parameters + REQUIRE_FALSE(sem->Release(0, &previous_count)); + REQUIRE(previous_count == -1); + REQUIRE_FALSE(sem->Release(-1, &previous_count)); + REQUIRE(previous_count == -1); + + // Wait on fully available semaphore + sem = Semaphore::Create(5, 5); + result = Wait(sem.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + result = Wait(sem.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + result = Wait(sem.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + result = Wait(sem.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + result = Wait(sem.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + result = Wait(sem.get(), false, 1ms); + REQUIRE(result == WaitResult::kTimeout); + + // Semaphore between threads + sem = Semaphore::Create(5, 5); + Sleep(1ms); + // Occupy the semaphore with 5 threads + auto func = [&sem] { + auto res = Wait(sem.get(), false, 10ms); + Sleep(50ms); + if (res == WaitResult::kSuccess) { + sem->Release(1, nullptr); + } + }; + auto threads = std::array{ + std::thread(func), std::thread(func), std::thread(func), + std::thread(func), std::thread(func), + }; + // Give threads time to acquire semaphore + Sleep(1ms); + // Attempt to acquire full semaphore with current (6th) thread + result = Wait(sem.get(), false, 2ms); + REQUIRE(result == WaitResult::kTimeout); + // Give threads time to release semaphore + for (auto& t : threads) { + t.join(); + } + result = Wait(sem.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + sem->Release(1, &previous_count); + REQUIRE(previous_count == 4); + + // Test invalid construction parameters + // These are invalid according to documentation + // TODO(bwrsandman): Many of these invalid invocations succeed + sem = Semaphore::Create(-1, 5); + // REQUIRE(sem.get() == nullptr); + sem = Semaphore::Create(10, 5); + // REQUIRE(sem.get() == nullptr); + sem = Semaphore::Create(0, 0); + // REQUIRE(sem.get() == nullptr); + sem = Semaphore::Create(0, -1); + // REQUIRE(sem.get() == nullptr); +} + +TEST_CASE("Wait on Multiple Semaphores", "Semaphore") { + WaitResult all_result; + std::pair any_result; + int previous_count; + std::unique_ptr sem0, sem1; + + // Test Wait all which should fail + sem0 = Semaphore::Create(0, 5); + sem1 = Semaphore::Create(5, 5); + all_result = WaitAll({sem0.get(), sem1.get()}, false, 1ms); + REQUIRE(all_result == WaitResult::kTimeout); + previous_count = -1; + REQUIRE(sem0->Release(1, &previous_count)); + REQUIRE(previous_count == 0); + previous_count = -1; + REQUIRE_FALSE(sem1->Release(1, &previous_count)); + REQUIRE(previous_count == -1); + + // Test Wait all again which should succeed + sem0 = Semaphore::Create(1, 5); + sem1 = Semaphore::Create(5, 5); + all_result = WaitAll({sem0.get(), sem1.get()}, false, 1ms); + REQUIRE(all_result == WaitResult::kSuccess); + previous_count = -1; + REQUIRE(sem0->Release(1, &previous_count)); + REQUIRE(previous_count == 0); + previous_count = -1; + REQUIRE(sem1->Release(1, &previous_count)); + REQUIRE(previous_count == 4); + + // Test Wait Any which should fail + sem0 = Semaphore::Create(0, 5); + sem1 = Semaphore::Create(0, 5); + any_result = WaitAny({sem0.get(), sem1.get()}, false, 1ms); + REQUIRE(any_result.first == WaitResult::kTimeout); + REQUIRE(any_result.second == 0); + previous_count = -1; + REQUIRE(sem0->Release(1, &previous_count)); + REQUIRE(previous_count == 0); + previous_count = -1; + REQUIRE(sem1->Release(1, &previous_count)); + REQUIRE(previous_count == 0); + + // Test Wait Any which should succeed + sem0 = Semaphore::Create(0, 5); + sem1 = Semaphore::Create(5, 5); + any_result = WaitAny({sem0.get(), sem1.get()}, false, 1ms); + REQUIRE(any_result.first == WaitResult::kSuccess); + REQUIRE(any_result.second == 1); + previous_count = -1; + REQUIRE(sem0->Release(1, &previous_count)); + REQUIRE(previous_count == 0); + previous_count = -1; + REQUIRE(sem1->Release(1, &previous_count)); + REQUIRE(previous_count == 4); +} + +TEST_CASE("Wait on Mutant", "Mutant") { + WaitResult result; + std::unique_ptr mut; + + // Release on initially owned mutant + mut = Mutant::Create(true); + REQUIRE(mut->Release()); + REQUIRE_FALSE(mut->Release()); + + // Release on initially not-owned mutant + mut = Mutant::Create(false); + REQUIRE_FALSE(mut->Release()); + + // Wait on initially owned mutant + mut = Mutant::Create(true); + result = Wait(mut.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(mut->Release()); + REQUIRE(mut->Release()); + REQUIRE_FALSE(mut->Release()); + + // Wait on initially not owned mutant + mut = Mutant::Create(false); + result = Wait(mut.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(mut->Release()); + REQUIRE_FALSE(mut->Release()); + + // Multiple waits (or locks) + mut = Mutant::Create(false); + for (int i = 0; i < 10; ++i) { + result = Wait(mut.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + } + for (int i = 0; i < 10; ++i) { + REQUIRE(mut->Release()); + } + REQUIRE_FALSE(mut->Release()); + + // Test mutants on other threads + auto thread1 = std::thread([&mut] { + Sleep(5ms); + mut = Mutant::Create(true); + Sleep(100ms); + mut->Release(); + }); + Sleep(10ms); + REQUIRE_FALSE(mut->Release()); + Sleep(10ms); + result = Wait(mut.get(), false, 50ms); + REQUIRE(result == WaitResult::kTimeout); + thread1.join(); + result = Wait(mut.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(mut->Release()); +} + +TEST_CASE("Wait on Multiple Mutants", "Mutant") { + WaitResult all_result; + std::pair any_result; + std::unique_ptr mut0, mut1; + + // Test which should fail for WaitAll and WaitAny + auto thread0 = std::thread([&mut0, &mut1] { + mut0 = Mutant::Create(true); + mut1 = Mutant::Create(true); + Sleep(5ms); + mut0->Release(); + mut1->Release(); + }); + Sleep(1ms); + all_result = WaitAll({mut0.get(), mut1.get()}, false, 1ms); + REQUIRE(all_result == WaitResult::kTimeout); + REQUIRE_FALSE(mut0->Release()); + REQUIRE_FALSE(mut1->Release()); + any_result = WaitAny({mut0.get(), mut1.get()}, false, 1ms); + REQUIRE(any_result.first == WaitResult::kTimeout); + REQUIRE(any_result.second == 0); + REQUIRE_FALSE(mut0->Release()); + REQUIRE_FALSE(mut1->Release()); + thread0.join(); + + // Test which should fail for WaitAll but not WaitAny + auto thread1 = std::thread([&mut0, &mut1] { + mut0 = Mutant::Create(true); + mut1 = Mutant::Create(false); + Sleep(5ms); + mut0->Release(); + }); + Sleep(1ms); + all_result = WaitAll({mut0.get(), mut1.get()}, false, 1ms); + REQUIRE(all_result == WaitResult::kTimeout); + REQUIRE_FALSE(mut0->Release()); + REQUIRE_FALSE(mut1->Release()); + any_result = WaitAny({mut0.get(), mut1.get()}, false, 1ms); + REQUIRE(any_result.first == WaitResult::kSuccess); + REQUIRE(any_result.second == 1); + REQUIRE_FALSE(mut0->Release()); + REQUIRE(mut1->Release()); + thread1.join(); + + // Test which should pass for WaitAll and WaitAny + auto thread2 = std::thread([&mut0, &mut1] { + mut0 = Mutant::Create(false); + mut1 = Mutant::Create(false); + Sleep(5ms); + }); + Sleep(1ms); + all_result = WaitAll({mut0.get(), mut1.get()}, false, 1ms); + REQUIRE(all_result == WaitResult::kSuccess); + REQUIRE(mut0->Release()); + REQUIRE(mut1->Release()); + any_result = WaitAny({mut0.get(), mut1.get()}, false, 1ms); + REQUIRE(any_result.first == WaitResult::kSuccess); + REQUIRE(any_result.second == 0); + REQUIRE(mut0->Release()); + REQUIRE_FALSE(mut1->Release()); + thread2.join(); +} + +TEST_CASE("Wait on Timer", "Timer") { + WaitResult result; + std::unique_ptr timer; + + // Test Manual Reset + timer = Timer::CreateManualResetTimer(); + result = Wait(timer.get(), false, 1ms); + REQUIRE(result == WaitResult::kTimeout); + REQUIRE(timer->SetOnce(1ms)); // Signals it + result = Wait(timer.get(), false, 2ms); + REQUIRE(result == WaitResult::kSuccess); + result = Wait(timer.get(), false, 1ms); + REQUIRE(result == WaitResult::kSuccess); // Did not reset + + // Test Synchronization + timer = Timer::CreateSynchronizationTimer(); + result = Wait(timer.get(), false, 1ms); + REQUIRE(result == WaitResult::kTimeout); + REQUIRE(timer->SetOnce(1ms)); // Signals it + result = Wait(timer.get(), false, 2ms); + REQUIRE(result == WaitResult::kSuccess); + result = Wait(timer.get(), false, 1ms); + REQUIRE(result == WaitResult::kTimeout); // Did reset + + // TODO(bwrsandman): This test unexpectedly fails under windows + // Test long due time + // timer = Timer::CreateSynchronizationTimer(); + // REQUIRE(timer->SetOnce(10s)); + // result = Wait(timer.get(), false, 10ms); // Still signals under windows + // REQUIRE(result == WaitResult::kTimeout); + + // Test Repeating + REQUIRE(timer->SetRepeating(100us, 1ms)); + for (int i = 0; i < 10; ++i) { + result = Wait(timer.get(), false, 2ms); + INFO(i); + REQUIRE(result == WaitResult::kSuccess); + } + MaybeYield(); + Sleep(1ms); // Skip a few events + for (int i = 0; i < 10; ++i) { + result = Wait(timer.get(), false, 2ms); + REQUIRE(result == WaitResult::kSuccess); + } + // Cancel it + timer->Cancel(); + result = Wait(timer.get(), false, 2ms); + REQUIRE(result == WaitResult::kTimeout); + MaybeYield(); + Sleep(1ms); // Skip a few events + result = Wait(timer.get(), false, 2ms); + REQUIRE(result == WaitResult::kTimeout); + // Cancel with SetOnce + REQUIRE(timer->SetRepeating(1ms, 1ms)); + for (int i = 0; i < 10; ++i) { + result = Wait(timer.get(), false, 2ms); + REQUIRE(result == WaitResult::kSuccess); + } + REQUIRE(timer->SetOnce(100us)); + result = Wait(timer.get(), false, 2ms); + REQUIRE(result == WaitResult::kSuccess); // Signal from Set Once + result = Wait(timer.get(), false, 2ms); + REQUIRE(result == WaitResult::kTimeout); // No more signals from repeating +} + +TEST_CASE("Wait on Multiple Timers", "Timer") { + WaitResult all_result; + std::pair any_result; + + auto timer0 = Timer::CreateSynchronizationTimer(); + auto timer1 = Timer::CreateManualResetTimer(); + + // None signaled + all_result = WaitAll({timer0.get(), timer1.get()}, false, 1ms); + REQUIRE(all_result == WaitResult::kTimeout); + any_result = WaitAny({timer0.get(), timer1.get()}, false, 1ms); + REQUIRE(any_result.first == WaitResult::kTimeout); + REQUIRE(any_result.second == 0); + + // Some signaled + REQUIRE(timer1->SetOnce(1ms)); + all_result = WaitAll({timer0.get(), timer1.get()}, false, 10ms); + REQUIRE(all_result == WaitResult::kTimeout); + any_result = WaitAny({timer0.get(), timer1.get()}, false, 10ms); + REQUIRE(any_result.first == WaitResult::kSuccess); + REQUIRE(any_result.second == 1); + + // All signaled + REQUIRE(timer0->SetOnce(1ms)); + all_result = WaitAll({timer0.get(), timer1.get()}, false, 10ms); + REQUIRE(all_result == WaitResult::kSuccess); + REQUIRE(timer0->SetOnce(1ms)); + Sleep(1ms); + any_result = WaitAny({timer0.get(), timer1.get()}, false, 10ms); + REQUIRE(any_result.first == WaitResult::kSuccess); + REQUIRE(any_result.second == 0); + + // Check that timer0 reset + any_result = WaitAny({timer0.get(), timer1.get()}, false, 10ms); + REQUIRE(any_result.first == WaitResult::kSuccess); + REQUIRE(any_result.second == 1); +} + +TEST_CASE("Create and Trigger Timer Callbacks", "Timer") { + // TODO(bwrsandman): Check which thread performs callback and timing of + // callback + REQUIRE(true); +} + +TEST_CASE("Set and Test Current Thread ID", "Thread") { + // System ID + auto system_id = current_thread_system_id(); + REQUIRE(system_id > 0); + + // Thread ID + auto thread_id = current_thread_id(); + REQUIRE(thread_id == system_id); + + // Set a new thread id + const uint32_t new_thread_id = 0xDEADBEEF; + set_current_thread_id(new_thread_id); + REQUIRE(current_thread_id() == new_thread_id); + + // Set back original thread id of system + set_current_thread_id(std::numeric_limits::max()); + REQUIRE(current_thread_id() == system_id); + + // TODO(bwrsandman): Test on Thread object +} + +TEST_CASE("Set and Test Current Thread Name", "Thread") { + auto current_thread = Thread::GetCurrentThread(); + REQUIRE(current_thread); + auto old_thread_name = current_thread->name(); + + std::string new_thread_name = "Threading Test"; + REQUIRE_NOTHROW(set_name(new_thread_name)); + + // Restore the old catch.hpp thread name + REQUIRE_NOTHROW(set_name(old_thread_name)); +} + +TEST_CASE("Create and Run Thread", "Thread") { + std::unique_ptr thread; + WaitResult result; + Thread::CreationParameters params = {}; + auto func = [] { Sleep(20ns); }; + + // Create most basic case of thread + thread = Thread::Create(params, func); + REQUIRE(thread->native_handle() != nullptr); + REQUIRE_NOTHROW(thread->affinity_mask()); + REQUIRE(thread->name().empty()); + result = Wait(thread.get(), false, 5ms); + REQUIRE(result == WaitResult::kSuccess); + + // Add thread name + std::string new_name = "Test thread name"; + thread = Thread::Create(params, func); + auto name = thread->name(); + INFO(name.c_str()); + REQUIRE(name.empty()); + thread->set_name(new_name); + REQUIRE(thread->name() == new_name); + result = Wait(thread.get(), false, 5ms); + REQUIRE(result == WaitResult::kSuccess); + + // Use Terminate to end an infinitely looping thread + thread = Thread::Create(params, [] { + while (true) { + Sleep(1ms); + } + }); + result = Wait(thread.get(), false, 5ms); + REQUIRE(result == WaitResult::kTimeout); + thread->Terminate(-1); + result = Wait(thread.get(), false, 5ms); + REQUIRE(result == WaitResult::kSuccess); + + // Call Exit from inside an infinitely looping thread + thread = Thread::Create(params, [] { + while (true) { + Thread::Exit(-1); + } + }); + result = Wait(thread.get(), false, 5ms); + REQUIRE(result == WaitResult::kSuccess); + + // Call timeout wait on self + result = Wait(Thread::GetCurrentThread(), false, 5ms); + REQUIRE(result == WaitResult::kTimeout); + + params.stack_size = 16 * 1024; + thread = Thread::Create(params, [] { + while (true) { + Thread::Exit(-1); + } + }); + REQUIRE(thread != nullptr); + result = Wait(thread.get(), false, 5ms); + REQUIRE(result == WaitResult::kSuccess); + + // TODO(bwrsandman): Test with different priorities + // TODO(bwrsandman): Test setting and getting thread affinity +} + +TEST_CASE("Test Suspending Thread", "Thread") { + std::unique_ptr thread; + WaitResult result; + Thread::CreationParameters params = {}; + auto func = [] { Sleep(20us); }; + + // Create initially suspended + params.create_suspended = true; + thread = threading::Thread::Create(params, func); + result = threading::Wait(thread.get(), false, 5ms); + REQUIRE(result == threading::WaitResult::kTimeout); + thread->Resume(); + result = threading::Wait(thread.get(), false, 5ms); + REQUIRE(result == threading::WaitResult::kSuccess); + params.create_suspended = false; + + // Create and then suspend + thread = threading::Thread::Create(params, func); + thread->Suspend(); + result = threading::Wait(thread.get(), false, 5ms); + REQUIRE(result == threading::WaitResult::kTimeout); + thread->Resume(); + result = threading::Wait(thread.get(), false, 5ms); + REQUIRE(result == threading::WaitResult::kSuccess); + + // Test recursive suspend + thread = threading::Thread::Create(params, func); + thread->Suspend(); + thread->Suspend(); + result = threading::Wait(thread.get(), false, 5ms); + REQUIRE(result == threading::WaitResult::kTimeout); + thread->Resume(); + result = threading::Wait(thread.get(), false, 5ms); + REQUIRE(result == threading::WaitResult::kTimeout); + thread->Resume(); + result = threading::Wait(thread.get(), false, 5ms); + REQUIRE(result == threading::WaitResult::kSuccess); + + // Test suspend count + uint32_t suspend_count = 0; + thread = threading::Thread::Create(params, func); + thread->Suspend(&suspend_count); + REQUIRE(suspend_count == 0); + thread->Suspend(&suspend_count); + REQUIRE(suspend_count == 1); + thread->Suspend(&suspend_count); + REQUIRE(suspend_count == 2); + thread->Resume(&suspend_count); + REQUIRE(suspend_count == 3); + thread->Resume(&suspend_count); + REQUIRE(suspend_count == 2); + thread->Resume(&suspend_count); + REQUIRE(suspend_count == 1); + thread->Suspend(&suspend_count); + REQUIRE(suspend_count == 0); + thread->Resume(&suspend_count); + REQUIRE(suspend_count == 1); + result = threading::Wait(thread.get(), false, 5ms); + REQUIRE(result == threading::WaitResult::kSuccess); +} + +TEST_CASE("Test Thread QueueUserCallback", "Thread") { + std::unique_ptr thread; + WaitResult result; + Thread::CreationParameters params = {}; + std::atomic_int order; + int is_modified; + int has_finished; + auto callback = [&is_modified, &order] { + is_modified = std::atomic_fetch_add_explicit( + &order, 1, std::memory_order::memory_order_relaxed); + }; + + // Without alertable + order = 0; + is_modified = -1; + has_finished = -1; + thread = Thread::Create(params, [&has_finished, &order] { + // Not using Alertable so callback is not registered + Sleep(9ms); + has_finished = std::atomic_fetch_add_explicit( + &order, 1, std::memory_order::memory_order_relaxed); + }); + result = Wait(thread.get(), true, 5ms); + REQUIRE(result == WaitResult::kTimeout); + REQUIRE(is_modified == -1); + thread->QueueUserCallback(callback); + result = Wait(thread.get(), true, 10ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(is_modified == -1); + REQUIRE(has_finished == 0); + + // With alertable + order = 0; + is_modified = -1; + has_finished = -1; + thread = Thread::Create(params, [&has_finished, &order] { + // Using Alertable so callback is registered + AlertableSleep(9ms); + has_finished = std::atomic_fetch_add_explicit( + &order, 1, std::memory_order::memory_order_relaxed); + }); + result = Wait(thread.get(), true, 5ms); + REQUIRE(result == WaitResult::kTimeout); + REQUIRE(is_modified == -1); + thread->QueueUserCallback(callback); + result = Wait(thread.get(), true, 10ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(is_modified == 0); + REQUIRE(has_finished == 1); + + // Test Exit command with QueueUserCallback + order = 0; + is_modified = -1; + has_finished = -1; + thread = Thread::Create(params, [&is_modified, &has_finished, &order] { + is_modified = std::atomic_fetch_add_explicit( + &order, 1, std::memory_order::memory_order_relaxed); + // Using Alertable so callback is registered + AlertableSleep(20ms); + has_finished = std::atomic_fetch_add_explicit( + &order, 1, std::memory_order::memory_order_relaxed); + }); + result = Wait(thread.get(), true, 10ms); + REQUIRE(result == WaitResult::kTimeout); + thread->QueueUserCallback([] { Thread::Exit(0); }); + result = Wait(thread.get(), true, 50ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(is_modified == 0); + REQUIRE(has_finished == -1); + + // TODO(bwrsandman): Test alertable wait returning kUserCallback by using IO + // callbacks. +} + +} // namespace test +} // namespace base +} // namespace xe diff --git a/src/xenia/base/threading.h b/src/xenia/base/threading.h index 480b76207..dab3b53c9 100644 --- a/src/xenia/base/threading.h +++ b/src/xenia/base/threading.h @@ -32,21 +32,19 @@ class Fence { Fence() : signaled_(false) {} void Signal() { std::unique_lock lock(mutex_); - signaled_.store(true); + signaled_ = true; cond_.notify_all(); } void Wait() { std::unique_lock lock(mutex_); - while (!signaled_.load()) { - cond_.wait(lock); - } - signaled_.store(false); + cond_.wait(lock, [this] { return signaled_; }); + signaled_ = false; } private: std::mutex mutex_; std::condition_variable cond_; - std::atomic signaled_; + bool signaled_; }; // Returns the total number of logical processors in the host system. @@ -307,12 +305,12 @@ class Timer : public WaitHandle { std::chrono::milliseconds period, std::function opt_callback = nullptr) = 0; template - void SetRepeating(std::chrono::nanoseconds due_time, + bool SetRepeating(std::chrono::nanoseconds due_time, std::chrono::duration period, std::function opt_callback = nullptr) { - SetRepeating(due_time, - std::chrono::duration_cast(period), - std::move(opt_callback)); + return SetRepeating( + due_time, std::chrono::duration_cast(period), + std::move(opt_callback)); } // Stops the timer before it can be set to the signaled state and cancels @@ -358,7 +356,7 @@ class Thread : public WaitHandle { virtual uint32_t system_id() const = 0; // Returns the current name of the thread, if previously specified. - std::string name() const { return name_; } + virtual std::string name() const { return name_; } // Sets the name of the thread, used in debugging and logging. virtual void set_name(std::string name) { name_ = std::move(name); } @@ -390,7 +388,7 @@ class Thread : public WaitHandle { // Decrements a thread's suspend count. When the suspend count is decremented // to zero, the execution of the thread is resumed. - virtual bool Resume(uint32_t* out_new_suspend_count = nullptr) = 0; + virtual bool Resume(uint32_t* out_previous_suspend_count = nullptr) = 0; // Suspends the specified thread. virtual bool Suspend(uint32_t* out_previous_suspend_count = nullptr) = 0; diff --git a/src/xenia/base/threading_posix.cc b/src/xenia/base/threading_posix.cc index 926f4943d..216d6714e 100644 --- a/src/xenia/base/threading_posix.cc +++ b/src/xenia/base/threading_posix.cc @@ -13,16 +13,64 @@ #include "xenia/base/logging.h" #include +#include #include #include #include #include -#include #include +#include +#include namespace xe { namespace threading { +template +inline timespec DurationToTimeSpec( + std::chrono::duration<_Rep, _Period> duration) { + auto nanoseconds = + std::chrono::duration_cast(duration); + auto div = ldiv(nanoseconds.count(), 1000000000L); + return timespec{div.quot, div.rem}; +} + +// Thread interruption is done using user-defined signals +// This implementation uses the SIGRTMAX - SIGRTMIN to signal to a thread +// gdb tip, for SIG = SIGRTMIN + SignalType : handle SIG nostop +// lldb tip, for SIG = SIGRTMIN + SignalType : process handle SIG -s false +enum class SignalType { + kHighResolutionTimer, + kTimer, + kThreadSuspend, + kThreadUserCallback, + k_Count +}; + +int GetSystemSignal(SignalType num) { + auto result = SIGRTMIN + static_cast(num); + assert_true(result < SIGRTMAX); + return result; +} + +SignalType GetSystemSignalType(int num) { + return static_cast(num - SIGRTMIN); +} + +thread_local std::array(SignalType::k_Count)> + signal_handler_installed = {}; + +static void signal_handler(int signal, siginfo_t* info, void* context); + +void install_signal_handler(SignalType type) { + if (signal_handler_installed[static_cast(type)]) return; + struct sigaction action {}; + action.sa_flags = SA_SIGINFO; + action.sa_sigaction = signal_handler; + sigemptyset(&action.sa_mask); + if (sigaction(GetSystemSignal(type), &action, nullptr) == -1) + signal_handler_installed[static_cast(type)] = true; +} + // TODO(dougvj) void EnableAffinityConfiguration() {} @@ -33,11 +81,12 @@ uint32_t current_thread_system_id() { } void set_name(const std::string& name) { - pthread_setname_np(pthread_self(), name.c_str()); + set_name(static_cast(pthread_self()), name); } void set_name(std::thread::native_handle_type handle, const std::string& name) { - pthread_setname_np(handle, name.c_str()); + assert_false(name.length() >= 16); + if (pthread_setname_np(handle, name.c_str()) != 0) assert_always(); } void MaybeYield() { @@ -48,55 +97,81 @@ void MaybeYield() { void SyncMemory() { __sync_synchronize(); } void Sleep(std::chrono::microseconds duration) { - timespec rqtp = {time_t(duration.count() / 1000000), - time_t(duration.count() % 1000)}; - nanosleep(&rqtp, nullptr); - // TODO(benvanik): spin while rmtp >0? + timespec rqtp = DurationToTimeSpec(duration); + timespec rmtp = {}; + auto p_rqtp = &rqtp; + auto p_rmtp = &rmtp; + int ret = 0; + do { + ret = nanosleep(p_rqtp, p_rmtp); + // Swap requested for remaining in case of signal interruption + // in which case, we start sleeping again for the remainder + std::swap(p_rqtp, p_rmtp); + } while (ret == -1 && errno == EINTR); } -// TODO(dougvj) Not sure how to implement the equivalent of this on POSIX. +// TODO(bwrsandman) Implement by allowing alert interrupts from IO operations +thread_local bool alertable_state_ = false; SleepResult AlertableSleep(std::chrono::microseconds duration) { - sleep(duration.count() / 1000); + alertable_state_ = true; + Sleep(duration); + alertable_state_ = false; return SleepResult::kSuccess; } -// TODO(dougvj) We can probably wrap this with pthread_key_t but the type of -// TlsHandle probably needs to be refactored TlsHandle AllocateTlsHandle() { - assert_always(); - return 0; + auto key = static_cast(-1); + auto res = pthread_key_create(&key, nullptr); + assert_zero(res); + assert_true(key != static_cast(-1)); + return static_cast(key); } -bool FreeTlsHandle(TlsHandle handle) { return true; } +bool FreeTlsHandle(TlsHandle handle) { + return pthread_key_delete(static_cast(handle)) == 0; +} uintptr_t GetTlsValue(TlsHandle handle) { - assert_always(); - return 0; + return reinterpret_cast( + pthread_getspecific(static_cast(handle))); } bool SetTlsValue(TlsHandle handle, uintptr_t value) { - assert_always(); - return false; + return pthread_setspecific(static_cast(handle), + reinterpret_cast(value)) == 0; } -// TODO(dougvj) class PosixHighResolutionTimer : public HighResolutionTimer { public: - PosixHighResolutionTimer(std::function callback) - : callback_(callback) {} - ~PosixHighResolutionTimer() override {} + explicit PosixHighResolutionTimer(std::function callback) + : callback_(std::move(callback)), timer_(nullptr) {} + ~PosixHighResolutionTimer() override { + if (timer_) timer_delete(timer_); + } bool Initialize(std::chrono::milliseconds period) { - assert_always(); - return false; + // Create timer + sigevent sev{}; + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = GetSystemSignal(SignalType::kHighResolutionTimer); + sev.sigev_value.sival_ptr = (void*)&callback_; + if (timer_create(CLOCK_REALTIME, &sev, &timer_) == -1) return false; + + // Start timer + itimerspec its{}; + its.it_value = DurationToTimeSpec(period); + its.it_interval = its.it_value; + return timer_settime(timer_, 0, &its, nullptr) != -1; } private: std::function callback_; + timer_t timer_; }; std::unique_ptr HighResolutionTimer::CreateRepeating( std::chrono::milliseconds period, std::function callback) { + install_signal_handler(SignalType::kHighResolutionTimer); auto timer = std::make_unique(std::move(callback)); if (!timer->Initialize(period)) { return nullptr; @@ -104,209 +179,664 @@ std::unique_ptr HighResolutionTimer::CreateRepeating( return std::unique_ptr(timer.release()); } -// TODO(dougvj) There really is no native POSIX handle for a single wait/signal -// construct pthreads is at a lower level with more handles for such a mechanism -// This simple wrapper class could function as our handle, but probably needs -// some more functionality -class PosixCondition { +class PosixConditionBase { public: - PosixCondition() : signal_(false) { - pthread_mutex_init(&mutex_, NULL); - pthread_cond_init(&cond_, NULL); + virtual bool Signal() = 0; + + WaitResult Wait(std::chrono::milliseconds timeout) { + bool executed; + auto predicate = [this] { return this->signaled(); }; + auto lock = std::unique_lock(mutex_); + if (predicate()) { + executed = true; + } else { + if (timeout == std::chrono::milliseconds::max()) { + cond_.wait(lock, predicate); + executed = true; // Did not time out; + } else { + executed = cond_.wait_for(lock, timeout, predicate); + } + } + if (executed) { + post_execution(); + return WaitResult::kSuccess; + } else { + return WaitResult::kTimeout; + } } - ~PosixCondition() { - pthread_mutex_destroy(&mutex_); - pthread_cond_destroy(&cond_); + static std::pair WaitMultiple( + std::vector&& handles, bool wait_all, + std::chrono::milliseconds timeout) { + using iter_t = std::vector::const_iterator; + bool executed; + auto predicate = [](auto h) { return h->signaled(); }; + + // Construct a condition for all or any depending on wait_all + auto operation = wait_all ? std::all_of + : std::any_of; + auto aggregate = [&handles, operation, predicate] { + return operation(handles.cbegin(), handles.cend(), predicate); + }; + + std::unique_lock lock(PosixConditionBase::mutex_); + + // Check if the aggregate lambda (all or any) is already satisfied + if (aggregate()) { + executed = true; + } else { + // If the aggregate is not yet satisfied and the timeout is infinite, + // wait without timeout. + if (timeout == std::chrono::milliseconds::max()) { + PosixConditionBase::cond_.wait(lock, aggregate); + executed = true; + } else { + // Wait with timeout. + executed = PosixConditionBase::cond_.wait_for(lock, timeout, aggregate); + } + } + if (executed) { + auto first_signaled = std::numeric_limits::max(); + for (auto i = 0u; i < handles.size(); ++i) { + if (handles[i]->signaled()) { + if (first_signaled > i) { + first_signaled = i; + } + handles[i]->post_execution(); + if (!wait_all) break; + } + } + return std::make_pair(WaitResult::kSuccess, first_signaled); + } else { + return std::make_pair(WaitResult::kTimeout, 0); + } } - void Signal() { - pthread_mutex_lock(&mutex_); + virtual void* native_handle() const { return cond_.native_handle(); } + + protected: + inline virtual bool signaled() const = 0; + inline virtual void post_execution() = 0; + static std::condition_variable cond_; + static std::mutex mutex_; +}; + +std::condition_variable PosixConditionBase::cond_; +std::mutex PosixConditionBase::mutex_; + +// There really is no native POSIX handle for a single wait/signal construct +// pthreads is at a lower level with more handles for such a mechanism. +// This simple wrapper class functions as our handle and uses conditional +// variables for waits and signals. +template +class PosixCondition {}; + +template <> +class PosixCondition : public PosixConditionBase { + public: + PosixCondition(bool manual_reset, bool initial_state) + : signal_(initial_state), manual_reset_(manual_reset) {} + virtual ~PosixCondition() = default; + + bool Signal() override { + auto lock = std::unique_lock(mutex_); signal_ = true; - pthread_cond_broadcast(&cond_); - pthread_mutex_unlock(&mutex_); + if (manual_reset_) { + cond_.notify_all(); + } else { + cond_.notify_one(); + } + return true; } void Reset() { - pthread_mutex_lock(&mutex_); + auto lock = std::unique_lock(mutex_); signal_ = false; - pthread_mutex_unlock(&mutex_); - } - - bool Wait(unsigned int timeout_ms) { - // Assume 0 means no timeout, not instant timeout - if (timeout_ms == 0) { - Wait(); - } - struct timespec time_to_wait; - struct timeval now; - gettimeofday(&now, NULL); - - // Add the number of seconds we want to wait to the current time - time_to_wait.tv_sec = now.tv_sec + (timeout_ms / 1000); - // Add the number of nanoseconds we want to wait to the current nanosecond - // stride - long nsec = (now.tv_usec + (timeout_ms % 1000)) * 1000; - // If we overflowed the nanosecond count then we add a second - time_to_wait.tv_sec += nsec / 1000000000UL; - // We only add nanoseconds within the 1 second stride - time_to_wait.tv_nsec = nsec % 1000000000UL; - pthread_mutex_lock(&mutex_); - while (!signal_) { - int status = pthread_cond_timedwait(&cond_, &mutex_, &time_to_wait); - if (status == ETIMEDOUT) return false; // We timed out - } - pthread_mutex_unlock(&mutex_); - return true; // We didn't time out - } - - bool Wait() { - pthread_mutex_lock(&mutex_); - while (!signal_) { - pthread_cond_wait(&cond_, &mutex_); - } - pthread_mutex_unlock(&mutex_); - return true; // Did not time out; } private: + inline bool signaled() const override { return signal_; } + inline void post_execution() override { + if (!manual_reset_) { + signal_ = false; + } + } bool signal_; - pthread_cond_t cond_; - pthread_mutex_t mutex_; + const bool manual_reset_; }; -// Native posix thread handle -template -class PosixThreadHandle : public T { +template <> +class PosixCondition : public PosixConditionBase { public: - explicit PosixThreadHandle(pthread_t handle) : handle_(handle) {} - ~PosixThreadHandle() override {} + PosixCondition(uint32_t initial_count, uint32_t maximum_count) + : count_(initial_count), maximum_count_(maximum_count) {} - protected: - void* native_handle() const override { - return reinterpret_cast(handle_); + bool Signal() override { return Release(1, nullptr); } + + bool Release(uint32_t release_count, int* out_previous_count) { + if (maximum_count_ - count_ >= release_count) { + auto lock = std::unique_lock(mutex_); + if (out_previous_count) *out_previous_count = count_; + count_ += release_count; + cond_.notify_all(); + return true; + } + return false; } - pthread_t handle_; + private: + inline bool signaled() const override { return count_ > 0; } + inline void post_execution() override { + count_--; + cond_.notify_all(); + } + uint32_t count_; + const uint32_t maximum_count_; }; -// This is wraps a condition object as our handle because posix has no single -// native handle for higher level concurrency constructs such as semaphores -template -class PosixConditionHandle : public T { +template <> +class PosixCondition : public PosixConditionBase { public: - ~PosixConditionHandle() override {} - - protected: - void* native_handle() const override { - return reinterpret_cast(const_cast(&handle_)); + explicit PosixCondition(bool initial_owner) : count_(0) { + if (initial_owner) { + count_ = 1; + owner_ = std::this_thread::get_id(); + } } - PosixCondition handle_; + bool Signal() override { return Release(); } + + bool Release() { + if (owner_ == std::this_thread::get_id() && count_ > 0) { + auto lock = std::unique_lock(mutex_); + --count_; + // Free to be acquired by another thread + if (count_ == 0) { + cond_.notify_one(); + } + return true; + } + return false; + } + + void* native_handle() const override { return mutex_.native_handle(); } + + private: + inline bool signaled() const override { + return count_ == 0 || owner_ == std::this_thread::get_id(); + } + inline void post_execution() override { + count_++; + owner_ = std::this_thread::get_id(); + } + uint32_t count_; + std::thread::id owner_; }; -template -class PosixFdHandle : public T { +template <> +class PosixCondition : public PosixConditionBase { public: - explicit PosixFdHandle(intptr_t handle) : handle_(handle) {} - ~PosixFdHandle() override { - close(handle_); - handle_ = 0; + explicit PosixCondition(bool manual_reset) + : callback_(), + timer_(nullptr), + signal_(false), + manual_reset_(manual_reset) {} + + virtual ~PosixCondition() { Cancel(); } + + bool Signal() override { + CompletionRoutine(); + return true; } - protected: - void* native_handle() const override { - return reinterpret_cast(handle_); - } + // TODO(bwrsandman): due_times of under 1ms deadlock under travis + bool Set(std::chrono::nanoseconds due_time, std::chrono::milliseconds period, + std::function opt_callback = nullptr) { + std::lock_guard lock(mutex_); - intptr_t handle_; -}; + callback_ = std::move(opt_callback); + signal_ = false; -// TODO(dougvj) -WaitResult Wait(WaitHandle* wait_handle, bool is_alertable, - std::chrono::milliseconds timeout) { - intptr_t handle = reinterpret_cast(wait_handle->native_handle()); - - fd_set set; - struct timeval time_val; - int ret; - - FD_ZERO(&set); - FD_SET(handle, &set); - - time_val.tv_sec = timeout.count() / 1000; - time_val.tv_usec = timeout.count() * 1000; - ret = select(handle + 1, &set, NULL, NULL, &time_val); - if (ret == -1) { - return WaitResult::kFailed; - } else if (ret == 0) { - return WaitResult::kTimeout; - } else { - uint64_t buf = 0; - ret = read(handle, &buf, sizeof(buf)); - if (ret < 8) { - return WaitResult::kTimeout; + // Create timer + if (timer_ == nullptr) { + sigevent sev{}; + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = GetSystemSignal(SignalType::kTimer); + sev.sigev_value.sival_ptr = this; + if (timer_create(CLOCK_REALTIME, &sev, &timer_) == -1) return false; } - return WaitResult::kSuccess; + // Start timer + itimerspec its{}; + its.it_value = DurationToTimeSpec(due_time); + its.it_interval = DurationToTimeSpec(period); + return timer_settime(timer_, 0, &its, nullptr) == 0; } + + void CompletionRoutine() { + // As the callback may reset the timer, store local. + std::function callback; + { + std::lock_guard lock(mutex_); + // Store callback + if (callback_) callback = callback_; + signal_ = true; + if (manual_reset_) { + cond_.notify_all(); + } else { + cond_.notify_one(); + } + } + // Call callback + if (callback) callback(); + } + + bool Cancel() { + std::lock_guard lock(mutex_); + bool result = true; + if (timer_) { + result = timer_delete(timer_) == 0; + timer_ = nullptr; + } + return result; + } + + void* native_handle() const override { + return reinterpret_cast(timer_); + } + + private: + inline bool signaled() const override { return signal_; } + inline void post_execution() override { + if (!manual_reset_) { + signal_ = false; + } + } + std::function callback_; + timer_t timer_; + volatile bool signal_; + const bool manual_reset_; +}; + +struct ThreadStartData { + std::function start_routine; + bool create_suspended; + Thread* thread_obj; +}; + +template <> +class PosixCondition : public PosixConditionBase { + enum class State { + kUninitialized, + kRunning, + kSuspended, + kFinished, + }; + + public: + PosixCondition() + : thread_(0), + signaled_(false), + exit_code_(0), + state_(State::kUninitialized), + suspend_count_(0) {} + bool Initialize(Thread::CreationParameters params, + ThreadStartData* start_data) { + start_data->create_suspended = params.create_suspended; + pthread_attr_t attr; + if (pthread_attr_init(&attr) != 0) return false; + if (pthread_attr_setstacksize(&attr, params.stack_size) != 0) { + pthread_attr_destroy(&attr); + return false; + } + if (params.initial_priority != 0) { + sched_param sched{}; + sched.sched_priority = params.initial_priority + 1; + if (pthread_attr_setschedpolicy(&attr, SCHED_FIFO) != 0) { + pthread_attr_destroy(&attr); + return false; + } + if (pthread_attr_setschedparam(&attr, &sched) != 0) { + pthread_attr_destroy(&attr); + return false; + } + } + if (pthread_create(&thread_, &attr, ThreadStartRoutine, start_data) != 0) { + return false; + } + pthread_attr_destroy(&attr); + return true; + } + + /// Constructor for existing thread. This should only happen once called by + /// Thread::GetCurrentThread() on the main thread + explicit PosixCondition(pthread_t thread) + : thread_(thread), + signaled_(false), + exit_code_(0), + state_(State::kRunning) {} + + virtual ~PosixCondition() { + if (thread_ && !signaled_) { + if (pthread_cancel(thread_) != 0) { + assert_always(); + } + if (pthread_join(thread_, nullptr) != 0) { + assert_always(); + } + } + } + + bool Signal() override { return true; } + + std::string name() const { + WaitStarted(); + auto result = std::array{'\0'}; + std::unique_lock lock(state_mutex_); + if (state_ == State::kRunning) { + if (pthread_getname_np(thread_, result.data(), result.size() - 1) != 0) + assert_always(); + } + return std::string(result.data()); + } + + void set_name(const std::string& name) { + WaitStarted(); + std::unique_lock lock(state_mutex_); + if (state_ == State::kRunning) { + threading::set_name(static_cast(thread_), + name); + } + } + + uint32_t system_id() const { return static_cast(thread_); } + + uint64_t affinity_mask() { + WaitStarted(); + cpu_set_t cpu_set; + if (pthread_getaffinity_np(thread_, sizeof(cpu_set_t), &cpu_set) != 0) + assert_always(); + uint64_t result = 0; + auto cpu_count = std::min(CPU_SETSIZE, 64); + for (auto i = 0u; i < cpu_count; i++) { + auto set = CPU_ISSET(i, &cpu_set); + result |= set << i; + } + return result; + } + + void set_affinity_mask(uint64_t mask) { + WaitStarted(); + cpu_set_t cpu_set; + CPU_ZERO(&cpu_set); + for (auto i = 0u; i < 64; i++) { + if (mask & (1 << i)) { + CPU_SET(i, &cpu_set); + } + } + if (pthread_setaffinity_np(thread_, sizeof(cpu_set_t), &cpu_set) != 0) { + assert_always(); + } + } + + int priority() { + WaitStarted(); + int policy; + sched_param param{}; + int ret = pthread_getschedparam(thread_, &policy, ¶m); + if (ret != 0) { + return -1; + } + + return param.sched_priority; + } + + void set_priority(int new_priority) { + WaitStarted(); + sched_param param{}; + param.sched_priority = new_priority; + if (pthread_setschedparam(thread_, SCHED_FIFO, ¶m) != 0) + assert_always(); + } + + void QueueUserCallback(std::function callback) { + WaitStarted(); + std::unique_lock lock(callback_mutex_); + user_callback_ = std::move(callback); + sigval value{}; + value.sival_ptr = this; + pthread_sigqueue(thread_, GetSystemSignal(SignalType::kThreadUserCallback), + value); + } + + void CallUserCallback() { + std::unique_lock lock(callback_mutex_); + user_callback_(); + } + + bool Resume(uint32_t* out_previous_suspend_count = nullptr) { + if (out_previous_suspend_count) { + *out_previous_suspend_count = 0; + } + WaitStarted(); + std::unique_lock lock(state_mutex_); + if (state_ != State::kSuspended) return false; + if (out_previous_suspend_count) { + *out_previous_suspend_count = suspend_count_; + } + --suspend_count_; + state_signal_.notify_all(); + return true; + } + + bool Suspend(uint32_t* out_previous_suspend_count = nullptr) { + if (out_previous_suspend_count) { + *out_previous_suspend_count = 0; + } + WaitStarted(); + { + if (out_previous_suspend_count) { + *out_previous_suspend_count = suspend_count_; + } + state_ = State::kSuspended; + ++suspend_count_; + } + int result = + pthread_kill(thread_, GetSystemSignal(SignalType::kThreadSuspend)); + return result == 0; + } + + void Terminate(int exit_code) { + { + std::unique_lock lock(state_mutex_); + state_ = State::kFinished; + } + + std::lock_guard lock(mutex_); + + // Sometimes the thread can call terminate twice before stopping + if (thread_ == 0) return; + auto thread = thread_; + + exit_code_ = exit_code; + signaled_ = true; + cond_.notify_all(); + + if (pthread_cancel(thread) != 0) assert_always(); + } + + void WaitStarted() const { + std::unique_lock lock(state_mutex_); + state_signal_.wait(lock, + [this] { return state_ != State::kUninitialized; }); + } + + /// Set state to suspended and wait until it reset by another thread + void WaitSuspended() { + std::unique_lock lock(state_mutex_); + state_signal_.wait(lock, [this] { return suspend_count_ == 0; }); + state_ = State::kRunning; + } + + void* native_handle() const override { + return reinterpret_cast(thread_); + } + + private: + static void* ThreadStartRoutine(void* parameter); + inline bool signaled() const override { return signaled_; } + inline void post_execution() override { + if (thread_) { + pthread_join(thread_, nullptr); + thread_ = 0; + } + } + pthread_t thread_; + bool signaled_; + int exit_code_; + volatile State state_; + volatile uint32_t suspend_count_; + mutable std::mutex state_mutex_; + mutable std::mutex callback_mutex_; + mutable std::condition_variable state_signal_; + std::function user_callback_; +}; + +class PosixWaitHandle { + public: + virtual PosixConditionBase& condition() = 0; +}; + +// This wraps a condition object as our handle because posix has no single +// native handle for higher level concurrency constructs such as semaphores +template +class PosixConditionHandle : public T, public PosixWaitHandle { + public: + PosixConditionHandle() = default; + explicit PosixConditionHandle(bool); + explicit PosixConditionHandle(pthread_t thread); + PosixConditionHandle(bool manual_reset, bool initial_state); + PosixConditionHandle(uint32_t initial_count, uint32_t maximum_count); + ~PosixConditionHandle() override = default; + + PosixConditionBase& condition() override { return handle_; } + void* native_handle() const override { return handle_.native_handle(); } + + protected: + PosixCondition handle_; + friend PosixCondition; +}; + +template <> +PosixConditionHandle::PosixConditionHandle(uint32_t initial_count, + uint32_t maximum_count) + : handle_(initial_count, maximum_count) {} + +template <> +PosixConditionHandle::PosixConditionHandle(bool initial_owner) + : handle_(initial_owner) {} + +template <> +PosixConditionHandle::PosixConditionHandle(bool manual_reset) + : handle_(manual_reset) {} + +template <> +PosixConditionHandle::PosixConditionHandle(bool manual_reset, + bool initial_state) + : handle_(manual_reset, initial_state) {} + +template <> +PosixConditionHandle::PosixConditionHandle(pthread_t thread) + : handle_(thread) {} + +WaitResult Wait(WaitHandle* wait_handle, bool is_alertable, + std::chrono::milliseconds timeout) { + auto posix_wait_handle = dynamic_cast(wait_handle); + if (posix_wait_handle == nullptr) { + return WaitResult::kFailed; + } + if (is_alertable) alertable_state_ = true; + auto result = posix_wait_handle->condition().Wait(timeout); + if (is_alertable) alertable_state_ = false; + return result; } -// TODO(dougvj) WaitResult SignalAndWait(WaitHandle* wait_handle_to_signal, WaitHandle* wait_handle_to_wait_on, bool is_alertable, std::chrono::milliseconds timeout) { - assert_always(); - return WaitResult::kFailed; + auto result = WaitResult::kFailed; + auto posix_wait_handle_to_signal = + dynamic_cast(wait_handle_to_signal); + auto posix_wait_handle_to_wait_on = + dynamic_cast(wait_handle_to_wait_on); + if (posix_wait_handle_to_signal == nullptr || + posix_wait_handle_to_wait_on == nullptr) { + return WaitResult::kFailed; + } + if (is_alertable) alertable_state_ = true; + if (posix_wait_handle_to_signal->condition().Signal()) { + result = posix_wait_handle_to_wait_on->condition().Wait(timeout); + } + if (is_alertable) alertable_state_ = false; + return result; } -// TODO(dougvj) std::pair WaitMultiple(WaitHandle* wait_handles[], size_t wait_handle_count, bool wait_all, bool is_alertable, std::chrono::milliseconds timeout) { - assert_always(); - return std::pair(WaitResult::kFailed, 0); + std::vector conditions; + conditions.reserve(wait_handle_count); + for (size_t i = 0u; i < wait_handle_count; ++i) { + auto handle = dynamic_cast(wait_handles[i]); + if (handle == nullptr) { + return std::make_pair(WaitResult::kFailed, 0); + } + conditions.push_back(&handle->condition()); + } + if (is_alertable) alertable_state_ = true; + auto result = PosixConditionBase::WaitMultiple(std::move(conditions), + wait_all, timeout); + if (is_alertable) alertable_state_ = false; + return result; } -// TODO(dougvj) -class PosixEvent : public PosixFdHandle { +class PosixEvent : public PosixConditionHandle { public: - PosixEvent(intptr_t fd) : PosixFdHandle(fd) {} + PosixEvent(bool manual_reset, bool initial_state) + : PosixConditionHandle(manual_reset, initial_state) {} ~PosixEvent() override = default; - void Set() override { - uint64_t buf = 1; - write(handle_, &buf, sizeof(buf)); + void Set() override { handle_.Signal(); } + void Reset() override { handle_.Reset(); } + void Pulse() override { + using namespace std::chrono_literals; + handle_.Signal(); + MaybeYield(); + Sleep(10us); + handle_.Reset(); } - void Reset() override { assert_always(); } - void Pulse() override { assert_always(); } - - private: - PosixCondition condition_; }; std::unique_ptr Event::CreateManualResetEvent(bool initial_state) { - // Linux's eventfd doesn't appear to support manual reset natively. - return nullptr; + return std::make_unique(true, initial_state); } std::unique_ptr Event::CreateAutoResetEvent(bool initial_state) { - int fd = eventfd(initial_state ? 1 : 0, EFD_CLOEXEC); - if (fd == -1) { - return nullptr; - } - - return std::make_unique(PosixEvent(fd)); + return std::make_unique(false, initial_state); } -// TODO(dougvj) class PosixSemaphore : public PosixConditionHandle { public: - PosixSemaphore(int initial_count, int maximum_count) { assert_always(); } + PosixSemaphore(int initial_count, int maximum_count) + : PosixConditionHandle(static_cast(initial_count), + static_cast(maximum_count)) {} ~PosixSemaphore() override = default; bool Release(int release_count, int* out_previous_count) override { - assert_always(); - return false; + if (release_count < 1) { + return false; + } + return handle_.Release(static_cast(release_count), + out_previous_count); } }; @@ -315,149 +845,219 @@ std::unique_ptr Semaphore::Create(int initial_count, return std::make_unique(initial_count, maximum_count); } -// TODO(dougvj) class PosixMutant : public PosixConditionHandle { public: - PosixMutant(bool initial_owner) { assert_always(); } - ~PosixMutant() = default; - bool Release() override { - assert_always(); - return false; - } + explicit PosixMutant(bool initial_owner) + : PosixConditionHandle(initial_owner) {} + ~PosixMutant() override = default; + bool Release() override { return handle_.Release(); } }; std::unique_ptr Mutant::Create(bool initial_owner) { return std::make_unique(initial_owner); } -// TODO(dougvj) class PosixTimer : public PosixConditionHandle { public: - PosixTimer(bool manual_reset) { assert_always(); } - ~PosixTimer() = default; + explicit PosixTimer(bool manual_reset) : PosixConditionHandle(manual_reset) {} + ~PosixTimer() override = default; bool SetOnce(std::chrono::nanoseconds due_time, std::function opt_callback) override { - assert_always(); - return false; + return handle_.Set(due_time, std::chrono::milliseconds::zero(), + std::move(opt_callback)); } bool SetRepeating(std::chrono::nanoseconds due_time, std::chrono::milliseconds period, std::function opt_callback) override { - assert_always(); - return false; - } - bool Cancel() override { - assert_always(); - return false; + return handle_.Set(due_time, period, std::move(opt_callback)); } + bool Cancel() override { return handle_.Cancel(); } }; std::unique_ptr Timer::CreateManualResetTimer() { + install_signal_handler(SignalType::kTimer); return std::make_unique(true); } std::unique_ptr Timer::CreateSynchronizationTimer() { + install_signal_handler(SignalType::kTimer); return std::make_unique(false); } -class PosixThread : public PosixThreadHandle { +class PosixThread : public PosixConditionHandle { public: - explicit PosixThread(pthread_t handle) : PosixThreadHandle(handle) {} - ~PosixThread() = default; + PosixThread() = default; + explicit PosixThread(pthread_t thread) : PosixConditionHandle(thread) {} + ~PosixThread() override = default; + + bool Initialize(CreationParameters params, + std::function start_routine) { + auto start_data = + new ThreadStartData({std::move(start_routine), false, this}); + return handle_.Initialize(params, start_data); + } + + std::string name() const override { + handle_.WaitStarted(); + auto result = Thread::name(); + if (result.empty()) { + result = handle_.name(); + } + return result; + } void set_name(std::string name) override { - pthread_setname_np(handle_, name.c_str()); - } - - uint32_t system_id() const override { return 0; } - - // TODO(DrChat) - uint64_t affinity_mask() override { return 0; } - void set_affinity_mask(uint64_t mask) override { assert_always(); } - - int priority() override { - int policy; - struct sched_param param; - int ret = pthread_getschedparam(handle_, &policy, ¶m); - if (ret != 0) { - return -1; + handle_.WaitStarted(); + Thread::set_name(name); + if (name.length() > 15) { + name = name.substr(0, 15); } - - return param.sched_priority; + handle_.set_name(name); } + uint32_t system_id() const override { return handle_.system_id(); } + + uint64_t affinity_mask() override { return handle_.affinity_mask(); } + void set_affinity_mask(uint64_t mask) override { + handle_.set_affinity_mask(mask); + } + + int priority() override { return handle_.priority(); } void set_priority(int new_priority) override { - struct sched_param param; - param.sched_priority = new_priority; - int ret = pthread_setschedparam(handle_, SCHED_FIFO, ¶m); + handle_.set_priority(new_priority); } - // TODO(DrChat) void QueueUserCallback(std::function callback) override { - assert_always(); + handle_.QueueUserCallback(std::move(callback)); } - bool Resume(uint32_t* out_new_suspend_count = nullptr) override { - assert_always(); - return false; + bool Resume(uint32_t* out_previous_suspend_count) override { + return handle_.Resume(out_previous_suspend_count); } - bool Suspend(uint32_t* out_previous_suspend_count = nullptr) override { - assert_always(); - return false; + bool Suspend(uint32_t* out_previous_suspend_count) override { + return handle_.Suspend(out_previous_suspend_count); } - void Terminate(int exit_code) override {} + void Terminate(int exit_code) override { handle_.Terminate(exit_code); } + + void WaitSuspended() { handle_.WaitSuspended(); } }; -thread_local std::unique_ptr current_thread_ = nullptr; +thread_local PosixThread* current_thread_ = nullptr; -struct ThreadStartData { - std::function start_routine; -}; -void* ThreadStartRoutine(void* parameter) { - current_thread_ = - std::unique_ptr(new PosixThread(::pthread_self())); +void* PosixCondition::ThreadStartRoutine(void* parameter) { + if (pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr) != 0) { + assert_always(); + } + threading::set_name(""); - auto start_data = reinterpret_cast(parameter); - start_data->start_routine(); + auto start_data = static_cast(parameter); + assert_not_null(start_data); + assert_not_null(start_data->thread_obj); + + auto thread = dynamic_cast(start_data->thread_obj); + auto start_routine = std::move(start_data->start_routine); + auto create_suspended = start_data->create_suspended; delete start_data; - return 0; + + current_thread_ = thread; + { + std::unique_lock lock(thread->handle_.state_mutex_); + thread->handle_.state_ = + create_suspended ? State::kSuspended : State::kRunning; + thread->handle_.state_signal_.notify_all(); + } + + if (create_suspended) { + std::unique_lock lock(thread->handle_.state_mutex_); + thread->handle_.suspend_count_ = 1; + thread->handle_.state_signal_.wait( + lock, [thread] { return thread->handle_.suspend_count_ == 0; }); + } + + start_routine(); + + { + std::unique_lock lock(thread->handle_.state_mutex_); + thread->handle_.state_ = State::kFinished; + } + + std::unique_lock lock(mutex_); + thread->handle_.exit_code_ = 0; + thread->handle_.signaled_ = true; + cond_.notify_all(); + + current_thread_ = nullptr; + return nullptr; } std::unique_ptr Thread::Create(CreationParameters params, std::function start_routine) { - auto start_data = new ThreadStartData({std::move(start_routine)}); - - assert_false(params.create_suspended); - pthread_t handle; - pthread_attr_t attr; - pthread_attr_init(&attr); - int ret = pthread_create(&handle, &attr, ThreadStartRoutine, start_data); - if (ret != 0) { - // TODO(benvanik): pass back? - auto last_error = errno; - XELOGE("Unable to pthread_create: %d", last_error); - delete start_data; - return nullptr; - } - - return std::unique_ptr(new PosixThread(handle)); + install_signal_handler(SignalType::kThreadSuspend); + install_signal_handler(SignalType::kThreadUserCallback); + auto thread = std::make_unique(); + if (!thread->Initialize(params, std::move(start_routine))) return nullptr; + assert_not_null(thread); + return thread; } Thread* Thread::GetCurrentThread() { if (current_thread_) { - return current_thread_.get(); + return current_thread_; } + // Should take this route only for threads not created by Thread::Create. + // The only thread not created by Thread::Create should be the main thread. pthread_t handle = pthread_self(); - current_thread_ = std::make_unique(handle); - return current_thread_.get(); + current_thread_ = new PosixThread(handle); + atexit([] { delete current_thread_; }); + + return current_thread_; } void Thread::Exit(int exit_code) { - pthread_exit(reinterpret_cast(exit_code)); + if (current_thread_) { + current_thread_->Terminate(exit_code); + // Sometimes the current thread keeps running after being cancelled. + // Prevent other calls from this thread from using current_thread_. + current_thread_ = nullptr; + } else { + // Should only happen with the main thread + pthread_exit(reinterpret_cast(exit_code)); + } +} + +static void signal_handler(int signal, siginfo_t* info, void* /*context*/) { + switch (GetSystemSignalType(signal)) { + case SignalType::kHighResolutionTimer: { + assert_not_null(info->si_value.sival_ptr); + auto callback = + *static_cast*>(info->si_value.sival_ptr); + callback(); + } break; + case SignalType::kTimer: { + assert_not_null(info->si_value.sival_ptr); + auto pTimer = + static_cast*>(info->si_value.sival_ptr); + pTimer->CompletionRoutine(); + } break; + case SignalType::kThreadSuspend: { + assert_not_null(current_thread_); + current_thread_->WaitSuspended(); + } break; + case SignalType::kThreadUserCallback: { + assert_not_null(info->si_value.sival_ptr); + auto p_thread = + static_cast*>(info->si_value.sival_ptr); + if (alertable_state_) { + p_thread->CallUserCallback(); + } + } break; + default: + assert_always(); + } } } // namespace threading diff --git a/src/xenia/base/threading_win.cc b/src/xenia/base/threading_win.cc index bbdd03624..510ea7310 100644 --- a/src/xenia/base/threading_win.cc +++ b/src/xenia/base/threading_win.cc @@ -378,16 +378,16 @@ class Win32Thread : public Win32Handle { QueueUserAPC(DispatchApc, handle_, reinterpret_cast(apc_data)); } - bool Resume(uint32_t* out_new_suspend_count = nullptr) override { - if (out_new_suspend_count) { - *out_new_suspend_count = 0; + bool Resume(uint32_t* out_previous_suspend_count = nullptr) override { + if (out_previous_suspend_count) { + *out_previous_suspend_count = 0; } DWORD result = ResumeThread(handle_); if (result == UINT_MAX) { return false; } - if (out_new_suspend_count) { - *out_new_suspend_count = result; + if (out_previous_suspend_count) { + *out_previous_suspend_count = result; } return true; } diff --git a/src/xenia/gpu/command_processor.cc b/src/xenia/gpu/command_processor.cc index ff7b00521..0017558c3 100644 --- a/src/xenia/gpu/command_processor.cc +++ b/src/xenia/gpu/command_processor.cc @@ -73,7 +73,7 @@ bool CommandProcessor::Initialize( WorkerThreadMain(); return 0; })); - worker_thread_->set_name("GraphicsSystem Command Processor"); + worker_thread_->set_name("GPU Commands"); worker_thread_->Create(); return true; diff --git a/src/xenia/gpu/graphics_system.cc b/src/xenia/gpu/graphics_system.cc index 0e1c008d8..e74d80d35 100644 --- a/src/xenia/gpu/graphics_system.cc +++ b/src/xenia/gpu/graphics_system.cc @@ -129,7 +129,7 @@ X_STATUS GraphicsSystem::Setup(cpu::Processor* processor, })); // As we run vblank interrupts the debugger must be able to suspend us. vsync_worker_thread_->set_can_debugger_suspend(true); - vsync_worker_thread_->set_name("GraphicsSystem Vsync"); + vsync_worker_thread_->set_name("GPU VSync"); vsync_worker_thread_->Create(); if (cvars::trace_gpu_stream) { diff --git a/src/xenia/kernel/kernel_state.cc b/src/xenia/kernel/kernel_state.cc index 12923b9ae..c32f2d860 100644 --- a/src/xenia/kernel/kernel_state.cc +++ b/src/xenia/kernel/kernel_state.cc @@ -242,10 +242,7 @@ object_ref KernelState::LaunchModule(object_ref module) { module->entry_point(), 0, X_CREATE_SUSPENDED, true, true)); // We know this is the 'main thread'. - char thread_name[32]; - std::snprintf(thread_name, xe::countof(thread_name), "Main XThread%08X", - thread->handle()); - thread->set_name(thread_name); + thread->set_name("Main XThread"); X_STATUS result = thread->Create(); if (XFAILED(result)) { @@ -340,7 +337,7 @@ void KernelState::SetExecutableModule(object_ref module) { } return 0; })); - dispatch_thread_->set_name("Kernel Dispatch Thread"); + dispatch_thread_->set_name("Kernel Dispatch"); dispatch_thread_->Create(); } }