[threading linux] Implement basic Thread function

Add Basic Tests on Threads
This commit is contained in:
Sandy Carter 2018-12-03 14:28:11 -08:00 committed by Rick Gibbed
parent c2de074d5c
commit b91203a0b5
3 changed files with 313 additions and 74 deletions

View File

@ -2,3 +2,5 @@
handle SIG34 nostop noprint
# Ignore PosixTimer custom event
handle SIG35 nostop noprint
# Ignore PosixThread exit event
handle SIG32 nostop noprint

View File

@ -683,12 +683,90 @@ TEST_CASE("Set and Test Current Thread ID", "Thread") {
}
TEST_CASE("Set and Test Current Thread Name", "Thread") {
auto current_thread = Thread::GetCurrentThread();
REQUIRE(current_thread);
auto old_thread_name = current_thread->name();
std::string new_thread_name = "Threading Test";
set_name(new_thread_name);
REQUIRE_NOTHROW(set_name(new_thread_name));
// Restore the old catch.hpp thread name
REQUIRE_NOTHROW(set_name(old_thread_name));
}
TEST_CASE("Create and Run Thread", "Thread") {
// TODO(bwrsandman):
std::unique_ptr<Thread> thread;
WaitResult result;
Thread::CreationParameters params = {};
auto func = [] { Sleep(20ms); };
// Create most basic case of thread
thread = Thread::Create(params, func);
REQUIRE(thread->native_handle() != nullptr);
REQUIRE_NOTHROW(thread->affinity_mask());
REQUIRE(thread->name().empty());
result = Wait(thread.get(), false, 50ms);
REQUIRE(result == WaitResult::kSuccess);
// Add thread name
std::string new_name = "Test thread name";
thread = Thread::Create(params, func);
auto name = thread->name();
INFO(name.c_str());
REQUIRE(name.empty());
thread->set_name(new_name);
REQUIRE(thread->name() == new_name);
result = Wait(thread.get(), false, 50ms);
REQUIRE(result == WaitResult::kSuccess);
// Use Terminate to end an infinitely looping thread
thread = Thread::Create(params, [] {
while (true) {
Sleep(1ms);
}
});
result = Wait(thread.get(), false, 50ms);
REQUIRE(result == WaitResult::kTimeout);
thread->Terminate(-1);
result = Wait(thread.get(), false, 50ms);
REQUIRE(result == WaitResult::kSuccess);
// Call Exit from inside an infinitely looping thread
thread = Thread::Create(params, [] {
while (true) {
Thread::Exit(-1);
}
});
result = Wait(thread.get(), false, 50ms);
REQUIRE(result == WaitResult::kSuccess);
// Call timeout wait on self
result = Wait(Thread::GetCurrentThread(), false, 50ms);
REQUIRE(result == WaitResult::kTimeout);
params.stack_size = 16 * 1024;
thread = Thread::Create(params, [] {
while (true) {
Thread::Exit(-1);
}
});
REQUIRE(thread != nullptr);
result = Wait(thread.get(), false, 50ms);
REQUIRE(result == WaitResult::kSuccess);
// TODO(bwrsandman): Test with different priorities
// TODO(bwrsandman): Test setting and getting thread affinity
}
TEST_CASE("Test Suspending Thread", "Thread") {
// TODO(bwrsandman): Test suspension and resume
REQUIRE(true);
}
TEST_CASE("Test Thread QueueUserCallback", "Thread") {
// TODO(bwrsandman): Test Exit command with QueueUserCallback
// TODO(bwrsandman): Test alertable wait returning kUserCallback by using IO
// callbacks.
REQUIRE(true);
}

View File

@ -20,6 +20,7 @@
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <memory>
namespace xe {
namespace threading {
@ -427,19 +428,160 @@ class PosixCondition<Timer> : public PosixConditionBase {
const bool manual_reset_;
};
// Native posix thread handle
template <typename T>
class PosixThreadHandle : public T {
public:
explicit PosixThreadHandle(pthread_t handle) : handle_(handle) {}
~PosixThreadHandle() override {}
struct ThreadStartData {
std::function<void()> start_routine;
Thread* thread_obj;
};
protected:
void* native_handle() const override {
return reinterpret_cast<void*>(handle_);
template <>
class PosixCondition<Thread> : public PosixConditionBase {
public:
PosixCondition() : thread_(0), signaled_(false), exit_code_(0) {}
bool Initialize(Thread::CreationParameters params,
ThreadStartData* start_data) {
assert_false(params.create_suspended);
pthread_attr_t attr;
if (pthread_attr_init(&attr) != 0) return false;
if (pthread_attr_setstacksize(&attr, params.stack_size) != 0) {
pthread_attr_destroy(&attr);
return false;
}
if (params.initial_priority != 0) {
sched_param sched{};
sched.sched_priority = params.initial_priority + 1;
if (pthread_attr_setschedpolicy(&attr, SCHED_FIFO) != 0) {
pthread_attr_destroy(&attr);
return false;
}
if (pthread_attr_setschedparam(&attr, &sched) != 0) {
pthread_attr_destroy(&attr);
return false;
}
}
if (pthread_create(&thread_, &attr, ThreadStartRoutine, start_data) != 0) {
return false;
}
pthread_attr_destroy(&attr);
return true;
}
pthread_t handle_;
/// Constructor for existing thread. This should only happen once called by
/// Thread::GetCurrentThread() on the main thread
explicit PosixCondition(pthread_t thread)
: thread_(thread), signaled_(false), exit_code_(0) {}
virtual ~PosixCondition() {
if (thread_ && !signaled_) {
if (pthread_cancel(thread_) != 0) {
assert_always();
}
if (pthread_join(thread_, nullptr) != 0) {
assert_always();
}
}
}
std::string name() const {
auto result = std::array<char, 17>{'\0'};
if (pthread_getname_np(thread_, result.data(), result.size() - 1) != 0)
assert_always();
return std::string(result.data());
}
void set_name(const std::string& name) {
threading::set_name(static_cast<std::thread::native_handle_type>(thread_),
name);
}
uint32_t system_id() const { return static_cast<uint32_t>(thread_); }
uint64_t affinity_mask() {
cpu_set_t cpu_set;
if (pthread_getaffinity_np(thread_, sizeof(cpu_set_t), &cpu_set) != 0)
assert_always();
uint64_t result = 0;
auto cpu_count = std::min(CPU_SETSIZE, 64);
for (auto i = 0u; i < cpu_count; i++) {
auto set = CPU_ISSET(i, &cpu_set);
result |= set << i;
}
return result;
}
void set_affinity_mask(uint64_t mask) {
cpu_set_t cpu_set;
CPU_ZERO(&cpu_set);
for (auto i = 0u; i < 64; i++) {
if (mask & (1 << i)) {
CPU_SET(i, &cpu_set);
}
}
if (pthread_setaffinity_np(thread_, sizeof(cpu_set_t), &cpu_set) != 0) {
assert_always();
}
}
int priority() {
int policy;
sched_param param{};
int ret = pthread_getschedparam(thread_, &policy, &param);
if (ret != 0) {
return -1;
}
return param.sched_priority;
}
void set_priority(int new_priority) {
sched_param param{};
param.sched_priority = new_priority;
if (pthread_setschedparam(thread_, SCHED_FIFO, &param) != 0)
assert_always();
}
void QueueUserCallback(std::function<void()> callback) {
// TODO(bwrsandman)
assert_always();
}
bool Resume(uint32_t* out_new_suspend_count = nullptr) {
// TODO(bwrsandman)
assert_always();
return false;
}
bool Suspend(uint32_t* out_previous_suspend_count = nullptr) {
// TODO(bwrsandman)
assert_always();
return false;
}
void Terminate(int exit_code) {
std::lock_guard<std::mutex> lock(mutex_);
// Sometimes the thread can call terminate twice before stopping
if (thread_ == 0) return;
auto thread = thread_;
exit_code_ = exit_code;
signaled_ = true;
cond_.notify_all();
if (pthread_cancel(thread) != 0) assert_always();
}
private:
static void* ThreadStartRoutine(void* parameter);
inline bool signaled() const override { return signaled_; }
inline void post_execution() override {
if (thread_) {
pthread_join(thread_, nullptr);
thread_ = 0;
}
}
pthread_t thread_;
bool signaled_;
int exit_code_;
};
// This wraps a condition object as our handle because posix has no single
@ -447,7 +589,9 @@ class PosixThreadHandle : public T {
template <typename T>
class PosixConditionHandle : public T {
public:
PosixConditionHandle() = default;
explicit PosixConditionHandle(bool);
explicit PosixConditionHandle(pthread_t thread);
PosixConditionHandle(bool manual_reset, bool initial_state);
PosixConditionHandle(uint32_t initial_count, uint32_t maximum_count);
~PosixConditionHandle() override = default;
@ -458,6 +602,7 @@ class PosixConditionHandle : public T {
}
PosixCondition<T> handle_;
friend PosixCondition<T>;
};
template <>
@ -478,6 +623,10 @@ PosixConditionHandle<Event>::PosixConditionHandle(bool manual_reset,
bool initial_state)
: handle_(manual_reset, initial_state) {}
template <>
PosixConditionHandle<Thread>::PosixConditionHandle(pthread_t thread)
: handle_(thread) {}
WaitResult Wait(WaitHandle* wait_handle, bool is_alertable,
std::chrono::milliseconds timeout) {
auto handle =
@ -590,104 +739,114 @@ std::unique_ptr<Timer> Timer::CreateSynchronizationTimer() {
return std::make_unique<PosixTimer>(false);
}
class PosixThread : public PosixThreadHandle<Thread> {
class PosixThread : public PosixConditionHandle<Thread> {
public:
explicit PosixThread(pthread_t handle) : PosixThreadHandle(handle) {}
~PosixThread() = default;
PosixThread() = default;
explicit PosixThread(pthread_t thread) : PosixConditionHandle(thread) {}
~PosixThread() override = default;
bool Initialize(CreationParameters params,
std::function<void()> start_routine) {
auto start_data = new ThreadStartData({std::move(start_routine), this});
return handle_.Initialize(params, start_data);
}
void set_name(std::string name) override {
pthread_setname_np(handle_, name.c_str());
Thread::set_name(name);
if (name.length() > 15) {
name = name.substr(0, 15);
}
handle_.set_name(name);
}
uint32_t system_id() const override { return 0; }
uint32_t system_id() const override { return handle_.system_id(); }
// TODO(DrChat)
uint64_t affinity_mask() override { return 0; }
void set_affinity_mask(uint64_t mask) override { assert_always(); }
int priority() override {
int policy;
struct sched_param param;
int ret = pthread_getschedparam(handle_, &policy, &param);
if (ret != 0) {
return -1;
}
return param.sched_priority;
uint64_t affinity_mask() override { return handle_.affinity_mask(); }
void set_affinity_mask(uint64_t mask) override {
handle_.set_affinity_mask(mask);
}
int priority() override { return handle_.priority(); }
void set_priority(int new_priority) override {
struct sched_param param;
param.sched_priority = new_priority;
int ret = pthread_setschedparam(handle_, SCHED_FIFO, &param);
handle_.set_priority(new_priority);
}
// TODO(DrChat)
void QueueUserCallback(std::function<void()> callback) override {
assert_always();
handle_.QueueUserCallback(std::move(callback));
}
bool Resume(uint32_t* out_new_suspend_count = nullptr) override {
assert_always();
return false;
bool Resume(uint32_t* out_new_suspend_count) override {
return handle_.Resume(out_new_suspend_count);
}
bool Suspend(uint32_t* out_previous_suspend_count = nullptr) override {
assert_always();
return false;
bool Suspend(uint32_t* out_previous_suspend_count) override {
return handle_.Suspend(out_previous_suspend_count);
}
void Terminate(int exit_code) override {}
void Terminate(int exit_code) override { handle_.Terminate(exit_code); }
};
thread_local std::unique_ptr<PosixThread> current_thread_ = nullptr;
thread_local PosixThread* current_thread_ = nullptr;
struct ThreadStartData {
std::function<void()> start_routine;
};
void* ThreadStartRoutine(void* parameter) {
current_thread_ =
std::unique_ptr<PosixThread>(new PosixThread(::pthread_self()));
void* PosixCondition<Thread>::ThreadStartRoutine(void* parameter) {
if (pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr) != 0) {
assert_always();
}
threading::set_name("");
auto start_data = reinterpret_cast<ThreadStartData*>(parameter);
start_data->start_routine();
auto start_data = static_cast<ThreadStartData*>(parameter);
assert_not_null(start_data);
assert_not_null(start_data->thread_obj);
auto thread = dynamic_cast<PosixThread*>(start_data->thread_obj);
auto start_routine = std::move(start_data->start_routine);
delete start_data;
return 0;
current_thread_ = thread;
start_routine();
std::unique_lock<std::mutex> lock(mutex_);
thread->handle_.exit_code_ = 0;
thread->handle_.signaled_ = true;
cond_.notify_all();
current_thread_ = nullptr;
return nullptr;
}
std::unique_ptr<Thread> Thread::Create(CreationParameters params,
std::function<void()> start_routine) {
auto start_data = new ThreadStartData({std::move(start_routine)});
assert_false(params.create_suspended);
pthread_t handle;
pthread_attr_t attr;
pthread_attr_init(&attr);
int ret = pthread_create(&handle, &attr, ThreadStartRoutine, start_data);
if (ret != 0) {
// TODO(benvanik): pass back?
auto last_error = errno;
XELOGE("Unable to pthread_create: {}", last_error);
delete start_data;
return nullptr;
}
return std::unique_ptr<PosixThread>(new PosixThread(handle));
auto thread = std::make_unique<PosixThread>();
if (!thread->Initialize(params, std::move(start_routine))) return nullptr;
assert_not_null(thread);
return thread;
}
Thread* Thread::GetCurrentThread() {
if (current_thread_) {
return current_thread_.get();
return current_thread_;
}
// Should take this route only for threads not created by Thread::Create.
// The only thread not created by Thread::Create should be the main thread.
pthread_t handle = pthread_self();
current_thread_ = std::make_unique<PosixThread>(handle);
return current_thread_.get();
current_thread_ = new PosixThread(handle);
atexit([] { delete current_thread_; });
return current_thread_;
}
void Thread::Exit(int exit_code) {
if (current_thread_) {
current_thread_->Terminate(exit_code);
// Sometimes the current thread keeps running after being cancelled.
// Prevent other calls from this thread from using current_thread_.
current_thread_ = nullptr;
} else {
// Should only happen with the main thread
pthread_exit(reinterpret_cast<void*>(exit_code));
}
}
static void signal_handler(int signal, siginfo_t* info, void* /*context*/) {