From 4397f253259eeda617c5b1314cfa2f8aa3d01cfa Mon Sep 17 00:00:00 2001 From: Sandy Carter Date: Fri, 11 Jan 2019 14:47:59 -0500 Subject: [PATCH] [threading linux] Implement suspendable pthreads Use real-time event interrupt to communicate suspend in timely manner. Use conditional_variable to implement suspend wait and resume trigger. Ignore real-time event 36 in .gdbinit which is used to signal suspend. Test suspending threads. --- .gdbinit | 2 + src/xenia/base/testing/threading_test.cc | 25 ++++++++++- src/xenia/base/threading_posix.cc | 55 +++++++++++++++++++----- 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/.gdbinit b/.gdbinit index 3aaf134d2..68d6baa21 100644 --- a/.gdbinit +++ b/.gdbinit @@ -4,3 +4,5 @@ handle SIG34 nostop noprint handle SIG35 nostop noprint # Ignore PosixThread exit event handle SIG32 nostop noprint +# Ignore PosixThread suspend event +handle SIG36 nostop noprint diff --git a/src/xenia/base/testing/threading_test.cc b/src/xenia/base/testing/threading_test.cc index be475d5b8..876579807 100644 --- a/src/xenia/base/testing/threading_test.cc +++ b/src/xenia/base/testing/threading_test.cc @@ -759,8 +759,29 @@ TEST_CASE("Create and Run Thread", "Thread") { } TEST_CASE("Test Suspending Thread", "Thread") { - // TODO(bwrsandman): Test suspension and resume - REQUIRE(true); + std::unique_ptr thread; + WaitResult result; + Thread::CreationParameters params = {}; + auto func = [] { Sleep(20ms); }; + + // Create initially suspended + params.create_suspended = true; + thread = threading::Thread::Create(params, func); + result = threading::Wait(thread.get(), false, 50ms); + REQUIRE(result == threading::WaitResult::kTimeout); + thread->Resume(); + result = threading::Wait(thread.get(), false, 50ms); + REQUIRE(result == threading::WaitResult::kSuccess); + params.create_suspended = false; + + // Create and then suspend + thread = threading::Thread::Create(params, func); + thread->Suspend(); + result = threading::Wait(thread.get(), false, 50ms); + REQUIRE(result == threading::WaitResult::kTimeout); + thread->Resume(); + result = threading::Wait(thread.get(), false, 50ms); + REQUIRE(result == threading::WaitResult::kSuccess); } TEST_CASE("Test Thread QueueUserCallback", "Thread") { diff --git a/src/xenia/base/threading_posix.cc b/src/xenia/base/threading_posix.cc index be0517fb8..558a39c5e 100644 --- a/src/xenia/base/threading_posix.cc +++ b/src/xenia/base/threading_posix.cc @@ -38,7 +38,7 @@ inline timespec DurationToTimeSpec( // This implementation uses the SIGRTMAX - SIGRTMIN to signal to a thread // gdb tip, for SIG = SIGRTMIN + SignalType : handle SIG nostop // lldb tip, for SIG = SIGRTMIN + SignalType : process handle SIG -s false -enum class SignalType { kHighResolutionTimer, kTimer, k_Count }; +enum class SignalType { kHighResolutionTimer, kTimer, kThreadSuspend, k_Count }; int GetSystemSignal(SignalType num) { auto result = SIGRTMIN + static_cast(num); @@ -430,6 +430,7 @@ class PosixCondition : public PosixConditionBase { struct ThreadStartData { std::function start_routine; + bool create_suspended; Thread* thread_obj; }; @@ -438,6 +439,7 @@ class PosixCondition : public PosixConditionBase { enum class State { kUninitialized, kRunning, + kSuspended, kFinished, }; @@ -449,7 +451,7 @@ class PosixCondition : public PosixConditionBase { state_(State::kUninitialized) {} bool Initialize(Thread::CreationParameters params, ThreadStartData* start_data) { - assert_false(params.create_suspended); + start_data->create_suspended = params.create_suspended; pthread_attr_t attr; if (pthread_attr_init(&attr) != 0) return false; if (pthread_attr_setstacksize(&attr, params.stack_size) != 0) { @@ -570,15 +572,23 @@ class PosixCondition : public PosixConditionBase { } bool Resume(uint32_t* out_new_suspend_count = nullptr) { - // TODO(bwrsandman) - assert_always(); - return false; + // TODO(bwrsandman): implement suspend_count + assert_null(out_new_suspend_count); + WaitStarted(); + std::unique_lock lock(state_mutex_); + if (state_ != State::kSuspended) return false; + state_ = State::kRunning; + state_signal_.notify_all(); + return true; } bool Suspend(uint32_t* out_previous_suspend_count = nullptr) { - // TODO(bwrsandman) - assert_always(); - return false; + // TODO(bwrsandman): implement suspend_count + assert_null(out_previous_suspend_count); + WaitStarted(); + int result = + pthread_kill(thread_, GetSystemSignal(SignalType::kThreadSuspend)); + return result == 0; } void Terminate(int exit_code) { @@ -606,6 +616,13 @@ class PosixCondition : public PosixConditionBase { [this] { return state_ != State::kUninitialized; }); } + /// Set state to suspended and wait until it reset by another thread + void WaitSuspended() { + std::unique_lock lock(state_mutex_); + state_ = State::kSuspended; + state_signal_.wait(lock, [this] { return state_ != State::kSuspended; }); + } + private: static void* ThreadStartRoutine(void* parameter); inline bool signaled() const override { return signaled_; } @@ -618,7 +635,7 @@ class PosixCondition : public PosixConditionBase { pthread_t thread_; bool signaled_; int exit_code_; - State state_; + volatile State state_; mutable std::mutex state_mutex_; mutable std::condition_variable state_signal_; }; @@ -786,7 +803,8 @@ class PosixThread : public PosixConditionHandle { bool Initialize(CreationParameters params, std::function start_routine) { - auto start_data = new ThreadStartData({std::move(start_routine), this}); + auto start_data = + new ThreadStartData({std::move(start_routine), false, this}); return handle_.Initialize(params, start_data); } @@ -824,6 +842,8 @@ class PosixThread : public PosixConditionHandle { } void Terminate(int exit_code) override { handle_.Terminate(exit_code); } + + void WaitSuspended() { handle_.WaitSuspended(); } }; thread_local PosixThread* current_thread_ = nullptr; @@ -840,12 +860,20 @@ void* PosixCondition::ThreadStartRoutine(void* parameter) { auto thread = dynamic_cast(start_data->thread_obj); auto start_routine = std::move(start_data->start_routine); + auto create_suspended = start_data->create_suspended; delete start_data; current_thread_ = thread; { std::unique_lock lock(thread->handle_.state_mutex_); - thread->handle_.state_ = State::kRunning; + if (create_suspended) { + thread->handle_.state_ = State::kSuspended; + thread->handle_.state_signal_.wait(lock, [thread] { + return thread->handle_.state_ != State::kSuspended; + }); + } else { + thread->handle_.state_ = State::kRunning; + } thread->handle_.state_signal_.notify_all(); } @@ -867,6 +895,7 @@ void* PosixCondition::ThreadStartRoutine(void* parameter) { std::unique_ptr Thread::Create(CreationParameters params, std::function start_routine) { + install_signal_handler(SignalType::kThreadSuspend); auto thread = std::make_unique(); if (!thread->Initialize(params, std::move(start_routine))) return nullptr; assert_not_null(thread); @@ -914,6 +943,10 @@ static void signal_handler(int signal, siginfo_t* info, void* /*context*/) { static_cast*>(info->si_value.sival_ptr); pTimer->CompletionRoutine(); } break; + case SignalType::kThreadSuspend: { + assert_not_null(current_thread_); + current_thread_->WaitSuspended(); + } break; default: assert_always(); }