From ee19d953ecff53908a645d38a1b913fdf15a72f4 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 14 Apr 2022 19:57:52 +0200 Subject: [PATCH] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines The condition for waiting on the s->free_sema queue depends on both s->in_flight and s->state. The latter is currently using atomics, but this is quite dubious and probably wrong. Because s->state is written in the main thread too, for example by the yank callback, it cannot be protected by a CoMutex. Introduce a separate lock that can be used by nbd_co_send_request(); later on this lock will also be used for s->state. There will not be any contention on the lock unless there is a yank or reconnect, so this is not performance sensitive. Signed-off-by: Paolo Bonzini Message-Id: <20220414175756.671165-6-pbonzini@redhat.com> Reviewed-by: Eric Blake Reviewed-by: Lukas Straub Signed-off-by: Eric Blake --- block/nbd.c | 46 +++++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index 1e7f609312..cea77becba 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -71,17 +71,22 @@ typedef struct BDRVNBDState { QIOChannel *ioc; /* The current I/O channel */ NBDExportInfo info; - CoMutex send_mutex; + /* + * Protects free_sema, in_flight, requests[].coroutine, + * reconnect_delay_timer. + */ + QemuMutex requests_lock; CoQueue free_sema; - - CoMutex receive_mutex; int in_flight; + NBDClientRequest requests[MAX_NBD_REQUESTS]; + QEMUTimer *reconnect_delay_timer; + + CoMutex send_mutex; + CoMutex receive_mutex; NBDClientState state; - QEMUTimer *reconnect_delay_timer; QEMUTimer *open_timer; - NBDClientRequest requests[MAX_NBD_REQUESTS]; NBDReply reply; BlockDriverState *bs; @@ -350,7 +355,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs, return 0; } -/* called under s->send_mutex */ +/* Called with s->requests_lock taken. */ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s) { bool blocking = nbd_client_connecting_wait(s); @@ -382,9 +387,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s) s->ioc = NULL; } - qemu_co_mutex_unlock(&s->send_mutex); + qemu_mutex_unlock(&s->requests_lock); nbd_co_do_establish_connection(s->bs, blocking, NULL); - qemu_co_mutex_lock(&s->send_mutex); + qemu_mutex_lock(&s->requests_lock); /* * The reconnect attempt is done (maybe successfully, maybe not), so @@ -466,11 +471,10 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs, BDRVNBDState *s = (BDRVNBDState *)bs->opaque; int rc, i = -1; - qemu_co_mutex_lock(&s->send_mutex); - + qemu_mutex_lock(&s->requests_lock); while (s->in_flight == MAX_NBD_REQUESTS || (!nbd_client_connected(s) && s->in_flight > 0)) { - qemu_co_queue_wait(&s->free_sema, &s->send_mutex); + qemu_co_queue_wait(&s->free_sema, &s->requests_lock); } s->in_flight++; @@ -491,13 +495,13 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs, } } - g_assert(qemu_in_coroutine()); assert(i < MAX_NBD_REQUESTS); - s->requests[i].coroutine = qemu_coroutine_self(); s->requests[i].offset = request->from; s->requests[i].receiving = false; + qemu_mutex_unlock(&s->requests_lock); + qemu_co_mutex_lock(&s->send_mutex); request->handle = INDEX_TO_HANDLE(s, i); assert(s->ioc); @@ -517,17 +521,19 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs, } else { rc = nbd_send_request(s->ioc, request); } + qemu_co_mutex_unlock(&s->send_mutex); -err: if (rc < 0) { + qemu_mutex_lock(&s->requests_lock); +err: nbd_channel_error(s, rc); if (i != -1) { s->requests[i].coroutine = NULL; } s->in_flight--; qemu_co_queue_next(&s->free_sema); + qemu_mutex_unlock(&s->requests_lock); } - qemu_co_mutex_unlock(&s->send_mutex); return rc; } @@ -1017,12 +1023,11 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s, return true; break_loop: + qemu_mutex_lock(&s->requests_lock); s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL; - - qemu_co_mutex_lock(&s->send_mutex); s->in_flight--; qemu_co_queue_next(&s->free_sema); - qemu_co_mutex_unlock(&s->send_mutex); + qemu_mutex_unlock(&s->requests_lock); return false; } @@ -1855,8 +1860,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags, BDRVNBDState *s = (BDRVNBDState *)bs->opaque; s->bs = bs; - qemu_co_mutex_init(&s->send_mutex); + qemu_mutex_init(&s->requests_lock); qemu_co_queue_init(&s->free_sema); + qemu_co_mutex_init(&s->send_mutex); qemu_co_mutex_init(&s->receive_mutex); if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) { @@ -2043,9 +2049,11 @@ static void nbd_cancel_in_flight(BlockDriverState *bs) reconnect_delay_timer_del(s); + qemu_mutex_lock(&s->requests_lock); if (s->state == NBD_CLIENT_CONNECTING_WAIT) { s->state = NBD_CLIENT_CONNECTING_NOWAIT; } + qemu_mutex_unlock(&s->requests_lock); nbd_co_establish_connection_cancel(s->conn); }