[threading linux] Implement suspendable pthreads

Use real-time event interrupt to communicate suspend in timely manner.
Use conditional_variable to implement suspend wait and resume trigger.

Ignore real-time event 36 in .gdbinit which is used to signal suspend.

Test suspending threads.
This commit is contained in:
Sandy Carter 2019-01-11 14:47:59 -05:00 committed by Rick Gibbed
parent b2912e7891
commit 4397f25325
3 changed files with 69 additions and 13 deletions

View File

@ -4,3 +4,5 @@ handle SIG34 nostop noprint
handle SIG35 nostop noprint handle SIG35 nostop noprint
# Ignore PosixThread exit event # Ignore PosixThread exit event
handle SIG32 nostop noprint handle SIG32 nostop noprint
# Ignore PosixThread suspend event
handle SIG36 nostop noprint

View File

@ -759,8 +759,29 @@ TEST_CASE("Create and Run Thread", "Thread") {
} }
TEST_CASE("Test Suspending Thread", "Thread") { TEST_CASE("Test Suspending Thread", "Thread") {
// TODO(bwrsandman): Test suspension and resume std::unique_ptr<Thread> thread;
REQUIRE(true); WaitResult result;
Thread::CreationParameters params = {};
auto func = [] { Sleep(20ms); };
// Create initially suspended
params.create_suspended = true;
thread = threading::Thread::Create(params, func);
result = threading::Wait(thread.get(), false, 50ms);
REQUIRE(result == threading::WaitResult::kTimeout);
thread->Resume();
result = threading::Wait(thread.get(), false, 50ms);
REQUIRE(result == threading::WaitResult::kSuccess);
params.create_suspended = false;
// Create and then suspend
thread = threading::Thread::Create(params, func);
thread->Suspend();
result = threading::Wait(thread.get(), false, 50ms);
REQUIRE(result == threading::WaitResult::kTimeout);
thread->Resume();
result = threading::Wait(thread.get(), false, 50ms);
REQUIRE(result == threading::WaitResult::kSuccess);
} }
TEST_CASE("Test Thread QueueUserCallback", "Thread") { TEST_CASE("Test Thread QueueUserCallback", "Thread") {

View File

@ -38,7 +38,7 @@ inline timespec DurationToTimeSpec(
// This implementation uses the SIGRTMAX - SIGRTMIN to signal to a thread // This implementation uses the SIGRTMAX - SIGRTMIN to signal to a thread
// gdb tip, for SIG = SIGRTMIN + SignalType : handle SIG nostop // gdb tip, for SIG = SIGRTMIN + SignalType : handle SIG nostop
// lldb tip, for SIG = SIGRTMIN + SignalType : process handle SIG -s false // lldb tip, for SIG = SIGRTMIN + SignalType : process handle SIG -s false
enum class SignalType { kHighResolutionTimer, kTimer, k_Count }; enum class SignalType { kHighResolutionTimer, kTimer, kThreadSuspend, k_Count };
int GetSystemSignal(SignalType num) { int GetSystemSignal(SignalType num) {
auto result = SIGRTMIN + static_cast<int>(num); auto result = SIGRTMIN + static_cast<int>(num);
@ -430,6 +430,7 @@ class PosixCondition<Timer> : public PosixConditionBase {
struct ThreadStartData { struct ThreadStartData {
std::function<void()> start_routine; std::function<void()> start_routine;
bool create_suspended;
Thread* thread_obj; Thread* thread_obj;
}; };
@ -438,6 +439,7 @@ class PosixCondition<Thread> : public PosixConditionBase {
enum class State { enum class State {
kUninitialized, kUninitialized,
kRunning, kRunning,
kSuspended,
kFinished, kFinished,
}; };
@ -449,7 +451,7 @@ class PosixCondition<Thread> : public PosixConditionBase {
state_(State::kUninitialized) {} state_(State::kUninitialized) {}
bool Initialize(Thread::CreationParameters params, bool Initialize(Thread::CreationParameters params,
ThreadStartData* start_data) { ThreadStartData* start_data) {
assert_false(params.create_suspended); start_data->create_suspended = params.create_suspended;
pthread_attr_t attr; pthread_attr_t attr;
if (pthread_attr_init(&attr) != 0) return false; if (pthread_attr_init(&attr) != 0) return false;
if (pthread_attr_setstacksize(&attr, params.stack_size) != 0) { if (pthread_attr_setstacksize(&attr, params.stack_size) != 0) {
@ -570,15 +572,23 @@ class PosixCondition<Thread> : public PosixConditionBase {
} }
bool Resume(uint32_t* out_new_suspend_count = nullptr) { bool Resume(uint32_t* out_new_suspend_count = nullptr) {
// TODO(bwrsandman) // TODO(bwrsandman): implement suspend_count
assert_always(); assert_null(out_new_suspend_count);
return false; WaitStarted();
std::unique_lock<std::mutex> lock(state_mutex_);
if (state_ != State::kSuspended) return false;
state_ = State::kRunning;
state_signal_.notify_all();
return true;
} }
bool Suspend(uint32_t* out_previous_suspend_count = nullptr) { bool Suspend(uint32_t* out_previous_suspend_count = nullptr) {
// TODO(bwrsandman) // TODO(bwrsandman): implement suspend_count
assert_always(); assert_null(out_previous_suspend_count);
return false; WaitStarted();
int result =
pthread_kill(thread_, GetSystemSignal(SignalType::kThreadSuspend));
return result == 0;
} }
void Terminate(int exit_code) { void Terminate(int exit_code) {
@ -606,6 +616,13 @@ class PosixCondition<Thread> : public PosixConditionBase {
[this] { return state_ != State::kUninitialized; }); [this] { return state_ != State::kUninitialized; });
} }
/// Set state to suspended and wait until it reset by another thread
void WaitSuspended() {
std::unique_lock<std::mutex> lock(state_mutex_);
state_ = State::kSuspended;
state_signal_.wait(lock, [this] { return state_ != State::kSuspended; });
}
private: private:
static void* ThreadStartRoutine(void* parameter); static void* ThreadStartRoutine(void* parameter);
inline bool signaled() const override { return signaled_; } inline bool signaled() const override { return signaled_; }
@ -618,7 +635,7 @@ class PosixCondition<Thread> : public PosixConditionBase {
pthread_t thread_; pthread_t thread_;
bool signaled_; bool signaled_;
int exit_code_; int exit_code_;
State state_; volatile State state_;
mutable std::mutex state_mutex_; mutable std::mutex state_mutex_;
mutable std::condition_variable state_signal_; mutable std::condition_variable state_signal_;
}; };
@ -786,7 +803,8 @@ class PosixThread : public PosixConditionHandle<Thread> {
bool Initialize(CreationParameters params, bool Initialize(CreationParameters params,
std::function<void()> start_routine) { std::function<void()> start_routine) {
auto start_data = new ThreadStartData({std::move(start_routine), this}); auto start_data =
new ThreadStartData({std::move(start_routine), false, this});
return handle_.Initialize(params, start_data); return handle_.Initialize(params, start_data);
} }
@ -824,6 +842,8 @@ class PosixThread : public PosixConditionHandle<Thread> {
} }
void Terminate(int exit_code) override { handle_.Terminate(exit_code); } void Terminate(int exit_code) override { handle_.Terminate(exit_code); }
void WaitSuspended() { handle_.WaitSuspended(); }
}; };
thread_local PosixThread* current_thread_ = nullptr; thread_local PosixThread* current_thread_ = nullptr;
@ -840,12 +860,20 @@ void* PosixCondition<Thread>::ThreadStartRoutine(void* parameter) {
auto thread = dynamic_cast<PosixThread*>(start_data->thread_obj); auto thread = dynamic_cast<PosixThread*>(start_data->thread_obj);
auto start_routine = std::move(start_data->start_routine); auto start_routine = std::move(start_data->start_routine);
auto create_suspended = start_data->create_suspended;
delete start_data; delete start_data;
current_thread_ = thread; current_thread_ = thread;
{ {
std::unique_lock<std::mutex> lock(thread->handle_.state_mutex_); std::unique_lock<std::mutex> lock(thread->handle_.state_mutex_);
if (create_suspended) {
thread->handle_.state_ = State::kSuspended;
thread->handle_.state_signal_.wait(lock, [thread] {
return thread->handle_.state_ != State::kSuspended;
});
} else {
thread->handle_.state_ = State::kRunning; thread->handle_.state_ = State::kRunning;
}
thread->handle_.state_signal_.notify_all(); thread->handle_.state_signal_.notify_all();
} }
@ -867,6 +895,7 @@ void* PosixCondition<Thread>::ThreadStartRoutine(void* parameter) {
std::unique_ptr<Thread> Thread::Create(CreationParameters params, std::unique_ptr<Thread> Thread::Create(CreationParameters params,
std::function<void()> start_routine) { std::function<void()> start_routine) {
install_signal_handler(SignalType::kThreadSuspend);
auto thread = std::make_unique<PosixThread>(); auto thread = std::make_unique<PosixThread>();
if (!thread->Initialize(params, std::move(start_routine))) return nullptr; if (!thread->Initialize(params, std::move(start_routine))) return nullptr;
assert_not_null(thread); assert_not_null(thread);
@ -914,6 +943,10 @@ static void signal_handler(int signal, siginfo_t* info, void* /*context*/) {
static_cast<PosixCondition<Timer>*>(info->si_value.sival_ptr); static_cast<PosixCondition<Timer>*>(info->si_value.sival_ptr);
pTimer->CompletionRoutine(); pTimer->CompletionRoutine();
} break; } break;
case SignalType::kThreadSuspend: {
assert_not_null(current_thread_);
current_thread_->WaitSuspended();
} break;
default: default:
assert_always(); assert_always();
} }