From b91203a0b557b2c25963d83be849eada958d2fbc Mon Sep 17 00:00:00 2001 From: Sandy Carter Date: Mon, 3 Dec 2018 14:28:11 -0800 Subject: [PATCH] [threading linux] Implement basic Thread function Add Basic Tests on Threads --- .gdbinit | 2 + src/xenia/base/testing/threading_test.cc | 82 +++++- src/xenia/base/threading_posix.cc | 303 +++++++++++++++++------ 3 files changed, 313 insertions(+), 74 deletions(-) diff --git a/.gdbinit b/.gdbinit index f54495075..3aaf134d2 100644 --- a/.gdbinit +++ b/.gdbinit @@ -2,3 +2,5 @@ handle SIG34 nostop noprint # Ignore PosixTimer custom event handle SIG35 nostop noprint +# Ignore PosixThread exit event +handle SIG32 nostop noprint diff --git a/src/xenia/base/testing/threading_test.cc b/src/xenia/base/testing/threading_test.cc index 9e4187165..be475d5b8 100644 --- a/src/xenia/base/testing/threading_test.cc +++ b/src/xenia/base/testing/threading_test.cc @@ -683,12 +683,90 @@ TEST_CASE("Set and Test Current Thread ID", "Thread") { } TEST_CASE("Set and Test Current Thread Name", "Thread") { + auto current_thread = Thread::GetCurrentThread(); + REQUIRE(current_thread); + auto old_thread_name = current_thread->name(); + std::string new_thread_name = "Threading Test"; - set_name(new_thread_name); + REQUIRE_NOTHROW(set_name(new_thread_name)); + + // Restore the old catch.hpp thread name + REQUIRE_NOTHROW(set_name(old_thread_name)); } TEST_CASE("Create and Run Thread", "Thread") { - // TODO(bwrsandman): + std::unique_ptr thread; + WaitResult result; + Thread::CreationParameters params = {}; + auto func = [] { Sleep(20ms); }; + + // Create most basic case of thread + thread = Thread::Create(params, func); + REQUIRE(thread->native_handle() != nullptr); + REQUIRE_NOTHROW(thread->affinity_mask()); + REQUIRE(thread->name().empty()); + result = Wait(thread.get(), false, 50ms); + REQUIRE(result == WaitResult::kSuccess); + + // Add thread name + std::string new_name = "Test thread name"; + thread = Thread::Create(params, func); + auto name = thread->name(); + INFO(name.c_str()); + REQUIRE(name.empty()); + thread->set_name(new_name); + REQUIRE(thread->name() == new_name); + result = Wait(thread.get(), false, 50ms); + REQUIRE(result == WaitResult::kSuccess); + + // Use Terminate to end an infinitely looping thread + thread = Thread::Create(params, [] { + while (true) { + Sleep(1ms); + } + }); + result = Wait(thread.get(), false, 50ms); + REQUIRE(result == WaitResult::kTimeout); + thread->Terminate(-1); + result = Wait(thread.get(), false, 50ms); + REQUIRE(result == WaitResult::kSuccess); + + // Call Exit from inside an infinitely looping thread + thread = Thread::Create(params, [] { + while (true) { + Thread::Exit(-1); + } + }); + result = Wait(thread.get(), false, 50ms); + REQUIRE(result == WaitResult::kSuccess); + + // Call timeout wait on self + result = Wait(Thread::GetCurrentThread(), false, 50ms); + REQUIRE(result == WaitResult::kTimeout); + + params.stack_size = 16 * 1024; + thread = Thread::Create(params, [] { + while (true) { + Thread::Exit(-1); + } + }); + REQUIRE(thread != nullptr); + result = Wait(thread.get(), false, 50ms); + REQUIRE(result == WaitResult::kSuccess); + + // TODO(bwrsandman): Test with different priorities + // TODO(bwrsandman): Test setting and getting thread affinity +} + +TEST_CASE("Test Suspending Thread", "Thread") { + // TODO(bwrsandman): Test suspension and resume + REQUIRE(true); +} + +TEST_CASE("Test Thread QueueUserCallback", "Thread") { + // TODO(bwrsandman): Test Exit command with QueueUserCallback + // TODO(bwrsandman): Test alertable wait returning kUserCallback by using IO + // callbacks. REQUIRE(true); } diff --git a/src/xenia/base/threading_posix.cc b/src/xenia/base/threading_posix.cc index 212286b1e..65000203b 100644 --- a/src/xenia/base/threading_posix.cc +++ b/src/xenia/base/threading_posix.cc @@ -20,6 +20,7 @@ #include #include #include +#include namespace xe { namespace threading { @@ -427,19 +428,160 @@ class PosixCondition : public PosixConditionBase { const bool manual_reset_; }; -// Native posix thread handle -template -class PosixThreadHandle : public T { - public: - explicit PosixThreadHandle(pthread_t handle) : handle_(handle) {} - ~PosixThreadHandle() override {} +struct ThreadStartData { + std::function start_routine; + Thread* thread_obj; +}; - protected: - void* native_handle() const override { - return reinterpret_cast(handle_); +template <> +class PosixCondition : public PosixConditionBase { + public: + PosixCondition() : thread_(0), signaled_(false), exit_code_(0) {} + bool Initialize(Thread::CreationParameters params, + ThreadStartData* start_data) { + assert_false(params.create_suspended); + pthread_attr_t attr; + if (pthread_attr_init(&attr) != 0) return false; + if (pthread_attr_setstacksize(&attr, params.stack_size) != 0) { + pthread_attr_destroy(&attr); + return false; + } + if (params.initial_priority != 0) { + sched_param sched{}; + sched.sched_priority = params.initial_priority + 1; + if (pthread_attr_setschedpolicy(&attr, SCHED_FIFO) != 0) { + pthread_attr_destroy(&attr); + return false; + } + if (pthread_attr_setschedparam(&attr, &sched) != 0) { + pthread_attr_destroy(&attr); + return false; + } + } + if (pthread_create(&thread_, &attr, ThreadStartRoutine, start_data) != 0) { + return false; + } + pthread_attr_destroy(&attr); + return true; } - pthread_t handle_; + /// Constructor for existing thread. This should only happen once called by + /// Thread::GetCurrentThread() on the main thread + explicit PosixCondition(pthread_t thread) + : thread_(thread), signaled_(false), exit_code_(0) {} + + virtual ~PosixCondition() { + if (thread_ && !signaled_) { + if (pthread_cancel(thread_) != 0) { + assert_always(); + } + if (pthread_join(thread_, nullptr) != 0) { + assert_always(); + } + } + } + + std::string name() const { + auto result = std::array{'\0'}; + if (pthread_getname_np(thread_, result.data(), result.size() - 1) != 0) + assert_always(); + return std::string(result.data()); + } + + void set_name(const std::string& name) { + threading::set_name(static_cast(thread_), + name); + } + + uint32_t system_id() const { return static_cast(thread_); } + + uint64_t affinity_mask() { + cpu_set_t cpu_set; + if (pthread_getaffinity_np(thread_, sizeof(cpu_set_t), &cpu_set) != 0) + assert_always(); + uint64_t result = 0; + auto cpu_count = std::min(CPU_SETSIZE, 64); + for (auto i = 0u; i < cpu_count; i++) { + auto set = CPU_ISSET(i, &cpu_set); + result |= set << i; + } + return result; + } + + void set_affinity_mask(uint64_t mask) { + cpu_set_t cpu_set; + CPU_ZERO(&cpu_set); + for (auto i = 0u; i < 64; i++) { + if (mask & (1 << i)) { + CPU_SET(i, &cpu_set); + } + } + if (pthread_setaffinity_np(thread_, sizeof(cpu_set_t), &cpu_set) != 0) { + assert_always(); + } + } + + int priority() { + int policy; + sched_param param{}; + int ret = pthread_getschedparam(thread_, &policy, ¶m); + if (ret != 0) { + return -1; + } + + return param.sched_priority; + } + + void set_priority(int new_priority) { + sched_param param{}; + param.sched_priority = new_priority; + if (pthread_setschedparam(thread_, SCHED_FIFO, ¶m) != 0) + assert_always(); + } + + void QueueUserCallback(std::function callback) { + // TODO(bwrsandman) + assert_always(); + } + + bool Resume(uint32_t* out_new_suspend_count = nullptr) { + // TODO(bwrsandman) + assert_always(); + return false; + } + + bool Suspend(uint32_t* out_previous_suspend_count = nullptr) { + // TODO(bwrsandman) + assert_always(); + return false; + } + + void Terminate(int exit_code) { + std::lock_guard lock(mutex_); + + // Sometimes the thread can call terminate twice before stopping + if (thread_ == 0) return; + auto thread = thread_; + + exit_code_ = exit_code; + signaled_ = true; + cond_.notify_all(); + + if (pthread_cancel(thread) != 0) assert_always(); + } + + private: + static void* ThreadStartRoutine(void* parameter); + inline bool signaled() const override { return signaled_; } + inline void post_execution() override { + if (thread_) { + pthread_join(thread_, nullptr); + thread_ = 0; + } + } + pthread_t thread_; + bool signaled_; + int exit_code_; }; // This wraps a condition object as our handle because posix has no single @@ -447,7 +589,9 @@ class PosixThreadHandle : public T { template class PosixConditionHandle : public T { public: + PosixConditionHandle() = default; explicit PosixConditionHandle(bool); + explicit PosixConditionHandle(pthread_t thread); PosixConditionHandle(bool manual_reset, bool initial_state); PosixConditionHandle(uint32_t initial_count, uint32_t maximum_count); ~PosixConditionHandle() override = default; @@ -458,6 +602,7 @@ class PosixConditionHandle : public T { } PosixCondition handle_; + friend PosixCondition; }; template <> @@ -478,6 +623,10 @@ PosixConditionHandle::PosixConditionHandle(bool manual_reset, bool initial_state) : handle_(manual_reset, initial_state) {} +template <> +PosixConditionHandle::PosixConditionHandle(pthread_t thread) + : handle_(thread) {} + WaitResult Wait(WaitHandle* wait_handle, bool is_alertable, std::chrono::milliseconds timeout) { auto handle = @@ -590,104 +739,114 @@ std::unique_ptr Timer::CreateSynchronizationTimer() { return std::make_unique(false); } -class PosixThread : public PosixThreadHandle { +class PosixThread : public PosixConditionHandle { public: - explicit PosixThread(pthread_t handle) : PosixThreadHandle(handle) {} - ~PosixThread() = default; + PosixThread() = default; + explicit PosixThread(pthread_t thread) : PosixConditionHandle(thread) {} + ~PosixThread() override = default; + + bool Initialize(CreationParameters params, + std::function start_routine) { + auto start_data = new ThreadStartData({std::move(start_routine), this}); + return handle_.Initialize(params, start_data); + } void set_name(std::string name) override { - pthread_setname_np(handle_, name.c_str()); - } - - uint32_t system_id() const override { return 0; } - - // TODO(DrChat) - uint64_t affinity_mask() override { return 0; } - void set_affinity_mask(uint64_t mask) override { assert_always(); } - - int priority() override { - int policy; - struct sched_param param; - int ret = pthread_getschedparam(handle_, &policy, ¶m); - if (ret != 0) { - return -1; + Thread::set_name(name); + if (name.length() > 15) { + name = name.substr(0, 15); } - - return param.sched_priority; + handle_.set_name(name); } + uint32_t system_id() const override { return handle_.system_id(); } + + uint64_t affinity_mask() override { return handle_.affinity_mask(); } + void set_affinity_mask(uint64_t mask) override { + handle_.set_affinity_mask(mask); + } + + int priority() override { return handle_.priority(); } void set_priority(int new_priority) override { - struct sched_param param; - param.sched_priority = new_priority; - int ret = pthread_setschedparam(handle_, SCHED_FIFO, ¶m); + handle_.set_priority(new_priority); } - // TODO(DrChat) void QueueUserCallback(std::function callback) override { - assert_always(); + handle_.QueueUserCallback(std::move(callback)); } - bool Resume(uint32_t* out_new_suspend_count = nullptr) override { - assert_always(); - return false; + bool Resume(uint32_t* out_new_suspend_count) override { + return handle_.Resume(out_new_suspend_count); } - bool Suspend(uint32_t* out_previous_suspend_count = nullptr) override { - assert_always(); - return false; + bool Suspend(uint32_t* out_previous_suspend_count) override { + return handle_.Suspend(out_previous_suspend_count); } - void Terminate(int exit_code) override {} + void Terminate(int exit_code) override { handle_.Terminate(exit_code); } }; -thread_local std::unique_ptr current_thread_ = nullptr; +thread_local PosixThread* current_thread_ = nullptr; -struct ThreadStartData { - std::function start_routine; -}; -void* ThreadStartRoutine(void* parameter) { - current_thread_ = - std::unique_ptr(new PosixThread(::pthread_self())); +void* PosixCondition::ThreadStartRoutine(void* parameter) { + if (pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr) != 0) { + assert_always(); + } + threading::set_name(""); - auto start_data = reinterpret_cast(parameter); - start_data->start_routine(); + auto start_data = static_cast(parameter); + assert_not_null(start_data); + assert_not_null(start_data->thread_obj); + + auto thread = dynamic_cast(start_data->thread_obj); + auto start_routine = std::move(start_data->start_routine); delete start_data; - return 0; + + current_thread_ = thread; + start_routine(); + + std::unique_lock lock(mutex_); + thread->handle_.exit_code_ = 0; + thread->handle_.signaled_ = true; + cond_.notify_all(); + + current_thread_ = nullptr; + return nullptr; } std::unique_ptr Thread::Create(CreationParameters params, std::function start_routine) { - auto start_data = new ThreadStartData({std::move(start_routine)}); - - assert_false(params.create_suspended); - pthread_t handle; - pthread_attr_t attr; - pthread_attr_init(&attr); - int ret = pthread_create(&handle, &attr, ThreadStartRoutine, start_data); - if (ret != 0) { - // TODO(benvanik): pass back? - auto last_error = errno; - XELOGE("Unable to pthread_create: {}", last_error); - delete start_data; - return nullptr; - } - - return std::unique_ptr(new PosixThread(handle)); + auto thread = std::make_unique(); + if (!thread->Initialize(params, std::move(start_routine))) return nullptr; + assert_not_null(thread); + return thread; } Thread* Thread::GetCurrentThread() { if (current_thread_) { - return current_thread_.get(); + return current_thread_; } + // Should take this route only for threads not created by Thread::Create. + // The only thread not created by Thread::Create should be the main thread. pthread_t handle = pthread_self(); - current_thread_ = std::make_unique(handle); - return current_thread_.get(); + current_thread_ = new PosixThread(handle); + atexit([] { delete current_thread_; }); + + return current_thread_; } void Thread::Exit(int exit_code) { - pthread_exit(reinterpret_cast(exit_code)); + if (current_thread_) { + current_thread_->Terminate(exit_code); + // Sometimes the current thread keeps running after being cancelled. + // Prevent other calls from this thread from using current_thread_. + current_thread_ = nullptr; + } else { + // Should only happen with the main thread + pthread_exit(reinterpret_cast(exit_code)); + } } static void signal_handler(int signal, siginfo_t* info, void* /*context*/) {