From 634f87f63b591c56883394fb6e388c7d41ded96e Mon Sep 17 00:00:00 2001 From: Sandy Carter Date: Sun, 11 Mar 2018 14:48:55 -0400 Subject: [PATCH] [threading linux] Implement Callback Queuing Add thread local bool for alertable state. Use real-time event interrupt to run callback. Fix sleep duration from miliseconds (microseconds / 1000) to seconds in sleep command. Add note for future implementation. Ignore real-time event 37 in .gdbinit which is used to signal callback. Test AlertableSleep Test Thread QueueUserCallback. TODO: Test alerted wait result when using IO functions. --- .gdbinit | 2 + src/xenia/base/testing/threading_test.cc | 82 ++++++++++++++++++++++-- src/xenia/base/threading_posix.cc | 58 ++++++++++++++--- 3 files changed, 128 insertions(+), 14 deletions(-) diff --git a/.gdbinit b/.gdbinit index 68d6baa21..09b4af30f 100644 --- a/.gdbinit +++ b/.gdbinit @@ -6,3 +6,5 @@ handle SIG35 nostop noprint handle SIG32 nostop noprint # Ignore PosixThread suspend event handle SIG36 nostop noprint +# Ignore PosixThread user callback event +handle SIG37 nostop noprint diff --git a/src/xenia/base/testing/threading_test.cc b/src/xenia/base/testing/threading_test.cc index 876579807..03d58111c 100644 --- a/src/xenia/base/testing/threading_test.cc +++ b/src/xenia/base/testing/threading_test.cc @@ -101,8 +101,15 @@ TEST_CASE("Sleep Current Thread", "Sleep") { } TEST_CASE("Sleep Current Thread in Alertable State", "Sleep") { - // TODO(bwrsandman): - REQUIRE(true); + auto wait_time = 50ms; + auto start = std::chrono::steady_clock::now(); + auto result = threading::AlertableSleep(wait_time); + auto duration = std::chrono::steady_clock::now() - start; + REQUIRE(duration >= wait_time); + REQUIRE(result == threading::SleepResult::kSuccess); + + // TODO(bwrsandman): Test a Thread to return kAlerted. + // Need callback to call extended I/O function (ReadFileEx or WriteFileEx) } TEST_CASE("TlsHandle") { @@ -785,10 +792,77 @@ TEST_CASE("Test Suspending Thread", "Thread") { } TEST_CASE("Test Thread QueueUserCallback", "Thread") { - // TODO(bwrsandman): Test Exit command with QueueUserCallback + std::unique_ptr thread; + WaitResult result; + Thread::CreationParameters params = {}; + std::atomic_int order; + int is_modified; + int has_finished; + auto callback = [&is_modified, &order] { + is_modified = std::atomic_fetch_add_explicit( + &order, 1, std::memory_order::memory_order_relaxed); + }; + + // Without alertable + order = 0; + is_modified = -1; + has_finished = -1; + thread = Thread::Create(params, [&has_finished, &order] { + // Not using Alertable so callback is not registered + Sleep(90ms); + has_finished = std::atomic_fetch_add_explicit( + &order, 1, std::memory_order::memory_order_relaxed); + }); + result = Wait(thread.get(), true, 50ms); + REQUIRE(result == WaitResult::kTimeout); + REQUIRE(is_modified == -1); + thread->QueueUserCallback(callback); + result = Wait(thread.get(), true, 100ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(is_modified == -1); + REQUIRE(has_finished == 0); + + // With alertable + order = 0; + is_modified = -1; + has_finished = -1; + thread = Thread::Create(params, [&has_finished, &order] { + // Using Alertable so callback is registered + AlertableSleep(90ms); + has_finished = std::atomic_fetch_add_explicit( + &order, 1, std::memory_order::memory_order_relaxed); + }); + result = Wait(thread.get(), true, 50ms); + REQUIRE(result == WaitResult::kTimeout); + REQUIRE(is_modified == -1); + thread->QueueUserCallback(callback); + result = Wait(thread.get(), true, 100ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(is_modified == 0); + REQUIRE(has_finished == 1); + + // Test Exit command with QueueUserCallback + order = 0; + is_modified = -1; + has_finished = -1; + thread = Thread::Create(params, [&is_modified, &has_finished, &order] { + is_modified = std::atomic_fetch_add_explicit( + &order, 1, std::memory_order::memory_order_relaxed); + // Using Alertable so callback is registered + AlertableSleep(200ms); + has_finished = std::atomic_fetch_add_explicit( + &order, 1, std::memory_order::memory_order_relaxed); + }); + result = Wait(thread.get(), true, 100ms); + REQUIRE(result == WaitResult::kTimeout); + thread->QueueUserCallback([] { Thread::Exit(0); }); + result = Wait(thread.get(), true, 500ms); + REQUIRE(result == WaitResult::kSuccess); + REQUIRE(is_modified == 0); + REQUIRE(has_finished == -1); + // TODO(bwrsandman): Test alertable wait returning kUserCallback by using IO // callbacks. - REQUIRE(true); } } // namespace test diff --git a/src/xenia/base/threading_posix.cc b/src/xenia/base/threading_posix.cc index 558a39c5e..29580eb20 100644 --- a/src/xenia/base/threading_posix.cc +++ b/src/xenia/base/threading_posix.cc @@ -38,7 +38,13 @@ inline timespec DurationToTimeSpec( // This implementation uses the SIGRTMAX - SIGRTMIN to signal to a thread // gdb tip, for SIG = SIGRTMIN + SignalType : handle SIG nostop // lldb tip, for SIG = SIGRTMIN + SignalType : process handle SIG -s false -enum class SignalType { kHighResolutionTimer, kTimer, kThreadSuspend, k_Count }; +enum class SignalType { + kHighResolutionTimer, + kTimer, + kThreadSuspend, + kThreadUserCallback, + k_Count +}; int GetSystemSignal(SignalType num) { auto result = SIGRTMIN + static_cast(num); @@ -102,9 +108,12 @@ void Sleep(std::chrono::microseconds duration) { } while (ret == -1 && errno == EINTR); } -// TODO(dougvj) Not sure how to implement the equivalent of this on POSIX. +// TODO(bwrsandman) Implement by allowing alert interrupts from IO operations +thread_local bool alertable_state_ = false; SleepResult AlertableSleep(std::chrono::microseconds duration) { - sleep(duration.count() / 1000); + alertable_state_ = true; + Sleep(duration); + alertable_state_ = false; return SleepResult::kSuccess; } @@ -567,8 +576,18 @@ class PosixCondition : public PosixConditionBase { } void QueueUserCallback(std::function callback) { - // TODO(bwrsandman) - assert_always(); + WaitStarted(); + std::unique_lock lock(callback_mutex_); + user_callback_ = std::move(callback); + sigval value{}; + value.sival_ptr = this; + pthread_sigqueue(thread_, GetSystemSignal(SignalType::kThreadUserCallback), + value); + } + + void CallUserCallback() { + std::unique_lock lock(callback_mutex_); + user_callback_(); } bool Resume(uint32_t* out_new_suspend_count = nullptr) { @@ -637,7 +656,9 @@ class PosixCondition : public PosixConditionBase { int exit_code_; volatile State state_; mutable std::mutex state_mutex_; + mutable std::mutex callback_mutex_; mutable std::condition_variable state_signal_; + std::function user_callback_; }; // This wraps a condition object as our handle because posix has no single @@ -687,7 +708,10 @@ WaitResult Wait(WaitHandle* wait_handle, bool is_alertable, std::chrono::milliseconds timeout) { auto handle = reinterpret_cast(wait_handle->native_handle()); - return handle->Wait(timeout); + if (is_alertable) alertable_state_ = true; + auto result = handle->Wait(timeout); + if (is_alertable) alertable_state_ = false; + return result; } // TODO(dougvj) @@ -695,10 +719,12 @@ WaitResult SignalAndWait(WaitHandle* wait_handle_to_signal, WaitHandle* wait_handle_to_wait_on, bool is_alertable, std::chrono::milliseconds timeout) { assert_always(); - return WaitResult::kFailed; + if (is_alertable) alertable_state_ = true; + auto result = WaitResult::kFailed; + if (is_alertable) alertable_state_ = false; + return result; } -// TODO(bwrsandman): Add support for is_alertable std::pair WaitMultiple(WaitHandle* wait_handles[], size_t wait_handle_count, bool wait_all, bool is_alertable, @@ -708,8 +734,11 @@ std::pair WaitMultiple(WaitHandle* wait_handles[], handles[i] = reinterpret_cast(wait_handles[i]->native_handle()); } - return PosixConditionBase::WaitMultiple(std::move(handles), wait_all, - timeout); + if (is_alertable) alertable_state_ = true; + auto result = + PosixConditionBase::WaitMultiple(std::move(handles), wait_all, timeout); + if (is_alertable) alertable_state_ = false; + return result; } class PosixEvent : public PosixConditionHandle { @@ -896,6 +925,7 @@ void* PosixCondition::ThreadStartRoutine(void* parameter) { std::unique_ptr Thread::Create(CreationParameters params, std::function start_routine) { install_signal_handler(SignalType::kThreadSuspend); + install_signal_handler(SignalType::kThreadUserCallback); auto thread = std::make_unique(); if (!thread->Initialize(params, std::move(start_routine))) return nullptr; assert_not_null(thread); @@ -947,6 +977,14 @@ static void signal_handler(int signal, siginfo_t* info, void* /*context*/) { assert_not_null(current_thread_); current_thread_->WaitSuspended(); } break; + case SignalType::kThreadUserCallback: { + assert_not_null(info->si_value.sival_ptr); + auto p_thread = + static_cast*>(info->si_value.sival_ptr); + if (alertable_state_) { + p_thread->CallUserCallback(); + } + } break; default: assert_always(); }