[threading linux] Implement suspend count

Add suspend count to thread implementation.
Increment suspend count on suspend and decrement on resume.
Wait on suspend count to be decremented to 0.
Return suspend count on suspend and on resume before incr/decr.
Fix naming of resume suspend count to make clear that suspend count is
before incr/decr.
Add test.
This commit is contained in:
Sandy Carter 2019-07-13 16:18:49 -04:00 committed by Rick Gibbed
parent 382dd8860f
commit e945a13957
4 changed files with 68 additions and 18 deletions

View File

@ -849,6 +849,41 @@ TEST_CASE("Test Suspending Thread", "Thread") {
thread->Resume(); thread->Resume();
result = threading::Wait(thread.get(), false, 50ms); result = threading::Wait(thread.get(), false, 50ms);
REQUIRE(result == threading::WaitResult::kSuccess); REQUIRE(result == threading::WaitResult::kSuccess);
// Test recursive suspend
thread = threading::Thread::Create(params, func);
thread->Suspend();
thread->Suspend();
result = threading::Wait(thread.get(), false, 50ms);
REQUIRE(result == threading::WaitResult::kTimeout);
thread->Resume();
result = threading::Wait(thread.get(), false, 50ms);
REQUIRE(result == threading::WaitResult::kTimeout);
thread->Resume();
result = threading::Wait(thread.get(), false, 50ms);
REQUIRE(result == threading::WaitResult::kSuccess);
// Test suspend count
uint32_t suspend_count = 0;
thread = threading::Thread::Create(params, func);
thread->Suspend(&suspend_count);
REQUIRE(suspend_count == 0);
thread->Suspend(&suspend_count);
REQUIRE(suspend_count == 1);
thread->Suspend(&suspend_count);
REQUIRE(suspend_count == 2);
thread->Resume(&suspend_count);
REQUIRE(suspend_count == 3);
thread->Resume(&suspend_count);
REQUIRE(suspend_count == 2);
thread->Resume(&suspend_count);
REQUIRE(suspend_count == 1);
thread->Suspend(&suspend_count);
REQUIRE(suspend_count == 0);
thread->Resume(&suspend_count);
REQUIRE(suspend_count == 1);
result = threading::Wait(thread.get(), false, 50ms);
REQUIRE(result == threading::WaitResult::kSuccess);
} }
TEST_CASE("Test Thread QueueUserCallback", "Thread") { TEST_CASE("Test Thread QueueUserCallback", "Thread") {

View File

@ -389,7 +389,7 @@ class Thread : public WaitHandle {
// Decrements a thread's suspend count. When the suspend count is decremented // Decrements a thread's suspend count. When the suspend count is decremented
// to zero, the execution of the thread is resumed. // to zero, the execution of the thread is resumed.
virtual bool Resume(uint32_t* out_new_suspend_count = nullptr) = 0; virtual bool Resume(uint32_t* out_previous_suspend_count = nullptr) = 0;
// Suspends the specified thread. // Suspends the specified thread.
virtual bool Suspend(uint32_t* out_previous_suspend_count = nullptr) = 0; virtual bool Suspend(uint32_t* out_previous_suspend_count = nullptr) = 0;

View File

@ -473,7 +473,8 @@ class PosixCondition<Thread> : public PosixConditionBase {
: thread_(0), : thread_(0),
signaled_(false), signaled_(false),
exit_code_(0), exit_code_(0),
state_(State::kUninitialized) {} state_(State::kUninitialized),
suspend_count_(0) {}
bool Initialize(Thread::CreationParameters params, bool Initialize(Thread::CreationParameters params,
ThreadStartData* start_data) { ThreadStartData* start_data) {
start_data->create_suspended = params.create_suspended; start_data->create_suspended = params.create_suspended;
@ -608,21 +609,33 @@ class PosixCondition<Thread> : public PosixConditionBase {
user_callback_(); user_callback_();
} }
bool Resume(uint32_t* out_new_suspend_count = nullptr) { bool Resume(uint32_t* out_previous_suspend_count = nullptr) {
// TODO(bwrsandman): implement suspend_count if (out_previous_suspend_count) {
assert_null(out_new_suspend_count); *out_previous_suspend_count = 0;
}
WaitStarted(); WaitStarted();
std::unique_lock<std::mutex> lock(state_mutex_); std::unique_lock<std::mutex> lock(state_mutex_);
if (state_ != State::kSuspended) return false; if (state_ != State::kSuspended) return false;
state_ = State::kRunning; if (out_previous_suspend_count) {
*out_previous_suspend_count = suspend_count_;
}
--suspend_count_;
state_signal_.notify_all(); state_signal_.notify_all();
return true; return true;
} }
bool Suspend(uint32_t* out_previous_suspend_count = nullptr) { bool Suspend(uint32_t* out_previous_suspend_count = nullptr) {
// TODO(bwrsandman): implement suspend_count if (out_previous_suspend_count) {
assert_null(out_previous_suspend_count); *out_previous_suspend_count = 0;
}
WaitStarted(); WaitStarted();
{
if (out_previous_suspend_count) {
*out_previous_suspend_count = suspend_count_;
}
state_ = State::kSuspended;
++suspend_count_;
}
int result = int result =
pthread_kill(thread_, GetSystemSignal(SignalType::kThreadSuspend)); pthread_kill(thread_, GetSystemSignal(SignalType::kThreadSuspend));
return result == 0; return result == 0;
@ -656,8 +669,8 @@ class PosixCondition<Thread> : public PosixConditionBase {
/// Set state to suspended and wait until it reset by another thread /// Set state to suspended and wait until it reset by another thread
void WaitSuspended() { void WaitSuspended() {
std::unique_lock<std::mutex> lock(state_mutex_); std::unique_lock<std::mutex> lock(state_mutex_);
state_ = State::kSuspended; state_signal_.wait(lock, [this] { return suspend_count_ == 0; });
state_signal_.wait(lock, [this] { return state_ != State::kSuspended; }); state_ = State::kRunning;
} }
private: private:
@ -673,6 +686,7 @@ class PosixCondition<Thread> : public PosixConditionBase {
bool signaled_; bool signaled_;
int exit_code_; int exit_code_;
volatile State state_; volatile State state_;
volatile uint32_t suspend_count_;
mutable std::mutex state_mutex_; mutable std::mutex state_mutex_;
mutable std::mutex callback_mutex_; mutable std::mutex callback_mutex_;
mutable std::condition_variable state_signal_; mutable std::condition_variable state_signal_;
@ -883,8 +897,8 @@ class PosixThread : public PosixConditionHandle<Thread> {
handle_.QueueUserCallback(std::move(callback)); handle_.QueueUserCallback(std::move(callback));
} }
bool Resume(uint32_t* out_new_suspend_count) override { bool Resume(uint32_t* out_previous_suspend_count) override {
return handle_.Resume(out_new_suspend_count); return handle_.Resume(out_previous_suspend_count);
} }
bool Suspend(uint32_t* out_previous_suspend_count) override { bool Suspend(uint32_t* out_previous_suspend_count) override {
@ -923,8 +937,9 @@ void* PosixCondition<Thread>::ThreadStartRoutine(void* parameter) {
if (create_suspended) { if (create_suspended) {
std::unique_lock<std::mutex> lock(thread->handle_.state_mutex_); std::unique_lock<std::mutex> lock(thread->handle_.state_mutex_);
thread->handle_.suspend_count_ = 1;
thread->handle_.state_signal_.wait( thread->handle_.state_signal_.wait(
lock, [thread] { return thread->handle_.state_ != State::kSuspended; }); lock, [thread] { return thread->handle_.suspend_count_ == 0; });
} }
start_routine(); start_routine();

View File

@ -388,16 +388,16 @@ class Win32Thread : public Win32Handle<Thread> {
QueueUserAPC(DispatchApc, handle_, reinterpret_cast<ULONG_PTR>(apc_data)); QueueUserAPC(DispatchApc, handle_, reinterpret_cast<ULONG_PTR>(apc_data));
} }
bool Resume(uint32_t* out_new_suspend_count = nullptr) override { bool Resume(uint32_t* out_previous_suspend_count = nullptr) override {
if (out_new_suspend_count) { if (out_previous_suspend_count) {
*out_new_suspend_count = 0; *out_previous_suspend_count = 0;
} }
DWORD result = ResumeThread(handle_); DWORD result = ResumeThread(handle_);
if (result == UINT_MAX) { if (result == UINT_MAX) {
return false; return false;
} }
if (out_new_suspend_count) { if (out_previous_suspend_count) {
*out_new_suspend_count = result; *out_previous_suspend_count = result;
} }
return true; return true;
} }