diff --git a/block/coroutines.h b/block/coroutines.h index 4cfb4946e6..514d169d23 100644 --- a/block/coroutines.h +++ b/block/coroutines.h @@ -66,4 +66,10 @@ int coroutine_fn bdrv_co_readv_vmstate(BlockDriverState *bs, int coroutine_fn bdrv_co_writev_vmstate(BlockDriverState *bs, QEMUIOVector *qiov, int64_t pos); +int generated_co_wrapper +nbd_do_establish_connection(BlockDriverState *bs, Error **errp); +int coroutine_fn +nbd_co_do_establish_connection(BlockDriverState *bs, Error **errp); + + #endif /* BLOCK_COROUTINES_INT_H */ diff --git a/block/nbd.c b/block/nbd.c index 616f9ae6c4..3cbee762de 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -44,6 +44,7 @@ #include "block/qdict.h" #include "block/nbd.h" #include "block/block_int.h" +#include "block/coroutines.h" #include "qemu/yank.h" @@ -66,50 +67,8 @@ typedef enum NBDClientState { NBD_CLIENT_QUIT } NBDClientState; -typedef enum NBDConnectThreadState { - /* No thread, no pending results */ - CONNECT_THREAD_NONE, - - /* Thread is running, no results for now */ - CONNECT_THREAD_RUNNING, - - /* - * Thread is running, but requestor exited. Thread should close - * the new socket and free the connect state on exit. - */ - CONNECT_THREAD_RUNNING_DETACHED, - - /* Thread finished, results are stored in a state */ - CONNECT_THREAD_FAIL, - CONNECT_THREAD_SUCCESS -} NBDConnectThreadState; - -typedef struct NBDConnectThread { - /* Initialization constants */ - SocketAddress *saddr; /* address to connect to */ - /* - * Bottom half to schedule on completion. Scheduled only if bh_ctx is not - * NULL - */ - QEMUBHFunc *bh_func; - void *bh_opaque; - - /* - * Result of last attempt. Valid in FAIL and SUCCESS states. - * If you want to steal error, don't forget to set pointer to NULL. - */ - QIOChannelSocket *sioc; - Error *err; - - /* state and bh_ctx are protected by mutex */ - QemuMutex mutex; - NBDConnectThreadState state; /* current state of the thread */ - AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */ -} NBDConnectThread; - typedef struct BDRVNBDState { - QIOChannelSocket *sioc; /* The master data channel */ - QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */ + QIOChannel *ioc; /* The current I/O channel */ NBDExportInfo info; CoMutex send_mutex; @@ -121,8 +80,6 @@ typedef struct BDRVNBDState { bool wait_drained_end; int in_flight; NBDClientState state; - int connect_status; - Error *connect_err; bool wait_in_flight; QEMUTimer *reconnect_delay_timer; @@ -140,20 +97,20 @@ typedef struct BDRVNBDState { char *x_dirty_bitmap; bool alloc_depth; - bool wait_connect; - NBDConnectThread *connect_thread; + NBDClientConnection *conn; } BDRVNBDState; -static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr, - Error **errp); -static int nbd_co_establish_connection(BlockDriverState *bs, Error **errp); -static void nbd_co_establish_connection_cancel(BlockDriverState *bs, - bool detach); -static int nbd_client_handshake(BlockDriverState *bs, Error **errp); static void nbd_yank(void *opaque); -static void nbd_clear_bdrvstate(BDRVNBDState *s) +static void nbd_clear_bdrvstate(BlockDriverState *bs) { + BDRVNBDState *s = (BDRVNBDState *)bs->opaque; + + nbd_client_connection_release(s->conn); + s->conn = NULL; + + yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name)); + object_unref(OBJECT(s->tlscreds)); qapi_free_SocketAddress(s->saddr); s->saddr = NULL; @@ -165,15 +122,20 @@ static void nbd_clear_bdrvstate(BDRVNBDState *s) s->x_dirty_bitmap = NULL; } +static bool nbd_client_connected(BDRVNBDState *s) +{ + return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED; +} + static void nbd_channel_error(BDRVNBDState *s, int ret) { if (ret == -EIO) { - if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) { + if (nbd_client_connected(s)) { s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT : NBD_CLIENT_CONNECTING_NOWAIT; } } else { - if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) { + if (nbd_client_connected(s)) { qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); } s->state = NBD_CLIENT_QUIT; @@ -188,6 +150,7 @@ static void nbd_recv_coroutines_wake_all(BDRVNBDState *s) NBDClientRequest *req = &s->requests[i]; if (req->coroutine && req->receiving) { + req->receiving = false; aio_co_wake(req->coroutine); } } @@ -271,7 +234,7 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs, * s->connection_co is either yielded from nbd_receive_reply or from * nbd_co_reconnect_loop() */ - if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) { + if (nbd_client_connected(s)) { qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context); } @@ -291,7 +254,7 @@ static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs) s->drained = true; qemu_co_sleep_wake(&s->reconnect_sleep); - nbd_co_establish_connection_cancel(bs, false); + nbd_co_establish_connection_cancel(s->conn); reconnect_delay_timer_del(s); @@ -320,16 +283,12 @@ static void nbd_teardown_connection(BlockDriverState *bs) if (s->ioc) { /* finish any pending coroutines */ qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); - } else if (s->sioc) { - /* abort negotiation */ - qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH, - NULL); } s->state = NBD_CLIENT_QUIT; if (s->connection_co) { qemu_co_sleep_wake(&s->reconnect_sleep); - nbd_co_establish_connection_cancel(bs, true); + nbd_co_establish_connection_cancel(s->conn); } if (qemu_in_coroutine()) { s->teardown_co = qemu_coroutine_self(); @@ -354,239 +313,95 @@ static bool nbd_client_connecting_wait(BDRVNBDState *s) return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT; } -static void connect_bh(void *opaque) -{ - BDRVNBDState *state = opaque; - - assert(state->wait_connect); - state->wait_connect = false; - aio_co_wake(state->connection_co); -} - -static void nbd_init_connect_thread(BDRVNBDState *s) -{ - s->connect_thread = g_new(NBDConnectThread, 1); - - *s->connect_thread = (NBDConnectThread) { - .saddr = QAPI_CLONE(SocketAddress, s->saddr), - .state = CONNECT_THREAD_NONE, - .bh_func = connect_bh, - .bh_opaque = s, - }; - - qemu_mutex_init(&s->connect_thread->mutex); -} - -static void nbd_free_connect_thread(NBDConnectThread *thr) -{ - if (thr->sioc) { - qio_channel_close(QIO_CHANNEL(thr->sioc), NULL); - } - error_free(thr->err); - qapi_free_SocketAddress(thr->saddr); - g_free(thr); -} - -static void *connect_thread_func(void *opaque) -{ - NBDConnectThread *thr = opaque; - int ret; - bool do_free = false; - - thr->sioc = qio_channel_socket_new(); - - error_free(thr->err); - thr->err = NULL; - ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err); - if (ret < 0) { - object_unref(OBJECT(thr->sioc)); - thr->sioc = NULL; - } - - qemu_mutex_lock(&thr->mutex); - - switch (thr->state) { - case CONNECT_THREAD_RUNNING: - thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS; - if (thr->bh_ctx) { - aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque); - - /* play safe, don't reuse bh_ctx on further connection attempts */ - thr->bh_ctx = NULL; - } - break; - case CONNECT_THREAD_RUNNING_DETACHED: - do_free = true; - break; - default: - abort(); - } - - qemu_mutex_unlock(&thr->mutex); - - if (do_free) { - nbd_free_connect_thread(thr); - } - - return NULL; -} - -static int coroutine_fn -nbd_co_establish_connection(BlockDriverState *bs, Error **errp) -{ - int ret; - QemuThread thread; - BDRVNBDState *s = bs->opaque; - NBDConnectThread *thr = s->connect_thread; - - if (!thr) { - /* detached */ - return -1; - } - - qemu_mutex_lock(&thr->mutex); - - switch (thr->state) { - case CONNECT_THREAD_FAIL: - case CONNECT_THREAD_NONE: - error_free(thr->err); - thr->err = NULL; - thr->state = CONNECT_THREAD_RUNNING; - qemu_thread_create(&thread, "nbd-connect", - connect_thread_func, thr, QEMU_THREAD_DETACHED); - break; - case CONNECT_THREAD_SUCCESS: - /* Previous attempt finally succeeded in background */ - thr->state = CONNECT_THREAD_NONE; - s->sioc = thr->sioc; - thr->sioc = NULL; - yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), - nbd_yank, bs); - qemu_mutex_unlock(&thr->mutex); - return 0; - case CONNECT_THREAD_RUNNING: - /* Already running, will wait */ - break; - default: - abort(); - } - - thr->bh_ctx = qemu_get_current_aio_context(); - - qemu_mutex_unlock(&thr->mutex); - - - /* - * We are going to wait for connect-thread finish, but - * nbd_client_co_drain_begin() can interrupt. - * - * Note that wait_connect variable is not visible for connect-thread. It - * doesn't need mutex protection, it used only inside home aio context of - * bs. - */ - s->wait_connect = true; - qemu_coroutine_yield(); - - if (!s->connect_thread) { - /* detached */ - return -1; - } - assert(thr == s->connect_thread); - - qemu_mutex_lock(&thr->mutex); - - switch (thr->state) { - case CONNECT_THREAD_SUCCESS: - case CONNECT_THREAD_FAIL: - thr->state = CONNECT_THREAD_NONE; - error_propagate(errp, thr->err); - thr->err = NULL; - s->sioc = thr->sioc; - thr->sioc = NULL; - if (s->sioc) { - yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), - nbd_yank, bs); - } - ret = (s->sioc ? 0 : -1); - break; - case CONNECT_THREAD_RUNNING: - case CONNECT_THREAD_RUNNING_DETACHED: - /* - * Obviously, drained section wants to start. Report the attempt as - * failed. Still connect thread is executing in background, and its - * result may be used for next connection attempt. - */ - ret = -1; - error_setg(errp, "Connection attempt cancelled by other operation"); - break; - - case CONNECT_THREAD_NONE: - /* - * Impossible. We've seen this thread running. So it should be - * running or at least give some results. - */ - abort(); - - default: - abort(); - } - - qemu_mutex_unlock(&thr->mutex); - - return ret; -} - /* - * nbd_co_establish_connection_cancel - * Cancel nbd_co_establish_connection asynchronously: it will finish soon, to - * allow drained section to begin. - * - * If detach is true, also cleanup the state (or if thread is running, move it - * to CONNECT_THREAD_RUNNING_DETACHED state). s->connect_thread becomes NULL if - * detach is true. + * Update @bs with information learned during a completed negotiation process. + * Return failure if the server's advertised options are incompatible with the + * client's needs. */ -static void nbd_co_establish_connection_cancel(BlockDriverState *bs, - bool detach) +static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp) { - BDRVNBDState *s = bs->opaque; - NBDConnectThread *thr = s->connect_thread; - bool wake = false; - bool do_free = false; + BDRVNBDState *s = (BDRVNBDState *)bs->opaque; + int ret; - qemu_mutex_lock(&thr->mutex); - - if (thr->state == CONNECT_THREAD_RUNNING) { - /* We can cancel only in running state, when bh is not yet scheduled */ - thr->bh_ctx = NULL; - if (s->wait_connect) { - s->wait_connect = false; - wake = true; + if (s->x_dirty_bitmap) { + if (!s->info.base_allocation) { + error_setg(errp, "requested x-dirty-bitmap %s not found", + s->x_dirty_bitmap); + return -EINVAL; } - if (detach) { - thr->state = CONNECT_THREAD_RUNNING_DETACHED; - s->connect_thread = NULL; + if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) { + s->alloc_depth = true; } - } else if (detach) { - do_free = true; } - qemu_mutex_unlock(&thr->mutex); - - if (do_free) { - nbd_free_connect_thread(thr); - s->connect_thread = NULL; + if (s->info.flags & NBD_FLAG_READ_ONLY) { + ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp); + if (ret < 0) { + return ret; + } } - if (wake) { - aio_co_wake(s->connection_co); + if (s->info.flags & NBD_FLAG_SEND_FUA) { + bs->supported_write_flags = BDRV_REQ_FUA; + bs->supported_zero_flags |= BDRV_REQ_FUA; } + + if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) { + bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP; + if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) { + bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK; + } + } + + trace_nbd_client_handshake_success(s->export); + + return 0; +} + +int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs, + Error **errp) +{ + BDRVNBDState *s = (BDRVNBDState *)bs->opaque; + int ret; + + assert(!s->ioc); + + s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp); + if (!s->ioc) { + return -ECONNREFUSED; + } + + ret = nbd_handle_updated_info(s->bs, NULL); + if (ret < 0) { + /* + * We have connected, but must fail for other reasons. + * Send NBD_CMD_DISC as a courtesy to the server. + */ + NBDRequest request = { .type = NBD_CMD_DISC }; + + nbd_send_request(s->ioc, &request); + + object_unref(OBJECT(s->ioc)); + s->ioc = NULL; + + return ret; + } + + qio_channel_set_blocking(s->ioc, false, NULL); + qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs)); + + yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank, + bs); + + /* successfully connected */ + s->state = NBD_CLIENT_CONNECTED; + qemu_co_queue_restart_all(&s->free_sema); + + return 0; } static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s) { - int ret; - Error *local_err = NULL; - if (!nbd_client_connecting(s)) { return; } @@ -620,44 +435,11 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s) qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc)); yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank, s->bs); - object_unref(OBJECT(s->sioc)); - s->sioc = NULL; object_unref(OBJECT(s->ioc)); s->ioc = NULL; } - if (nbd_co_establish_connection(s->bs, &local_err) < 0) { - ret = -ECONNREFUSED; - goto out; - } - - bdrv_dec_in_flight(s->bs); - - ret = nbd_client_handshake(s->bs, &local_err); - - if (s->drained) { - s->wait_drained_end = true; - while (s->drained) { - /* - * We may be entered once from nbd_client_attach_aio_context_bh - * and then from nbd_client_co_drain_end. So here is a loop. - */ - qemu_coroutine_yield(); - } - } - bdrv_inc_in_flight(s->bs); - -out: - s->connect_status = ret; - error_free(s->connect_err); - s->connect_err = NULL; - error_propagate(&s->connect_err, local_err); - - if (ret >= 0) { - /* successfully connected */ - s->state = NBD_CLIENT_CONNECTED; - qemu_co_queue_restart_all(&s->free_sema); - } + nbd_co_do_establish_connection(s->bs, NULL); } static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s) @@ -723,7 +505,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque) nbd_co_reconnect_loop(s); } - if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) { + if (!nbd_client_connected(s)) { continue; } @@ -767,6 +549,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque) * connection_co happens through a bottom half, which can only * run after we yield. */ + s->requests[i].receiving = false; aio_co_wake(s->requests[i].coroutine); qemu_coroutine_yield(); } @@ -780,8 +563,6 @@ static coroutine_fn void nbd_connection_entry(void *opaque) qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc)); yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank, s->bs); - object_unref(OBJECT(s->sioc)); - s->sioc = NULL; object_unref(OBJECT(s->ioc)); s->ioc = NULL; } @@ -804,7 +585,7 @@ static int nbd_co_send_request(BlockDriverState *bs, qemu_co_queue_wait(&s->free_sema, &s->send_mutex); } - if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) { + if (!nbd_client_connected(s)) { rc = -EIO; goto err; } @@ -831,8 +612,7 @@ static int nbd_co_send_request(BlockDriverState *bs, if (qiov) { qio_channel_set_cork(s->ioc, true); rc = nbd_send_request(s->ioc, request); - if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED && - rc >= 0) { + if (nbd_client_connected(s) && rc >= 0) { if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov, NULL) < 0) { rc = -EIO; @@ -1156,8 +936,8 @@ static coroutine_fn int nbd_co_do_receive_one_chunk( /* Wait until we're woken up by nbd_connection_entry. */ s->requests[i].receiving = true; qemu_coroutine_yield(); - s->requests[i].receiving = false; - if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) { + assert(!s->requests[i].receiving); + if (!nbd_client_connected(s)) { error_setg(errp, "Connection closed"); return -EIO; } @@ -1316,7 +1096,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s, NBDReply local_reply; NBDStructuredReplyChunk *chunk; Error *local_err = NULL; - if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) { + if (!nbd_client_connected(s)) { error_setg(&local_err, "Connection closed"); nbd_iter_channel_error(iter, -EIO, &local_err); goto break_loop; @@ -1341,8 +1121,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s, } /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */ - if (nbd_reply_is_simple(reply) || - qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) { + if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) { goto break_loop; } @@ -1780,7 +1559,7 @@ static void nbd_yank(void *opaque) BDRVNBDState *s = (BDRVNBDState *)bs->opaque; qatomic_store_release(&s->state, NBD_CLIENT_QUIT); - qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL); + qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL); } static void nbd_client_close(BlockDriverState *bs) @@ -1795,111 +1574,6 @@ static void nbd_client_close(BlockDriverState *bs) nbd_teardown_connection(bs); } -static int nbd_establish_connection(BlockDriverState *bs, - SocketAddress *saddr, - Error **errp) -{ - ERRP_GUARD(); - BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - - s->sioc = qio_channel_socket_new(); - qio_channel_set_name(QIO_CHANNEL(s->sioc), "nbd-client"); - - qio_channel_socket_connect_sync(s->sioc, saddr, errp); - if (*errp) { - object_unref(OBJECT(s->sioc)); - s->sioc = NULL; - return -1; - } - - yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), nbd_yank, bs); - qio_channel_set_delay(QIO_CHANNEL(s->sioc), false); - - return 0; -} - -/* nbd_client_handshake takes ownership on s->sioc. On failure it's unref'ed. */ -static int nbd_client_handshake(BlockDriverState *bs, Error **errp) -{ - BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - AioContext *aio_context = bdrv_get_aio_context(bs); - int ret; - - trace_nbd_client_handshake(s->export); - qio_channel_set_blocking(QIO_CHANNEL(s->sioc), false, NULL); - qio_channel_attach_aio_context(QIO_CHANNEL(s->sioc), aio_context); - - s->info.request_sizes = true; - s->info.structured_reply = true; - s->info.base_allocation = true; - s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap); - s->info.name = g_strdup(s->export ?: ""); - ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(s->sioc), s->tlscreds, - s->hostname, &s->ioc, &s->info, errp); - g_free(s->info.x_dirty_bitmap); - g_free(s->info.name); - if (ret < 0) { - yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), - nbd_yank, bs); - object_unref(OBJECT(s->sioc)); - s->sioc = NULL; - return ret; - } - if (s->x_dirty_bitmap) { - if (!s->info.base_allocation) { - error_setg(errp, "requested x-dirty-bitmap %s not found", - s->x_dirty_bitmap); - ret = -EINVAL; - goto fail; - } - if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) { - s->alloc_depth = true; - } - } - if (s->info.flags & NBD_FLAG_READ_ONLY) { - ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp); - if (ret < 0) { - goto fail; - } - } - if (s->info.flags & NBD_FLAG_SEND_FUA) { - bs->supported_write_flags = BDRV_REQ_FUA; - bs->supported_zero_flags |= BDRV_REQ_FUA; - } - if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) { - bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP; - if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) { - bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK; - } - } - - if (!s->ioc) { - s->ioc = QIO_CHANNEL(s->sioc); - object_ref(OBJECT(s->ioc)); - } - - trace_nbd_client_handshake_success(s->export); - - return 0; - - fail: - /* - * We have connected, but must fail for other reasons. - * Send NBD_CMD_DISC as a courtesy to the server. - */ - { - NBDRequest request = { .type = NBD_CMD_DISC }; - - nbd_send_request(s->ioc ?: QIO_CHANNEL(s->sioc), &request); - - yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), - nbd_yank, bs); - object_unref(OBJECT(s->sioc)); - s->sioc = NULL; - - return ret; - } -} /* * Parse nbd_open options @@ -2133,6 +1807,12 @@ static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options, goto done; } + if (socket_address_parse_named_fd(saddr, errp) < 0) { + qapi_free_SocketAddress(saddr); + saddr = NULL; + goto done; + } + done: qobject_unref(addr); visit_free(iv); @@ -2274,9 +1954,6 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options, ret = 0; error: - if (ret < 0) { - nbd_clear_bdrvstate(s); - } qemu_opts_del(opts); return ret; } @@ -2287,11 +1964,6 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags, int ret; BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - ret = nbd_process_options(bs, options, errp); - if (ret < 0) { - return ret; - } - s->bs = bs; qemu_co_mutex_init(&s->send_mutex); qemu_co_queue_init(&s->free_sema); @@ -2300,31 +1972,29 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags, return -EEXIST; } - /* - * establish TCP connection, return error if it fails - * TODO: Configurable retry-until-timeout behaviour. - */ - if (nbd_establish_connection(bs, s->saddr, errp) < 0) { - yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name)); - return -ECONNREFUSED; - } - - ret = nbd_client_handshake(bs, errp); + ret = nbd_process_options(bs, options, errp); if (ret < 0) { - yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name)); - nbd_clear_bdrvstate(s); - return ret; + goto fail; } - /* successfully connected */ - s->state = NBD_CLIENT_CONNECTED; - nbd_init_connect_thread(s); + s->conn = nbd_client_connection_new(s->saddr, true, s->export, + s->x_dirty_bitmap, s->tlscreds); + + /* TODO: Configurable retry-until-timeout behaviour. */ + ret = nbd_do_establish_connection(bs, errp); + if (ret < 0) { + goto fail; + } s->connection_co = qemu_coroutine_create(nbd_connection_entry, s); bdrv_inc_in_flight(bs); aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co); return 0; + +fail: + nbd_clear_bdrvstate(bs); + return ret; } static int nbd_co_flush(BlockDriverState *bs) @@ -2368,11 +2038,8 @@ static void nbd_refresh_limits(BlockDriverState *bs, Error **errp) static void nbd_close(BlockDriverState *bs) { - BDRVNBDState *s = bs->opaque; - nbd_client_close(bs); - yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name)); - nbd_clear_bdrvstate(s); + nbd_clear_bdrvstate(bs); } /* diff --git a/include/block/aio.h b/include/block/aio.h index 5f342267d5..10fcae1515 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -691,10 +691,13 @@ void aio_co_enter(AioContext *ctx, struct Coroutine *co); * Return the AioContext whose event loop runs in the current thread. * * If called from an IOThread this will be the IOThread's AioContext. If - * called from another thread it will be the main loop AioContext. + * called from the main thread or with the "big QEMU lock" taken it + * will be the main loop AioContext. */ AioContext *qemu_get_current_aio_context(void); +void qemu_set_current_aio_context(AioContext *ctx); + /** * aio_context_setup: * @ctx: the aio context diff --git a/include/block/nbd.h b/include/block/nbd.h index 5f34d23bb0..78d101b774 100644 --- a/include/block/nbd.h +++ b/include/block/nbd.h @@ -406,4 +406,22 @@ const char *nbd_info_lookup(uint16_t info); const char *nbd_cmd_lookup(uint16_t info); const char *nbd_err_lookup(int err); +/* nbd/client-connection.c */ +typedef struct NBDClientConnection NBDClientConnection; + +void nbd_client_connection_enable_retry(NBDClientConnection *conn); + +NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr, + bool do_negotiation, + const char *export_name, + const char *x_dirty_bitmap, + QCryptoTLSCreds *tlscreds); +void nbd_client_connection_release(NBDClientConnection *conn); + +QIOChannel *coroutine_fn +nbd_co_establish_connection(NBDClientConnection *conn, NBDExportInfo *info, + bool blocking, Error **errp); + +void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn); + #endif diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h index 292e61aef0..4829ff373d 100644 --- a/include/qemu/coroutine.h +++ b/include/qemu/coroutine.h @@ -210,13 +210,15 @@ void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock); /** * Removes the next coroutine from the CoQueue, and wake it up. * Returns true if a coroutine was removed, false if the queue is empty. + * OK to run from coroutine and non-coroutine context. */ -bool coroutine_fn qemu_co_queue_next(CoQueue *queue); +bool qemu_co_queue_next(CoQueue *queue); /** * Empties the CoQueue; all coroutines are woken up. + * OK to run from coroutine and non-coroutine context. */ -void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue); +void qemu_co_queue_restart_all(CoQueue *queue); /** * Removes the next coroutine from the CoQueue, and wake it up. Unlike diff --git a/include/qemu/sockets.h b/include/qemu/sockets.h index 7d1f813576..0c34bf2398 100644 --- a/include/qemu/sockets.h +++ b/include/qemu/sockets.h @@ -111,4 +111,15 @@ SocketAddress *socket_remote_address(int fd, Error **errp); */ SocketAddress *socket_address_flatten(SocketAddressLegacy *addr); +/** + * socket_address_parse_named_fd: + * + * Modify @addr, replacing a named fd by its corresponding number. + * Needed for callers that plan to pass @addr to a context where the + * current monitor is not available. + * + * Return 0 on success. + */ +int socket_address_parse_named_fd(SocketAddress *addr, Error **errp); + #endif /* QEMU_SOCKETS_H */ diff --git a/iothread.c b/iothread.c index 7f086387be..2c5ccd7367 100644 --- a/iothread.c +++ b/iothread.c @@ -39,13 +39,6 @@ DECLARE_CLASS_CHECKERS(IOThreadClass, IOTHREAD, #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL #endif -static __thread IOThread *my_iothread; - -AioContext *qemu_get_current_aio_context(void) -{ - return my_iothread ? my_iothread->ctx : qemu_get_aio_context(); -} - static void *iothread_run(void *opaque) { IOThread *iothread = opaque; @@ -56,7 +49,7 @@ static void *iothread_run(void *opaque) * in this new thread uses glib. */ g_main_context_push_thread_default(iothread->worker_context); - my_iothread = iothread; + qemu_set_current_aio_context(iothread->ctx); iothread->thread_id = qemu_get_thread_id(); qemu_sem_post(&iothread->init_done_sem); diff --git a/nbd/client-connection.c b/nbd/client-connection.c new file mode 100644 index 0000000000..7123b1e189 --- /dev/null +++ b/nbd/client-connection.c @@ -0,0 +1,388 @@ +/* + * QEMU Block driver for NBD + * + * Copyright (c) 2021 Virtuozzo International GmbH. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu/osdep.h" + +#include "block/nbd.h" + +#include "qapi/qapi-visit-sockets.h" +#include "qapi/clone-visitor.h" + +struct NBDClientConnection { + /* Initialization constants, never change */ + SocketAddress *saddr; /* address to connect to */ + QCryptoTLSCreds *tlscreds; + NBDExportInfo initial_info; + bool do_negotiation; + bool do_retry; + + QemuMutex mutex; + + /* + * @sioc and @err represent a connection attempt. While running + * is true, they are only used by the connection thread, and mutex + * locking is not needed. Once the thread finishes, + * nbd_co_establish_connection then steals these pointers while + * under the mutex. + */ + NBDExportInfo updated_info; + QIOChannelSocket *sioc; + QIOChannel *ioc; + Error *err; + + /* All further fields are accessed only under mutex */ + bool running; /* thread is running now */ + bool detached; /* thread is detached and should cleanup the state */ + + /* + * wait_co: if non-NULL, which coroutine to wake in + * nbd_co_establish_connection() after yield() + */ + Coroutine *wait_co; +}; + +/* + * The function isn't protected by any mutex, only call it when the client + * connection attempt has not yet started. + */ +void nbd_client_connection_enable_retry(NBDClientConnection *conn) +{ + conn->do_retry = true; +} + +NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr, + bool do_negotiation, + const char *export_name, + const char *x_dirty_bitmap, + QCryptoTLSCreds *tlscreds) +{ + NBDClientConnection *conn = g_new(NBDClientConnection, 1); + + object_ref(OBJECT(tlscreds)); + *conn = (NBDClientConnection) { + .saddr = QAPI_CLONE(SocketAddress, saddr), + .tlscreds = tlscreds, + .do_negotiation = do_negotiation, + + .initial_info.request_sizes = true, + .initial_info.structured_reply = true, + .initial_info.base_allocation = true, + .initial_info.x_dirty_bitmap = g_strdup(x_dirty_bitmap), + .initial_info.name = g_strdup(export_name ?: "") + }; + + qemu_mutex_init(&conn->mutex); + + return conn; +} + +static void nbd_client_connection_do_free(NBDClientConnection *conn) +{ + if (conn->sioc) { + qio_channel_close(QIO_CHANNEL(conn->sioc), NULL); + object_unref(OBJECT(conn->sioc)); + } + error_free(conn->err); + qapi_free_SocketAddress(conn->saddr); + object_unref(OBJECT(conn->tlscreds)); + g_free(conn->initial_info.x_dirty_bitmap); + g_free(conn->initial_info.name); + g_free(conn); +} + +/* + * Connect to @addr and do NBD negotiation if @info is not null. If @tlscreds + * are given @outioc is returned. @outioc is provided only on success. The call + * may be cancelled from other thread by simply qio_channel_shutdown(sioc). + */ +static int nbd_connect(QIOChannelSocket *sioc, SocketAddress *addr, + NBDExportInfo *info, QCryptoTLSCreds *tlscreds, + QIOChannel **outioc, Error **errp) +{ + int ret; + + if (outioc) { + *outioc = NULL; + } + + ret = qio_channel_socket_connect_sync(sioc, addr, errp); + if (ret < 0) { + return ret; + } + + qio_channel_set_delay(QIO_CHANNEL(sioc), false); + + if (!info) { + return 0; + } + + ret = nbd_receive_negotiate(NULL, QIO_CHANNEL(sioc), tlscreds, + tlscreds ? addr->u.inet.host : NULL, + outioc, info, errp); + if (ret < 0) { + /* + * nbd_receive_negotiate() may setup tls ioc and return it even on + * failure path. In this case we should use it instead of original + * channel. + */ + if (outioc && *outioc) { + qio_channel_close(QIO_CHANNEL(*outioc), NULL); + object_unref(OBJECT(*outioc)); + *outioc = NULL; + } else { + qio_channel_close(QIO_CHANNEL(sioc), NULL); + } + + return ret; + } + + return 0; +} + +static void *connect_thread_func(void *opaque) +{ + NBDClientConnection *conn = opaque; + int ret; + bool do_free; + uint64_t timeout = 1; + uint64_t max_timeout = 16; + + qemu_mutex_lock(&conn->mutex); + while (!conn->detached) { + assert(!conn->sioc); + conn->sioc = qio_channel_socket_new(); + + qemu_mutex_unlock(&conn->mutex); + + error_free(conn->err); + conn->err = NULL; + conn->updated_info = conn->initial_info; + + ret = nbd_connect(conn->sioc, conn->saddr, + conn->do_negotiation ? &conn->updated_info : NULL, + conn->tlscreds, &conn->ioc, &conn->err); + + /* + * conn->updated_info will finally be returned to the user. Clear the + * pointers to our internally allocated strings, which are IN parameters + * of nbd_receive_negotiate() and therefore nbd_connect(). Caller + * shoudn't be interested in these fields. + */ + conn->updated_info.x_dirty_bitmap = NULL; + conn->updated_info.name = NULL; + + qemu_mutex_lock(&conn->mutex); + + if (ret < 0) { + object_unref(OBJECT(conn->sioc)); + conn->sioc = NULL; + if (conn->do_retry && !conn->detached) { + qemu_mutex_unlock(&conn->mutex); + + sleep(timeout); + if (timeout < max_timeout) { + timeout *= 2; + } + + qemu_mutex_lock(&conn->mutex); + continue; + } + } + + break; + } + + /* mutex is locked */ + + assert(conn->running); + conn->running = false; + if (conn->wait_co) { + aio_co_wake(conn->wait_co); + conn->wait_co = NULL; + } + do_free = conn->detached; + + qemu_mutex_unlock(&conn->mutex); + + if (do_free) { + nbd_client_connection_do_free(conn); + } + + return NULL; +} + +void nbd_client_connection_release(NBDClientConnection *conn) +{ + bool do_free = false; + + if (!conn) { + return; + } + + WITH_QEMU_LOCK_GUARD(&conn->mutex) { + assert(!conn->detached); + if (conn->running) { + conn->detached = true; + } else { + do_free = true; + } + if (conn->sioc) { + qio_channel_shutdown(QIO_CHANNEL(conn->sioc), + QIO_CHANNEL_SHUTDOWN_BOTH, NULL); + } + } + + if (do_free) { + nbd_client_connection_do_free(conn); + } +} + +/* + * Get a new connection in context of @conn: + * if the thread is running, wait for completion + * if the thread already succeeded in the background, and user didn't get the + * result, just return it now + * otherwise the thread is not running, so start a thread and wait for + * completion + * + * If @blocking is false, don't wait for the thread, return immediately. + * + * If @info is not NULL, also do nbd-negotiation after successful connection. + * In this case info is used only as out parameter, and is fully initialized by + * nbd_co_establish_connection(). "IN" fields of info as well as related only to + * nbd_receive_export_list() would be zero (see description of NBDExportInfo in + * include/block/nbd.h). + */ +QIOChannel *coroutine_fn +nbd_co_establish_connection(NBDClientConnection *conn, NBDExportInfo *info, + bool blocking, Error **errp) +{ + QemuThread thread; + + if (conn->do_negotiation) { + assert(info); + } + + WITH_QEMU_LOCK_GUARD(&conn->mutex) { + /* + * Don't call nbd_co_establish_connection() in several coroutines in + * parallel. Only one call at once is supported. + */ + assert(!conn->wait_co); + + if (!conn->running) { + if (conn->sioc) { + /* Previous attempt finally succeeded in background */ + if (conn->do_negotiation) { + memcpy(info, &conn->updated_info, sizeof(*info)); + if (conn->ioc) { + /* TLS channel now has own reference to parent */ + object_unref(OBJECT(conn->sioc)); + conn->sioc = NULL; + + return g_steal_pointer(&conn->ioc); + } + } + + assert(!conn->ioc); + + return QIO_CHANNEL(g_steal_pointer(&conn->sioc)); + } + + conn->running = true; + error_free(conn->err); + conn->err = NULL; + qemu_thread_create(&thread, "nbd-connect", + connect_thread_func, conn, QEMU_THREAD_DETACHED); + } + + if (!blocking) { + return NULL; + } + + conn->wait_co = qemu_coroutine_self(); + } + + /* + * We are going to wait for connect-thread finish, but + * nbd_co_establish_connection_cancel() can interrupt. + */ + qemu_coroutine_yield(); + + WITH_QEMU_LOCK_GUARD(&conn->mutex) { + if (conn->running) { + /* + * The connection attempt was canceled and the coroutine resumed + * before the connection thread finished its job. Report the + * attempt as failed, but leave the connection thread running, + * to reuse it for the next connection attempt. + */ + error_setg(errp, "Connection attempt cancelled by other operation"); + return NULL; + } else { + error_propagate(errp, conn->err); + conn->err = NULL; + if (!conn->sioc) { + return NULL; + } + if (conn->do_negotiation) { + memcpy(info, &conn->updated_info, sizeof(*info)); + if (conn->ioc) { + /* TLS channel now has own reference to parent */ + object_unref(OBJECT(conn->sioc)); + conn->sioc = NULL; + + return g_steal_pointer(&conn->ioc); + } + } + + assert(!conn->ioc); + + return QIO_CHANNEL(g_steal_pointer(&conn->sioc)); + } + } + + abort(); /* unreachable */ +} + +/* + * nbd_co_establish_connection_cancel + * Cancel nbd_co_establish_connection() asynchronously. + * + * Note that this function neither directly stops the thread nor closes the + * socket, but rather safely wakes nbd_co_establish_connection() which is + * sleeping in yield() + */ +void nbd_co_establish_connection_cancel(NBDClientConnection *conn) +{ + Coroutine *wait_co; + + WITH_QEMU_LOCK_GUARD(&conn->mutex) { + wait_co = g_steal_pointer(&conn->wait_co); + } + + if (wait_co) { + aio_co_wake(wait_co); + } +} diff --git a/nbd/meson.build b/nbd/meson.build index 2baaa36948..b26d70565e 100644 --- a/nbd/meson.build +++ b/nbd/meson.build @@ -1,5 +1,6 @@ block_ss.add(files( 'client.c', + 'client-connection.c', 'common.c', )) blockdev_ss.add(files( diff --git a/scripts/block-coroutine-wrapper.py b/scripts/block-coroutine-wrapper.py index 0461fd1c45..85dbeb9ecf 100644 --- a/scripts/block-coroutine-wrapper.py +++ b/scripts/block-coroutine-wrapper.py @@ -98,12 +98,13 @@ def snake_to_camel(func_name: str) -> str: def gen_wrapper(func: FuncDecl) -> str: - assert func.name.startswith('bdrv_') - assert not func.name.startswith('bdrv_co_') + assert not '_co_' in func.name assert func.return_type == 'int' assert func.args[0].type in ['BlockDriverState *', 'BdrvChild *'] - name = 'bdrv_co_' + func.name[5:] + subsystem, subname = func.name.split('_', 1) + + name = f'{subsystem}_co_{subname}' bs = 'bs' if func.args[0].type == 'BlockDriverState *' else 'child->bs' struct_name = snake_to_camel(name) diff --git a/stubs/iothread-lock.c b/stubs/iothread-lock.c index 2a6efad64a..5b45b7fc8b 100644 --- a/stubs/iothread-lock.c +++ b/stubs/iothread-lock.c @@ -3,7 +3,7 @@ bool qemu_mutex_iothread_locked(void) { - return true; + return false; } void qemu_mutex_lock_iothread_impl(const char *file, int line) diff --git a/stubs/iothread.c b/stubs/iothread.c deleted file mode 100644 index 8cc9e28c55..0000000000 --- a/stubs/iothread.c +++ /dev/null @@ -1,8 +0,0 @@ -#include "qemu/osdep.h" -#include "block/aio.h" -#include "qemu/main-loop.h" - -AioContext *qemu_get_current_aio_context(void) -{ - return qemu_get_aio_context(); -} diff --git a/stubs/meson.build b/stubs/meson.build index d4e9549dc9..2e79ff9f4d 100644 --- a/stubs/meson.build +++ b/stubs/meson.build @@ -16,7 +16,6 @@ stub_ss.add(files('fw_cfg.c')) stub_ss.add(files('gdbstub.c')) stub_ss.add(files('get-vm-name.c')) stub_ss.add(when: 'CONFIG_LINUX_IO_URING', if_true: files('io_uring.c')) -stub_ss.add(files('iothread.c')) stub_ss.add(files('iothread-lock.c')) stub_ss.add(files('isa-bus.c')) stub_ss.add(files('is-daemonized.c')) diff --git a/tests/unit/iothread.c b/tests/unit/iothread.c index afde12b4ef..f9b0791084 100644 --- a/tests/unit/iothread.c +++ b/tests/unit/iothread.c @@ -30,13 +30,6 @@ struct IOThread { bool stopping; }; -static __thread IOThread *my_iothread; - -AioContext *qemu_get_current_aio_context(void) -{ - return my_iothread ? my_iothread->ctx : qemu_get_aio_context(); -} - static void iothread_init_gcontext(IOThread *iothread) { GSource *source; @@ -54,9 +47,9 @@ static void *iothread_run(void *opaque) rcu_register_thread(); - my_iothread = iothread; qemu_mutex_lock(&iothread->init_done_lock); iothread->ctx = aio_context_new(&error_abort); + qemu_set_current_aio_context(iothread->ctx); /* * We must connect the ctx to a GMainContext, because in older versions diff --git a/tests/unit/test-aio.c b/tests/unit/test-aio.c index 8a46078463..6feeb9a4a9 100644 --- a/tests/unit/test-aio.c +++ b/tests/unit/test-aio.c @@ -877,6 +877,42 @@ static void test_queue_chaining(void) g_assert_cmpint(data_b.i, ==, data_b.max); } +static void co_check_current_thread(void *opaque) +{ + QemuThread *main_thread = opaque; + assert(qemu_thread_is_self(main_thread)); +} + +static void *test_aio_co_enter(void *co) +{ + /* + * qemu_get_current_aio_context() should not to be the main thread + * AioContext, because this is a worker thread that has not taken + * the BQL. So aio_co_enter will schedule the coroutine in the + * main thread AioContext. + */ + aio_co_enter(qemu_get_aio_context(), co); + return NULL; +} + +static void test_worker_thread_co_enter(void) +{ + QemuThread this_thread, worker_thread; + Coroutine *co; + + qemu_thread_get_self(&this_thread); + co = qemu_coroutine_create(co_check_current_thread, &this_thread); + + qemu_thread_create(&worker_thread, "test_acquire_thread", + test_aio_co_enter, + co, QEMU_THREAD_JOINABLE); + + /* Test aio_co_enter from a worker thread. */ + qemu_thread_join(&worker_thread); + g_assert(aio_poll(ctx, true)); + g_assert(!aio_poll(ctx, false)); +} + /* End of tests. */ int main(int argc, char **argv) @@ -903,6 +939,7 @@ int main(int argc, char **argv) g_test_add_func("/aio/timer/schedule", test_timer_schedule); g_test_add_func("/aio/coroutine/queue-chaining", test_queue_chaining); + g_test_add_func("/aio/coroutine/worker-thread-co-enter", test_worker_thread_co_enter); g_test_add_func("/aio-gsource/flush", test_source_flush); g_test_add_func("/aio-gsource/bh/schedule", test_source_bh_schedule); diff --git a/util/async.c b/util/async.c index 674dbefb7c..5d9b7cc1eb 100644 --- a/util/async.c +++ b/util/async.c @@ -649,3 +649,23 @@ void aio_context_release(AioContext *ctx) { qemu_rec_mutex_unlock(&ctx->lock); } + +static __thread AioContext *my_aiocontext; + +AioContext *qemu_get_current_aio_context(void) +{ + if (my_aiocontext) { + return my_aiocontext; + } + if (qemu_mutex_iothread_locked()) { + /* Possibly in a vCPU thread. */ + return qemu_get_aio_context(); + } + return NULL; +} + +void qemu_set_current_aio_context(AioContext *ctx) +{ + assert(!my_aiocontext); + my_aiocontext = ctx; +} diff --git a/util/main-loop.c b/util/main-loop.c index d9c55df6f5..4ae5b23e99 100644 --- a/util/main-loop.c +++ b/util/main-loop.c @@ -170,6 +170,7 @@ int qemu_init_main_loop(Error **errp) if (!qemu_aio_context) { return -EMFILE; } + qemu_set_current_aio_context(qemu_aio_context); qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL); gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); src = aio_get_g_source(qemu_aio_context); diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c index c415c342c1..080a240b74 100644 --- a/util/qemu-sockets.c +++ b/util/qemu-sockets.c @@ -1164,6 +1164,25 @@ static int socket_get_fd(const char *fdstr, Error **errp) return fd; } +int socket_address_parse_named_fd(SocketAddress *addr, Error **errp) +{ + int fd; + + if (addr->type != SOCKET_ADDRESS_TYPE_FD) { + return 0; + } + + fd = socket_get_fd(addr->u.fd.str, errp); + if (fd < 0) { + return fd; + } + + g_free(addr->u.fd.str); + addr->u.fd.str = g_strdup_printf("%d", fd); + + return 0; +} + int socket_connect(SocketAddress *addr, Error **errp) { int fd;