mirror of https://github.com/xemu-project/xemu.git
thread-pool: replace semaphore with condition variable
Since commit f9fc8932b1
("thread-posix: remove the posix semaphore
support", 2022-04-06) QemuSemaphore has its own mutex and condition
variable; this adds unnecessary overhead on I/O with small block sizes.
Check the QTAILQ directly instead of adding the indirection of a
semaphore's count. Using a semaphore has not been necessary since
qemu_cond_timedwait was introduced; the new code has to be careful about
spurious wakeups but it is simpler, for example thread_pool_cancel does
not have to worry about synchronizing the semaphore count with the number
of elements of pool->request_list.
Note that the return value of qemu_cond_timedwait (0 for timeout, 1 for
signal or spurious wakeup) is different from that of qemu_sem_timedwait
(-1 for timeout, 0 for success).
Reported-by: Lukáš Doktor <ldoktor@redhat.com>
Suggested-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
Message-Id: <20220514065012.1149539-3-pbonzini@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
This commit is contained in:
parent
3c7b72ddca
commit
900fa208f5
|
@ -57,7 +57,7 @@ struct ThreadPool {
|
||||||
QEMUBH *completion_bh;
|
QEMUBH *completion_bh;
|
||||||
QemuMutex lock;
|
QemuMutex lock;
|
||||||
QemuCond worker_stopped;
|
QemuCond worker_stopped;
|
||||||
QemuSemaphore sem;
|
QemuCond request_cond;
|
||||||
QEMUBH *new_thread_bh;
|
QEMUBH *new_thread_bh;
|
||||||
|
|
||||||
/* The following variables are only accessed from one AioContext. */
|
/* The following variables are only accessed from one AioContext. */
|
||||||
|
@ -74,23 +74,6 @@ struct ThreadPool {
|
||||||
int max_threads;
|
int max_threads;
|
||||||
};
|
};
|
||||||
|
|
||||||
static inline bool back_to_sleep(ThreadPool *pool, int ret)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* The semaphore timed out, we should exit the loop except when:
|
|
||||||
* - There is work to do, we raced with the signal.
|
|
||||||
* - The max threads threshold just changed, we raced with the signal.
|
|
||||||
* - The thread pool forces a minimum number of readily available threads.
|
|
||||||
*/
|
|
||||||
if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
|
|
||||||
pool->cur_threads > pool->max_threads ||
|
|
||||||
pool->cur_threads <= pool->min_threads)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *worker_thread(void *opaque)
|
static void *worker_thread(void *opaque)
|
||||||
{
|
{
|
||||||
ThreadPool *pool = opaque;
|
ThreadPool *pool = opaque;
|
||||||
|
@ -99,20 +82,25 @@ static void *worker_thread(void *opaque)
|
||||||
pool->pending_threads--;
|
pool->pending_threads--;
|
||||||
do_spawn_thread(pool);
|
do_spawn_thread(pool);
|
||||||
|
|
||||||
while (!pool->stopping) {
|
while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
|
||||||
ThreadPoolElement *req;
|
ThreadPoolElement *req;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
do {
|
if (QTAILQ_EMPTY(&pool->request_list)) {
|
||||||
pool->idle_threads++;
|
pool->idle_threads++;
|
||||||
qemu_mutex_unlock(&pool->lock);
|
ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
|
||||||
ret = qemu_sem_timedwait(&pool->sem, 10000);
|
|
||||||
qemu_mutex_lock(&pool->lock);
|
|
||||||
pool->idle_threads--;
|
pool->idle_threads--;
|
||||||
} while (back_to_sleep(pool, ret));
|
if (ret == 0 &&
|
||||||
if (ret == -1 || pool->stopping ||
|
QTAILQ_EMPTY(&pool->request_list) &&
|
||||||
pool->cur_threads > pool->max_threads) {
|
pool->cur_threads > pool->min_threads) {
|
||||||
break;
|
/* Timed out + no work to do + no need for warm threads = exit. */
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Even if there was some work to do, check if there aren't
|
||||||
|
* too many worker threads before picking it up.
|
||||||
|
*/
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
req = QTAILQ_FIRST(&pool->request_list);
|
req = QTAILQ_FIRST(&pool->request_list);
|
||||||
|
@ -134,6 +122,12 @@ static void *worker_thread(void *opaque)
|
||||||
pool->cur_threads--;
|
pool->cur_threads--;
|
||||||
qemu_cond_signal(&pool->worker_stopped);
|
qemu_cond_signal(&pool->worker_stopped);
|
||||||
qemu_mutex_unlock(&pool->lock);
|
qemu_mutex_unlock(&pool->lock);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Wake up another thread, in case we got a wakeup but decided
|
||||||
|
* to exit due to pool->cur_threads > pool->max_threads.
|
||||||
|
*/
|
||||||
|
qemu_cond_signal(&pool->request_cond);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,13 +223,7 @@ static void thread_pool_cancel(BlockAIOCB *acb)
|
||||||
trace_thread_pool_cancel(elem, elem->common.opaque);
|
trace_thread_pool_cancel(elem, elem->common.opaque);
|
||||||
|
|
||||||
QEMU_LOCK_GUARD(&pool->lock);
|
QEMU_LOCK_GUARD(&pool->lock);
|
||||||
if (elem->state == THREAD_QUEUED &&
|
if (elem->state == THREAD_QUEUED) {
|
||||||
/* No thread has yet started working on elem. we can try to "steal"
|
|
||||||
* the item from the worker if we can get a signal from the
|
|
||||||
* semaphore. Because this is non-blocking, we can do it with
|
|
||||||
* the lock taken and ensure that elem will remain THREAD_QUEUED.
|
|
||||||
*/
|
|
||||||
qemu_sem_timedwait(&pool->sem, 0) == 0) {
|
|
||||||
QTAILQ_REMOVE(&pool->request_list, elem, reqs);
|
QTAILQ_REMOVE(&pool->request_list, elem, reqs);
|
||||||
qemu_bh_schedule(pool->completion_bh);
|
qemu_bh_schedule(pool->completion_bh);
|
||||||
|
|
||||||
|
@ -280,7 +268,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
|
||||||
}
|
}
|
||||||
QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
|
QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
|
||||||
qemu_mutex_unlock(&pool->lock);
|
qemu_mutex_unlock(&pool->lock);
|
||||||
qemu_sem_post(&pool->sem);
|
qemu_cond_signal(&pool->request_cond);
|
||||||
return &req->common;
|
return &req->common;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -323,7 +311,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
|
||||||
* We either have to:
|
* We either have to:
|
||||||
* - Increase the number available of threads until over the min_threads
|
* - Increase the number available of threads until over the min_threads
|
||||||
* threshold.
|
* threshold.
|
||||||
* - Decrease the number of available threads until under the max_threads
|
* - Bump the worker threads so that they exit, until under the max_threads
|
||||||
* threshold.
|
* threshold.
|
||||||
* - Do nothing. The current number of threads fall in between the min and
|
* - Do nothing. The current number of threads fall in between the min and
|
||||||
* max thresholds. We'll let the pool manage itself.
|
* max thresholds. We'll let the pool manage itself.
|
||||||
|
@ -333,7 +321,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = pool->cur_threads; i > pool->max_threads; i--) {
|
for (int i = pool->cur_threads; i > pool->max_threads; i--) {
|
||||||
qemu_sem_post(&pool->sem);
|
qemu_cond_signal(&pool->request_cond);
|
||||||
}
|
}
|
||||||
|
|
||||||
qemu_mutex_unlock(&pool->lock);
|
qemu_mutex_unlock(&pool->lock);
|
||||||
|
@ -350,7 +338,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
|
||||||
pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
|
pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
|
||||||
qemu_mutex_init(&pool->lock);
|
qemu_mutex_init(&pool->lock);
|
||||||
qemu_cond_init(&pool->worker_stopped);
|
qemu_cond_init(&pool->worker_stopped);
|
||||||
qemu_sem_init(&pool->sem, 0);
|
qemu_cond_init(&pool->request_cond);
|
||||||
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
|
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
|
||||||
|
|
||||||
QLIST_INIT(&pool->head);
|
QLIST_INIT(&pool->head);
|
||||||
|
@ -383,15 +371,15 @@ void thread_pool_free(ThreadPool *pool)
|
||||||
|
|
||||||
/* Wait for worker threads to terminate */
|
/* Wait for worker threads to terminate */
|
||||||
pool->stopping = true;
|
pool->stopping = true;
|
||||||
|
qemu_cond_broadcast(&pool->request_cond);
|
||||||
while (pool->cur_threads > 0) {
|
while (pool->cur_threads > 0) {
|
||||||
qemu_sem_post(&pool->sem);
|
|
||||||
qemu_cond_wait(&pool->worker_stopped, &pool->lock);
|
qemu_cond_wait(&pool->worker_stopped, &pool->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
qemu_mutex_unlock(&pool->lock);
|
qemu_mutex_unlock(&pool->lock);
|
||||||
|
|
||||||
qemu_bh_delete(pool->completion_bh);
|
qemu_bh_delete(pool->completion_bh);
|
||||||
qemu_sem_destroy(&pool->sem);
|
qemu_cond_destroy(&pool->request_cond);
|
||||||
qemu_cond_destroy(&pool->worker_stopped);
|
qemu_cond_destroy(&pool->worker_stopped);
|
||||||
qemu_mutex_destroy(&pool->lock);
|
qemu_mutex_destroy(&pool->lock);
|
||||||
g_free(pool);
|
g_free(pool);
|
||||||
|
|
Loading…
Reference in New Issue