[threading linux] Implement Callback Queuing

Add thread local bool for alertable state.
Use real-time event interrupt to run callback.
Fix sleep duration from miliseconds (microseconds / 1000) to seconds in sleep
command.
Add note for future implementation.

Ignore real-time event 37 in .gdbinit which is used to signal callback.

Test AlertableSleep
Test Thread QueueUserCallback.
TODO: Test alerted wait result when using IO functions.
This commit is contained in:
Sandy Carter 2018-03-11 14:48:55 -04:00 committed by Rick Gibbed
parent 4397f25325
commit 634f87f63b
3 changed files with 128 additions and 14 deletions

View File

@ -6,3 +6,5 @@ handle SIG35 nostop noprint
handle SIG32 nostop noprint handle SIG32 nostop noprint
# Ignore PosixThread suspend event # Ignore PosixThread suspend event
handle SIG36 nostop noprint handle SIG36 nostop noprint
# Ignore PosixThread user callback event
handle SIG37 nostop noprint

View File

@ -101,8 +101,15 @@ TEST_CASE("Sleep Current Thread", "Sleep") {
} }
TEST_CASE("Sleep Current Thread in Alertable State", "Sleep") { TEST_CASE("Sleep Current Thread in Alertable State", "Sleep") {
// TODO(bwrsandman): auto wait_time = 50ms;
REQUIRE(true); auto start = std::chrono::steady_clock::now();
auto result = threading::AlertableSleep(wait_time);
auto duration = std::chrono::steady_clock::now() - start;
REQUIRE(duration >= wait_time);
REQUIRE(result == threading::SleepResult::kSuccess);
// TODO(bwrsandman): Test a Thread to return kAlerted.
// Need callback to call extended I/O function (ReadFileEx or WriteFileEx)
} }
TEST_CASE("TlsHandle") { TEST_CASE("TlsHandle") {
@ -785,10 +792,77 @@ TEST_CASE("Test Suspending Thread", "Thread") {
} }
TEST_CASE("Test Thread QueueUserCallback", "Thread") { TEST_CASE("Test Thread QueueUserCallback", "Thread") {
// TODO(bwrsandman): Test Exit command with QueueUserCallback std::unique_ptr<Thread> thread;
WaitResult result;
Thread::CreationParameters params = {};
std::atomic_int order;
int is_modified;
int has_finished;
auto callback = [&is_modified, &order] {
is_modified = std::atomic_fetch_add_explicit(
&order, 1, std::memory_order::memory_order_relaxed);
};
// Without alertable
order = 0;
is_modified = -1;
has_finished = -1;
thread = Thread::Create(params, [&has_finished, &order] {
// Not using Alertable so callback is not registered
Sleep(90ms);
has_finished = std::atomic_fetch_add_explicit(
&order, 1, std::memory_order::memory_order_relaxed);
});
result = Wait(thread.get(), true, 50ms);
REQUIRE(result == WaitResult::kTimeout);
REQUIRE(is_modified == -1);
thread->QueueUserCallback(callback);
result = Wait(thread.get(), true, 100ms);
REQUIRE(result == WaitResult::kSuccess);
REQUIRE(is_modified == -1);
REQUIRE(has_finished == 0);
// With alertable
order = 0;
is_modified = -1;
has_finished = -1;
thread = Thread::Create(params, [&has_finished, &order] {
// Using Alertable so callback is registered
AlertableSleep(90ms);
has_finished = std::atomic_fetch_add_explicit(
&order, 1, std::memory_order::memory_order_relaxed);
});
result = Wait(thread.get(), true, 50ms);
REQUIRE(result == WaitResult::kTimeout);
REQUIRE(is_modified == -1);
thread->QueueUserCallback(callback);
result = Wait(thread.get(), true, 100ms);
REQUIRE(result == WaitResult::kSuccess);
REQUIRE(is_modified == 0);
REQUIRE(has_finished == 1);
// Test Exit command with QueueUserCallback
order = 0;
is_modified = -1;
has_finished = -1;
thread = Thread::Create(params, [&is_modified, &has_finished, &order] {
is_modified = std::atomic_fetch_add_explicit(
&order, 1, std::memory_order::memory_order_relaxed);
// Using Alertable so callback is registered
AlertableSleep(200ms);
has_finished = std::atomic_fetch_add_explicit(
&order, 1, std::memory_order::memory_order_relaxed);
});
result = Wait(thread.get(), true, 100ms);
REQUIRE(result == WaitResult::kTimeout);
thread->QueueUserCallback([] { Thread::Exit(0); });
result = Wait(thread.get(), true, 500ms);
REQUIRE(result == WaitResult::kSuccess);
REQUIRE(is_modified == 0);
REQUIRE(has_finished == -1);
// TODO(bwrsandman): Test alertable wait returning kUserCallback by using IO // TODO(bwrsandman): Test alertable wait returning kUserCallback by using IO
// callbacks. // callbacks.
REQUIRE(true);
} }
} // namespace test } // namespace test

View File

@ -38,7 +38,13 @@ inline timespec DurationToTimeSpec(
// This implementation uses the SIGRTMAX - SIGRTMIN to signal to a thread // This implementation uses the SIGRTMAX - SIGRTMIN to signal to a thread
// gdb tip, for SIG = SIGRTMIN + SignalType : handle SIG nostop // gdb tip, for SIG = SIGRTMIN + SignalType : handle SIG nostop
// lldb tip, for SIG = SIGRTMIN + SignalType : process handle SIG -s false // lldb tip, for SIG = SIGRTMIN + SignalType : process handle SIG -s false
enum class SignalType { kHighResolutionTimer, kTimer, kThreadSuspend, k_Count }; enum class SignalType {
kHighResolutionTimer,
kTimer,
kThreadSuspend,
kThreadUserCallback,
k_Count
};
int GetSystemSignal(SignalType num) { int GetSystemSignal(SignalType num) {
auto result = SIGRTMIN + static_cast<int>(num); auto result = SIGRTMIN + static_cast<int>(num);
@ -102,9 +108,12 @@ void Sleep(std::chrono::microseconds duration) {
} while (ret == -1 && errno == EINTR); } while (ret == -1 && errno == EINTR);
} }
// TODO(dougvj) Not sure how to implement the equivalent of this on POSIX. // TODO(bwrsandman) Implement by allowing alert interrupts from IO operations
thread_local bool alertable_state_ = false;
SleepResult AlertableSleep(std::chrono::microseconds duration) { SleepResult AlertableSleep(std::chrono::microseconds duration) {
sleep(duration.count() / 1000); alertable_state_ = true;
Sleep(duration);
alertable_state_ = false;
return SleepResult::kSuccess; return SleepResult::kSuccess;
} }
@ -567,8 +576,18 @@ class PosixCondition<Thread> : public PosixConditionBase {
} }
void QueueUserCallback(std::function<void()> callback) { void QueueUserCallback(std::function<void()> callback) {
// TODO(bwrsandman) WaitStarted();
assert_always(); std::unique_lock<std::mutex> lock(callback_mutex_);
user_callback_ = std::move(callback);
sigval value{};
value.sival_ptr = this;
pthread_sigqueue(thread_, GetSystemSignal(SignalType::kThreadUserCallback),
value);
}
void CallUserCallback() {
std::unique_lock<std::mutex> lock(callback_mutex_);
user_callback_();
} }
bool Resume(uint32_t* out_new_suspend_count = nullptr) { bool Resume(uint32_t* out_new_suspend_count = nullptr) {
@ -637,7 +656,9 @@ class PosixCondition<Thread> : public PosixConditionBase {
int exit_code_; int exit_code_;
volatile State state_; volatile State state_;
mutable std::mutex state_mutex_; mutable std::mutex state_mutex_;
mutable std::mutex callback_mutex_;
mutable std::condition_variable state_signal_; mutable std::condition_variable state_signal_;
std::function<void()> user_callback_;
}; };
// This wraps a condition object as our handle because posix has no single // This wraps a condition object as our handle because posix has no single
@ -687,7 +708,10 @@ WaitResult Wait(WaitHandle* wait_handle, bool is_alertable,
std::chrono::milliseconds timeout) { std::chrono::milliseconds timeout) {
auto handle = auto handle =
reinterpret_cast<PosixConditionBase*>(wait_handle->native_handle()); reinterpret_cast<PosixConditionBase*>(wait_handle->native_handle());
return handle->Wait(timeout); if (is_alertable) alertable_state_ = true;
auto result = handle->Wait(timeout);
if (is_alertable) alertable_state_ = false;
return result;
} }
// TODO(dougvj) // TODO(dougvj)
@ -695,10 +719,12 @@ WaitResult SignalAndWait(WaitHandle* wait_handle_to_signal,
WaitHandle* wait_handle_to_wait_on, bool is_alertable, WaitHandle* wait_handle_to_wait_on, bool is_alertable,
std::chrono::milliseconds timeout) { std::chrono::milliseconds timeout) {
assert_always(); assert_always();
return WaitResult::kFailed; if (is_alertable) alertable_state_ = true;
auto result = WaitResult::kFailed;
if (is_alertable) alertable_state_ = false;
return result;
} }
// TODO(bwrsandman): Add support for is_alertable
std::pair<WaitResult, size_t> WaitMultiple(WaitHandle* wait_handles[], std::pair<WaitResult, size_t> WaitMultiple(WaitHandle* wait_handles[],
size_t wait_handle_count, size_t wait_handle_count,
bool wait_all, bool is_alertable, bool wait_all, bool is_alertable,
@ -708,8 +734,11 @@ std::pair<WaitResult, size_t> WaitMultiple(WaitHandle* wait_handles[],
handles[i] = handles[i] =
reinterpret_cast<PosixConditionBase*>(wait_handles[i]->native_handle()); reinterpret_cast<PosixConditionBase*>(wait_handles[i]->native_handle());
} }
return PosixConditionBase::WaitMultiple(std::move(handles), wait_all, if (is_alertable) alertable_state_ = true;
timeout); auto result =
PosixConditionBase::WaitMultiple(std::move(handles), wait_all, timeout);
if (is_alertable) alertable_state_ = false;
return result;
} }
class PosixEvent : public PosixConditionHandle<Event> { class PosixEvent : public PosixConditionHandle<Event> {
@ -896,6 +925,7 @@ void* PosixCondition<Thread>::ThreadStartRoutine(void* parameter) {
std::unique_ptr<Thread> Thread::Create(CreationParameters params, std::unique_ptr<Thread> Thread::Create(CreationParameters params,
std::function<void()> start_routine) { std::function<void()> start_routine) {
install_signal_handler(SignalType::kThreadSuspend); install_signal_handler(SignalType::kThreadSuspend);
install_signal_handler(SignalType::kThreadUserCallback);
auto thread = std::make_unique<PosixThread>(); auto thread = std::make_unique<PosixThread>();
if (!thread->Initialize(params, std::move(start_routine))) return nullptr; if (!thread->Initialize(params, std::move(start_routine))) return nullptr;
assert_not_null(thread); assert_not_null(thread);
@ -947,6 +977,14 @@ static void signal_handler(int signal, siginfo_t* info, void* /*context*/) {
assert_not_null(current_thread_); assert_not_null(current_thread_);
current_thread_->WaitSuspended(); current_thread_->WaitSuspended();
} break; } break;
case SignalType::kThreadUserCallback: {
assert_not_null(info->si_value.sival_ptr);
auto p_thread =
static_cast<PosixCondition<Thread>*>(info->si_value.sival_ptr);
if (alertable_state_) {
p_thread->CallUserCallback();
}
} break;
default: default:
assert_always(); assert_always();
} }