Merge remote-tracking branch 'remotes/ericb/tags/pull-nbd-2021-06-15-v2' into staging

nbd patches for 2021-06-15

- bug fixes in coroutine aio context handling
- rework NBD client connection logic to perform more work in coroutine

# gpg: Signature made Fri 18 Jun 2021 18:29:39 BST
# gpg:                using RSA key 71C2CC22B1C4602927D2F3AAA7A16B4A2527436A
# gpg: Good signature from "Eric Blake <eblake@redhat.com>" [full]
# gpg:                 aka "Eric Blake (Free Software Programmer) <ebb9@byu.net>" [full]
# gpg:                 aka "[jpeg image of size 6874]" [full]
# Primary key fingerprint: 71C2 CC22 B1C4 6029 27D2  F3AA A7A1 6B4A 2527 436A

* remotes/ericb/tags/pull-nbd-2021-06-15-v2: (34 commits)
  block/nbd: safer transition to receiving request
  block/nbd: add nbd_client_connected() helper
  block/nbd: reuse nbd_co_do_establish_connection() in nbd_open()
  nbd/client-connection: add option for non-blocking connection attempt
  block/nbd: split nbd_co_do_establish_connection out of nbd_reconnect_attempt
  block-coroutine-wrapper: allow non bdrv_ prefix
  nbd/client-connection: return only one io channel
  block/nbd: drop BDRVNBDState::sioc
  block/nbd: don't touch s->sioc in nbd_teardown_connection()
  block/nbd: use negotiation of NBDClientConnection
  block/nbd: split nbd_handle_updated_info out of nbd_client_handshake()
  nbd/client-connection: shutdown connection on release
  nbd/client-connection: implement connection retry
  nbd/client-connection: add possibility of negotiation
  nbd/client-connection: use QEMU_LOCK_GUARD
  nbd: move connection code from block/nbd to nbd/client-connection
  block/nbd: introduce nbd_client_connection_release()
  block/nbd: introduce nbd_client_connection_new()
  block/nbd: rename NBDConnectThread to NBDClientConnection
  block/nbd: make nbd_co_establish_connection_cancel() bs-independent
  ...

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
Peter Maydell 2021-06-20 21:20:13 +01:00
commit e4bfa6cd68
18 changed files with 643 additions and 492 deletions

block/coroutines.h

@@ -66,4 +66,10 @@ int coroutine_fn bdrv_co_readv_vmstate(BlockDriverState *bs,
int coroutine_fn bdrv_co_writev_vmstate(BlockDriverState *bs,
QEMUIOVector *qiov, int64_t pos);
int generated_co_wrapper
nbd_do_establish_connection(BlockDriverState *bs, Error **errp);
int coroutine_fn
nbd_co_do_establish_connection(BlockDriverState *bs, Error **errp);
#endif /* BLOCK_COROUTINES_INT_H */

block/nbd.c

@@ -44,6 +44,7 @@
#include "block/qdict.h"
#include "block/nbd.h"
#include "block/block_int.h"
#include "block/coroutines.h"
#include "qemu/yank.h"
@@ -66,50 +67,8 @@ typedef enum NBDClientState {
NBD_CLIENT_QUIT
} NBDClientState;
typedef enum NBDConnectThreadState {
/* No thread, no pending results */
CONNECT_THREAD_NONE,
/* Thread is running, no results for now */
CONNECT_THREAD_RUNNING,
/*
* Thread is running, but requestor exited. Thread should close
* the new socket and free the connect state on exit.
*/
CONNECT_THREAD_RUNNING_DETACHED,
/* Thread finished, results are stored in a state */
CONNECT_THREAD_FAIL,
CONNECT_THREAD_SUCCESS
} NBDConnectThreadState;
typedef struct NBDConnectThread {
/* Initialization constants */
SocketAddress *saddr; /* address to connect to */
/*
* Bottom half to schedule on completion. Scheduled only if bh_ctx is not
* NULL
*/
QEMUBHFunc *bh_func;
void *bh_opaque;
/*
* Result of last attempt. Valid in FAIL and SUCCESS states.
* If you want to steal error, don't forget to set pointer to NULL.
*/
QIOChannelSocket *sioc;
Error *err;
/* state and bh_ctx are protected by mutex */
QemuMutex mutex;
NBDConnectThreadState state; /* current state of the thread */
AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */
} NBDConnectThread;
typedef struct BDRVNBDState {
QIOChannelSocket *sioc; /* The master data channel */
QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
QIOChannel *ioc; /* The current I/O channel */
NBDExportInfo info;
CoMutex send_mutex;
@@ -121,8 +80,6 @@ typedef struct BDRVNBDState {
bool wait_drained_end;
int in_flight;
NBDClientState state;
int connect_status;
Error *connect_err;
bool wait_in_flight;
QEMUTimer *reconnect_delay_timer;
@@ -140,20 +97,20 @@ typedef struct BDRVNBDState {
char *x_dirty_bitmap;
bool alloc_depth;
bool wait_connect;
NBDConnectThread *connect_thread;
NBDClientConnection *conn;
} BDRVNBDState;
static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr,
Error **errp);
static int nbd_co_establish_connection(BlockDriverState *bs, Error **errp);
static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
bool detach);
static int nbd_client_handshake(BlockDriverState *bs, Error **errp);
static void nbd_yank(void *opaque);
static void nbd_clear_bdrvstate(BDRVNBDState *s)
static void nbd_clear_bdrvstate(BlockDriverState *bs)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
nbd_client_connection_release(s->conn);
s->conn = NULL;
yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
object_unref(OBJECT(s->tlscreds));
qapi_free_SocketAddress(s->saddr);
s->saddr = NULL;
@@ -165,15 +122,20 @@ static void nbd_clear_bdrvstate(BDRVNBDState *s)
s->x_dirty_bitmap = NULL;
}
static bool nbd_client_connected(BDRVNBDState *s)
{
return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
}
static void nbd_channel_error(BDRVNBDState *s, int ret)
{
if (ret == -EIO) {
if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
if (nbd_client_connected(s)) {
s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
NBD_CLIENT_CONNECTING_NOWAIT;
}
} else {
if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
if (nbd_client_connected(s)) {
qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
s->state = NBD_CLIENT_QUIT;
@@ -188,6 +150,7 @@ static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
NBDClientRequest *req = &s->requests[i];
if (req->coroutine && req->receiving) {
req->receiving = false;
aio_co_wake(req->coroutine);
}
}
@@ -271,7 +234,7 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
* s->connection_co is either yielded from nbd_receive_reply or from
* nbd_co_reconnect_loop()
*/
if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
if (nbd_client_connected(s)) {
qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
}
@@ -291,7 +254,7 @@ static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
s->drained = true;
qemu_co_sleep_wake(&s->reconnect_sleep);
nbd_co_establish_connection_cancel(bs, false);
nbd_co_establish_connection_cancel(s->conn);
reconnect_delay_timer_del(s);
@@ -320,16 +283,12 @@ static void nbd_teardown_connection(BlockDriverState *bs)
if (s->ioc) {
/* finish any pending coroutines */
qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
} else if (s->sioc) {
/* abort negotiation */
qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH,
NULL);
}
s->state = NBD_CLIENT_QUIT;
if (s->connection_co) {
qemu_co_sleep_wake(&s->reconnect_sleep);
nbd_co_establish_connection_cancel(bs, true);
nbd_co_establish_connection_cancel(s->conn);
}
if (qemu_in_coroutine()) {
s->teardown_co = qemu_coroutine_self();
@@ -354,239 +313,95 @@ static bool nbd_client_connecting_wait(BDRVNBDState *s)
return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
}
static void connect_bh(void *opaque)
{
BDRVNBDState *state = opaque;
assert(state->wait_connect);
state->wait_connect = false;
aio_co_wake(state->connection_co);
}
static void nbd_init_connect_thread(BDRVNBDState *s)
{
s->connect_thread = g_new(NBDConnectThread, 1);
*s->connect_thread = (NBDConnectThread) {
.saddr = QAPI_CLONE(SocketAddress, s->saddr),
.state = CONNECT_THREAD_NONE,
.bh_func = connect_bh,
.bh_opaque = s,
};
qemu_mutex_init(&s->connect_thread->mutex);
}
static void nbd_free_connect_thread(NBDConnectThread *thr)
{
if (thr->sioc) {
qio_channel_close(QIO_CHANNEL(thr->sioc), NULL);
}
error_free(thr->err);
qapi_free_SocketAddress(thr->saddr);
g_free(thr);
}
static void *connect_thread_func(void *opaque)
{
NBDConnectThread *thr = opaque;
int ret;
bool do_free = false;
thr->sioc = qio_channel_socket_new();
error_free(thr->err);
thr->err = NULL;
ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err);
if (ret < 0) {
object_unref(OBJECT(thr->sioc));
thr->sioc = NULL;
}
qemu_mutex_lock(&thr->mutex);
switch (thr->state) {
case CONNECT_THREAD_RUNNING:
thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS;
if (thr->bh_ctx) {
aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque);
/* play safe, don't reuse bh_ctx on further connection attempts */
thr->bh_ctx = NULL;
}
break;
case CONNECT_THREAD_RUNNING_DETACHED:
do_free = true;
break;
default:
abort();
}
qemu_mutex_unlock(&thr->mutex);
if (do_free) {
nbd_free_connect_thread(thr);
}
return NULL;
}
static int coroutine_fn
nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
{
int ret;
QemuThread thread;
BDRVNBDState *s = bs->opaque;
NBDConnectThread *thr = s->connect_thread;
if (!thr) {
/* detached */
return -1;
}
qemu_mutex_lock(&thr->mutex);
switch (thr->state) {
case CONNECT_THREAD_FAIL:
case CONNECT_THREAD_NONE:
error_free(thr->err);
thr->err = NULL;
thr->state = CONNECT_THREAD_RUNNING;
qemu_thread_create(&thread, "nbd-connect",
connect_thread_func, thr, QEMU_THREAD_DETACHED);
break;
case CONNECT_THREAD_SUCCESS:
/* Previous attempt finally succeeded in background */
thr->state = CONNECT_THREAD_NONE;
s->sioc = thr->sioc;
thr->sioc = NULL;
yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
nbd_yank, bs);
qemu_mutex_unlock(&thr->mutex);
return 0;
case CONNECT_THREAD_RUNNING:
/* Already running, will wait */
break;
default:
abort();
}
thr->bh_ctx = qemu_get_current_aio_context();
qemu_mutex_unlock(&thr->mutex);
/*
* We are going to wait for connect-thread finish, but
* nbd_client_co_drain_begin() can interrupt.
*
* Note that the wait_connect variable is not visible to the connect thread. It
* doesn't need mutex protection; it is used only inside the home aio context
* of bs.
*/
s->wait_connect = true;
qemu_coroutine_yield();
if (!s->connect_thread) {
/* detached */
return -1;
}
assert(thr == s->connect_thread);
qemu_mutex_lock(&thr->mutex);
switch (thr->state) {
case CONNECT_THREAD_SUCCESS:
case CONNECT_THREAD_FAIL:
thr->state = CONNECT_THREAD_NONE;
error_propagate(errp, thr->err);
thr->err = NULL;
s->sioc = thr->sioc;
thr->sioc = NULL;
if (s->sioc) {
yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
nbd_yank, bs);
}
ret = (s->sioc ? 0 : -1);
break;
case CONNECT_THREAD_RUNNING:
case CONNECT_THREAD_RUNNING_DETACHED:
/*
* Obviously, drained section wants to start. Report the attempt as
* failed. Still connect thread is executing in background, and its
* result may be used for next connection attempt.
*/
ret = -1;
error_setg(errp, "Connection attempt cancelled by other operation");
break;
case CONNECT_THREAD_NONE:
/*
* Impossible. We've seen this thread running. So it should be
* running or at least give some results.
*/
abort();
default:
abort();
}
qemu_mutex_unlock(&thr->mutex);
return ret;
}
/*
* nbd_co_establish_connection_cancel
* Cancel nbd_co_establish_connection asynchronously: it will finish soon, to
* allow drained section to begin.
*
* If detach is true, also cleanup the state (or if thread is running, move it
* to CONNECT_THREAD_RUNNING_DETACHED state). s->connect_thread becomes NULL if
* detach is true.
* Update @bs with information learned during a completed negotiation process.
* Return failure if the server's advertised options are incompatible with the
* client's needs.
*/
static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
bool detach)
static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
{
BDRVNBDState *s = bs->opaque;
NBDConnectThread *thr = s->connect_thread;
bool wake = false;
bool do_free = false;
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
int ret;
qemu_mutex_lock(&thr->mutex);
if (thr->state == CONNECT_THREAD_RUNNING) {
/* We can cancel only in running state, when bh is not yet scheduled */
thr->bh_ctx = NULL;
if (s->wait_connect) {
s->wait_connect = false;
wake = true;
if (s->x_dirty_bitmap) {
if (!s->info.base_allocation) {
error_setg(errp, "requested x-dirty-bitmap %s not found",
s->x_dirty_bitmap);
return -EINVAL;
}
if (detach) {
thr->state = CONNECT_THREAD_RUNNING_DETACHED;
s->connect_thread = NULL;
if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
s->alloc_depth = true;
}
} else if (detach) {
do_free = true;
}
qemu_mutex_unlock(&thr->mutex);
if (do_free) {
nbd_free_connect_thread(thr);
s->connect_thread = NULL;
if (s->info.flags & NBD_FLAG_READ_ONLY) {
ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
if (ret < 0) {
return ret;
}
}
if (wake) {
aio_co_wake(s->connection_co);
if (s->info.flags & NBD_FLAG_SEND_FUA) {
bs->supported_write_flags = BDRV_REQ_FUA;
bs->supported_zero_flags |= BDRV_REQ_FUA;
}
if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
}
}
trace_nbd_client_handshake_success(s->export);
return 0;
}
int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
Error **errp)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
int ret;
assert(!s->ioc);
s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp);
if (!s->ioc) {
return -ECONNREFUSED;
}
ret = nbd_handle_updated_info(s->bs, NULL);
if (ret < 0) {
/*
* We have connected, but must fail for other reasons.
* Send NBD_CMD_DISC as a courtesy to the server.
*/
NBDRequest request = { .type = NBD_CMD_DISC };
nbd_send_request(s->ioc, &request);
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
return ret;
}
qio_channel_set_blocking(s->ioc, false, NULL);
qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
bs);
/* successfully connected */
s->state = NBD_CLIENT_CONNECTED;
qemu_co_queue_restart_all(&s->free_sema);
return 0;
}
static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
int ret;
Error *local_err = NULL;
if (!nbd_client_connecting(s)) {
return;
}
@@ -620,44 +435,11 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
nbd_yank, s->bs);
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
}
if (nbd_co_establish_connection(s->bs, &local_err) < 0) {
ret = -ECONNREFUSED;
goto out;
}
bdrv_dec_in_flight(s->bs);
ret = nbd_client_handshake(s->bs, &local_err);
if (s->drained) {
s->wait_drained_end = true;
while (s->drained) {
/*
* We may be entered once from nbd_client_attach_aio_context_bh
* and then from nbd_client_co_drain_end. So here is a loop.
*/
qemu_coroutine_yield();
}
}
bdrv_inc_in_flight(s->bs);
out:
s->connect_status = ret;
error_free(s->connect_err);
s->connect_err = NULL;
error_propagate(&s->connect_err, local_err);
if (ret >= 0) {
/* successfully connected */
s->state = NBD_CLIENT_CONNECTED;
qemu_co_queue_restart_all(&s->free_sema);
}
nbd_co_do_establish_connection(s->bs, NULL);
}
static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
@@ -723,7 +505,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
nbd_co_reconnect_loop(s);
}
if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
if (!nbd_client_connected(s)) {
continue;
}
@@ -767,6 +549,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
* connection_co happens through a bottom half, which can only
* run after we yield.
*/
s->requests[i].receiving = false;
aio_co_wake(s->requests[i].coroutine);
qemu_coroutine_yield();
}
@@ -780,8 +563,6 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
nbd_yank, s->bs);
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
}
@@ -804,7 +585,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
}
if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
if (!nbd_client_connected(s)) {
rc = -EIO;
goto err;
}
@@ -831,8 +612,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
if (qiov) {
qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request);
if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED &&
rc >= 0) {
if (nbd_client_connected(s) && rc >= 0) {
if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
NULL) < 0) {
rc = -EIO;
@@ -1156,8 +936,8 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
/* Wait until we're woken up by nbd_connection_entry. */
s->requests[i].receiving = true;
qemu_coroutine_yield();
s->requests[i].receiving = false;
if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
assert(!s->requests[i].receiving);
if (!nbd_client_connected(s)) {
error_setg(errp, "Connection closed");
return -EIO;
}
@@ -1316,7 +1096,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
NBDReply local_reply;
NBDStructuredReplyChunk *chunk;
Error *local_err = NULL;
if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
if (!nbd_client_connected(s)) {
error_setg(&local_err, "Connection closed");
nbd_iter_channel_error(iter, -EIO, &local_err);
goto break_loop;
@@ -1341,8 +1121,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
}
/* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
if (nbd_reply_is_simple(reply) ||
qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
goto break_loop;
}
@@ -1780,7 +1559,7 @@ static void nbd_yank(void *opaque)
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
static void nbd_client_close(BlockDriverState *bs)
@@ -1795,111 +1574,6 @@ static void nbd_client_close(BlockDriverState *bs)
nbd_teardown_connection(bs);
}
static int nbd_establish_connection(BlockDriverState *bs,
SocketAddress *saddr,
Error **errp)
{
ERRP_GUARD();
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
s->sioc = qio_channel_socket_new();
qio_channel_set_name(QIO_CHANNEL(s->sioc), "nbd-client");
qio_channel_socket_connect_sync(s->sioc, saddr, errp);
if (*errp) {
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
return -1;
}
yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), nbd_yank, bs);
qio_channel_set_delay(QIO_CHANNEL(s->sioc), false);
return 0;
}
/* nbd_client_handshake takes ownership on s->sioc. On failure it's unref'ed. */
static int nbd_client_handshake(BlockDriverState *bs, Error **errp)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
AioContext *aio_context = bdrv_get_aio_context(bs);
int ret;
trace_nbd_client_handshake(s->export);
qio_channel_set_blocking(QIO_CHANNEL(s->sioc), false, NULL);
qio_channel_attach_aio_context(QIO_CHANNEL(s->sioc), aio_context);
s->info.request_sizes = true;
s->info.structured_reply = true;
s->info.base_allocation = true;
s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap);
s->info.name = g_strdup(s->export ?: "");
ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(s->sioc), s->tlscreds,
s->hostname, &s->ioc, &s->info, errp);
g_free(s->info.x_dirty_bitmap);
g_free(s->info.name);
if (ret < 0) {
yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
nbd_yank, bs);
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
return ret;
}
if (s->x_dirty_bitmap) {
if (!s->info.base_allocation) {
error_setg(errp, "requested x-dirty-bitmap %s not found",
s->x_dirty_bitmap);
ret = -EINVAL;
goto fail;
}
if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
s->alloc_depth = true;
}
}
if (s->info.flags & NBD_FLAG_READ_ONLY) {
ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
if (ret < 0) {
goto fail;
}
}
if (s->info.flags & NBD_FLAG_SEND_FUA) {
bs->supported_write_flags = BDRV_REQ_FUA;
bs->supported_zero_flags |= BDRV_REQ_FUA;
}
if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
}
}
if (!s->ioc) {
s->ioc = QIO_CHANNEL(s->sioc);
object_ref(OBJECT(s->ioc));
}
trace_nbd_client_handshake_success(s->export);
return 0;
fail:
/*
* We have connected, but must fail for other reasons.
* Send NBD_CMD_DISC as a courtesy to the server.
*/
{
NBDRequest request = { .type = NBD_CMD_DISC };
nbd_send_request(s->ioc ?: QIO_CHANNEL(s->sioc), &request);
yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
nbd_yank, bs);
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
return ret;
}
}
/*
* Parse nbd_open options
@@ -2133,6 +1807,12 @@ static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
goto done;
}
if (socket_address_parse_named_fd(saddr, errp) < 0) {
qapi_free_SocketAddress(saddr);
saddr = NULL;
goto done;
}
done:
qobject_unref(addr);
visit_free(iv);
@@ -2274,9 +1954,6 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
ret = 0;
error:
if (ret < 0) {
nbd_clear_bdrvstate(s);
}
qemu_opts_del(opts);
return ret;
}
@@ -2287,11 +1964,6 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
int ret;
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
ret = nbd_process_options(bs, options, errp);
if (ret < 0) {
return ret;
}
s->bs = bs;
qemu_co_mutex_init(&s->send_mutex);
qemu_co_queue_init(&s->free_sema);
@@ -2300,31 +1972,29 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
return -EEXIST;
}
/*
* establish TCP connection, return error if it fails
* TODO: Configurable retry-until-timeout behaviour.
*/
if (nbd_establish_connection(bs, s->saddr, errp) < 0) {
yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
return -ECONNREFUSED;
}
ret = nbd_client_handshake(bs, errp);
ret = nbd_process_options(bs, options, errp);
if (ret < 0) {
yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
nbd_clear_bdrvstate(s);
return ret;
goto fail;
}
/* successfully connected */
s->state = NBD_CLIENT_CONNECTED;
nbd_init_connect_thread(s);
s->conn = nbd_client_connection_new(s->saddr, true, s->export,
s->x_dirty_bitmap, s->tlscreds);
/* TODO: Configurable retry-until-timeout behaviour. */
ret = nbd_do_establish_connection(bs, errp);
if (ret < 0) {
goto fail;
}
s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
bdrv_inc_in_flight(bs);
aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
return 0;
fail:
nbd_clear_bdrvstate(bs);
return ret;
}
static int nbd_co_flush(BlockDriverState *bs)
@@ -2368,11 +2038,8 @@ static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
static void nbd_close(BlockDriverState *bs)
{
BDRVNBDState *s = bs->opaque;
nbd_client_close(bs);
yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
nbd_clear_bdrvstate(s);
nbd_clear_bdrvstate(bs);
}
/*

include/block/aio.h

@@ -691,10 +691,13 @@ void aio_co_enter(AioContext *ctx, struct Coroutine *co);
* Return the AioContext whose event loop runs in the current thread.
*
* If called from an IOThread this will be the IOThread's AioContext. If
* called from another thread it will be the main loop AioContext.
* called from the main thread or with the "big QEMU lock" taken it
* will be the main loop AioContext.
*/
AioContext *qemu_get_current_aio_context(void);
void qemu_set_current_aio_context(AioContext *ctx);
/**
* aio_context_setup:
* @ctx: the aio context

include/block/nbd.h

@@ -406,4 +406,22 @@ const char *nbd_info_lookup(uint16_t info);
const char *nbd_cmd_lookup(uint16_t info);
const char *nbd_err_lookup(int err);
/* nbd/client-connection.c */
typedef struct NBDClientConnection NBDClientConnection;
void nbd_client_connection_enable_retry(NBDClientConnection *conn);
NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr,
bool do_negotiation,
const char *export_name,
const char *x_dirty_bitmap,
QCryptoTLSCreds *tlscreds);
void nbd_client_connection_release(NBDClientConnection *conn);
QIOChannel *coroutine_fn
nbd_co_establish_connection(NBDClientConnection *conn, NBDExportInfo *info,
bool blocking, Error **errp);
void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn);
#endif
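
Taken together, these declarations are the full public surface of the new connection machinery. The sketch below shows how a caller might drive it from a coroutine, modeled on what block/nbd.c does after this series; the wrapper function and its NULL tlscreds/x-dirty-bitmap arguments are illustrative assumptions, not code from the patch:

/* Hypothetical caller; for illustration only. */
static QIOChannel *coroutine_fn
example_nbd_client_connect(const SocketAddress *saddr, const char *export_name,
                           Error **errp)
{
    NBDExportInfo info;
    QIOChannel *ioc;
    NBDClientConnection *conn =
        nbd_client_connection_new(saddr, true /* do_negotiation */,
                                  export_name, NULL /* x_dirty_bitmap */,
                                  NULL /* tlscreds */);

    nbd_client_connection_enable_retry(conn);

    /* Yields until the background thread finishes; may be interrupted by
     * nbd_co_establish_connection_cancel() from the main loop. */
    ioc = nbd_co_establish_connection(conn, &info, true /* blocking */, errp);
    if (!ioc) {
        nbd_client_connection_release(conn);
        return NULL;
    }

    /* Keep @conn around for later reconnect attempts; release at close time. */
    return ioc;
}

block/nbd.c stores the NBDClientConnection in its BDRVNBDState (s->conn) for exactly this reason: the reconnect path reuses it.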

include/qemu/coroutine.h

@@ -210,13 +210,15 @@ void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock);
/**
* Removes the next coroutine from the CoQueue, and wakes it up.
* Returns true if a coroutine was removed, false if the queue is empty.
* OK to run from coroutine and non-coroutine context.
*/
bool coroutine_fn qemu_co_queue_next(CoQueue *queue);
bool qemu_co_queue_next(CoQueue *queue);
/**
* Empties the CoQueue; all coroutines are woken up.
* OK to run from coroutine and non-coroutine context.
*/
void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue);
void qemu_co_queue_restart_all(CoQueue *queue);
/**
* Removes the next coroutine from the CoQueue, and wakes it up. Unlike
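
One practical consequence of dropping the coroutine_fn marker from these two functions: non-coroutine code such as a bottom half may now drain a CoQueue directly. A minimal hypothetical sketch, not from this series:

/* Hypothetical bottom half waking all waiters of a CoQueue. */
static void wake_waiters_bh(void *opaque)
{
    CoQueue *queue = opaque;

    /* Legal here now that qemu_co_queue_restart_all() is not coroutine_fn. */
    qemu_co_queue_restart_all(queue);
}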

include/qemu/sockets.h

@@ -111,4 +111,15 @@ SocketAddress *socket_remote_address(int fd, Error **errp);
*/
SocketAddress *socket_address_flatten(SocketAddressLegacy *addr);
/**
* socket_address_parse_named_fd:
*
* Modify @addr, replacing a named fd by its corresponding number.
* Needed for callers that plan to pass @addr to a context where the
* current monitor is not available.
*
* Return 0 on success.
*/
int socket_address_parse_named_fd(SocketAddress *addr, Error **errp);
#endif /* QEMU_SOCKETS_H */
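
A minimal usage sketch for the new helper, assuming @saddr may carry a monitor fd name (for example one registered via the getfd command); the wrapper function is hypothetical:

/* Hypothetical: make @saddr usable from a thread without a monitor. */
static int prepare_saddr_for_thread(SocketAddress *saddr, Error **errp)
{
    /* No-op unless saddr->type == SOCKET_ADDRESS_TYPE_FD with a named fd. */
    return socket_address_parse_named_fd(saddr, errp);
}

This is what lets nbd/client-connection.c connect from a detached thread: nbd_config() in block/nbd.c (see above) resolves the name while the monitor is still current.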

iothread.c

@@ -39,13 +39,6 @@ DECLARE_CLASS_CHECKERS(IOThreadClass, IOTHREAD,
#define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
#endif
static __thread IOThread *my_iothread;
AioContext *qemu_get_current_aio_context(void)
{
return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
}
static void *iothread_run(void *opaque)
{
IOThread *iothread = opaque;
@@ -56,7 +49,7 @@ static void *iothread_run(void *opaque)
* in this new thread uses glib.
*/
g_main_context_push_thread_default(iothread->worker_context);
my_iothread = iothread;
qemu_set_current_aio_context(iothread->ctx);
iothread->thread_id = qemu_get_thread_id();
qemu_sem_post(&iothread->init_done_sem);

nbd/client-connection.c (new file, 388 lines)

@@ -0,0 +1,388 @@
/*
* QEMU Block driver for NBD
*
* Copyright (c) 2021 Virtuozzo International GmbH.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "qemu/osdep.h"
#include "block/nbd.h"
#include "qapi/qapi-visit-sockets.h"
#include "qapi/clone-visitor.h"
struct NBDClientConnection {
/* Initialization constants, never change */
SocketAddress *saddr; /* address to connect to */
QCryptoTLSCreds *tlscreds;
NBDExportInfo initial_info;
bool do_negotiation;
bool do_retry;
QemuMutex mutex;
/*
* @sioc and @err represent a connection attempt. While running
* is true, they are only used by the connection thread, and mutex
* locking is not needed. Once the thread finishes,
* nbd_co_establish_connection then steals these pointers while
* under the mutex.
*/
NBDExportInfo updated_info;
QIOChannelSocket *sioc;
QIOChannel *ioc;
Error *err;
/* All further fields are accessed only under mutex */
bool running; /* thread is running now */
bool detached; /* thread is detached and should cleanup the state */
/*
* wait_co: if non-NULL, which coroutine to wake in
* nbd_co_establish_connection() after yield()
*/
Coroutine *wait_co;
};
/*
* The function isn't protected by any mutex; only call it while the client
* connection attempt has not yet started.
*/
void nbd_client_connection_enable_retry(NBDClientConnection *conn)
{
conn->do_retry = true;
}
NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr,
bool do_negotiation,
const char *export_name,
const char *x_dirty_bitmap,
QCryptoTLSCreds *tlscreds)
{
NBDClientConnection *conn = g_new(NBDClientConnection, 1);
object_ref(OBJECT(tlscreds));
*conn = (NBDClientConnection) {
.saddr = QAPI_CLONE(SocketAddress, saddr),
.tlscreds = tlscreds,
.do_negotiation = do_negotiation,
.initial_info.request_sizes = true,
.initial_info.structured_reply = true,
.initial_info.base_allocation = true,
.initial_info.x_dirty_bitmap = g_strdup(x_dirty_bitmap),
.initial_info.name = g_strdup(export_name ?: "")
};
qemu_mutex_init(&conn->mutex);
return conn;
}
static void nbd_client_connection_do_free(NBDClientConnection *conn)
{
if (conn->sioc) {
qio_channel_close(QIO_CHANNEL(conn->sioc), NULL);
object_unref(OBJECT(conn->sioc));
}
error_free(conn->err);
qapi_free_SocketAddress(conn->saddr);
object_unref(OBJECT(conn->tlscreds));
g_free(conn->initial_info.x_dirty_bitmap);
g_free(conn->initial_info.name);
g_free(conn);
}
/*
* Connect to @addr and do NBD negotiation if @info is not null. If @tlscreds
* are given, @outioc is returned; it is provided only on success. The call
* may be cancelled from another thread via qio_channel_shutdown(sioc).
*/
static int nbd_connect(QIOChannelSocket *sioc, SocketAddress *addr,
NBDExportInfo *info, QCryptoTLSCreds *tlscreds,
QIOChannel **outioc, Error **errp)
{
int ret;
if (outioc) {
*outioc = NULL;
}
ret = qio_channel_socket_connect_sync(sioc, addr, errp);
if (ret < 0) {
return ret;
}
qio_channel_set_delay(QIO_CHANNEL(sioc), false);
if (!info) {
return 0;
}
ret = nbd_receive_negotiate(NULL, QIO_CHANNEL(sioc), tlscreds,
tlscreds ? addr->u.inet.host : NULL,
outioc, info, errp);
if (ret < 0) {
/*
* nbd_receive_negotiate() may setup tls ioc and return it even on
* failure path. In this case we should use it instead of original
* channel.
*/
if (outioc && *outioc) {
qio_channel_close(QIO_CHANNEL(*outioc), NULL);
object_unref(OBJECT(*outioc));
*outioc = NULL;
} else {
qio_channel_close(QIO_CHANNEL(sioc), NULL);
}
return ret;
}
return 0;
}
static void *connect_thread_func(void *opaque)
{
NBDClientConnection *conn = opaque;
int ret;
bool do_free;
uint64_t timeout = 1;
uint64_t max_timeout = 16;
qemu_mutex_lock(&conn->mutex);
while (!conn->detached) {
assert(!conn->sioc);
conn->sioc = qio_channel_socket_new();
qemu_mutex_unlock(&conn->mutex);
error_free(conn->err);
conn->err = NULL;
conn->updated_info = conn->initial_info;
ret = nbd_connect(conn->sioc, conn->saddr,
conn->do_negotiation ? &conn->updated_info : NULL,
conn->tlscreds, &conn->ioc, &conn->err);
/*
* conn->updated_info will finally be returned to the user. Clear the
* pointers to our internally allocated strings, which are IN parameters
* of nbd_receive_negotiate() and therefore nbd_connect(). The caller
* shouldn't be interested in these fields.
*/
conn->updated_info.x_dirty_bitmap = NULL;
conn->updated_info.name = NULL;
qemu_mutex_lock(&conn->mutex);
if (ret < 0) {
object_unref(OBJECT(conn->sioc));
conn->sioc = NULL;
if (conn->do_retry && !conn->detached) {
qemu_mutex_unlock(&conn->mutex);
sleep(timeout);
if (timeout < max_timeout) {
timeout *= 2;
}
qemu_mutex_lock(&conn->mutex);
continue;
}
}
break;
}
/* mutex is locked */
assert(conn->running);
conn->running = false;
if (conn->wait_co) {
aio_co_wake(conn->wait_co);
conn->wait_co = NULL;
}
do_free = conn->detached;
qemu_mutex_unlock(&conn->mutex);
if (do_free) {
nbd_client_connection_do_free(conn);
}
return NULL;
}
void nbd_client_connection_release(NBDClientConnection *conn)
{
bool do_free = false;
if (!conn) {
return;
}
WITH_QEMU_LOCK_GUARD(&conn->mutex) {
assert(!conn->detached);
if (conn->running) {
conn->detached = true;
} else {
do_free = true;
}
if (conn->sioc) {
qio_channel_shutdown(QIO_CHANNEL(conn->sioc),
QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
}
if (do_free) {
nbd_client_connection_do_free(conn);
}
}
/*
* Get a new connection in context of @conn:
*   if the thread is running, wait for completion
*   if the thread already succeeded in the background, and user didn't get
*     the result, just return it now
*   otherwise the thread is not running, so start a thread and wait for
*     completion
*
* If @blocking is false, don't wait for the thread, return immediately.
*
* If @info is not NULL, also do nbd-negotiation after successful connection.
* In this case @info is used only as an out parameter and is fully initialized
* by nbd_co_establish_connection(). "IN" fields of @info, as well as fields
* relevant only to nbd_receive_export_list(), will be zero (see the
* description of NBDExportInfo in include/block/nbd.h).
*/
QIOChannel *coroutine_fn
nbd_co_establish_connection(NBDClientConnection *conn, NBDExportInfo *info,
bool blocking, Error **errp)
{
QemuThread thread;
if (conn->do_negotiation) {
assert(info);
}
WITH_QEMU_LOCK_GUARD(&conn->mutex) {
/*
* Don't call nbd_co_establish_connection() in several coroutines in
* parallel. Only one call at once is supported.
*/
assert(!conn->wait_co);
if (!conn->running) {
if (conn->sioc) {
/* Previous attempt finally succeeded in background */
if (conn->do_negotiation) {
memcpy(info, &conn->updated_info, sizeof(*info));
if (conn->ioc) {
/* TLS channel now has own reference to parent */
object_unref(OBJECT(conn->sioc));
conn->sioc = NULL;
return g_steal_pointer(&conn->ioc);
}
}
assert(!conn->ioc);
return QIO_CHANNEL(g_steal_pointer(&conn->sioc));
}
conn->running = true;
error_free(conn->err);
conn->err = NULL;
qemu_thread_create(&thread, "nbd-connect",
connect_thread_func, conn, QEMU_THREAD_DETACHED);
}
if (!blocking) {
return NULL;
}
conn->wait_co = qemu_coroutine_self();
}
/*
* We are going to wait for the connect thread to finish, but
* nbd_co_establish_connection_cancel() can interrupt us.
*/
qemu_coroutine_yield();
WITH_QEMU_LOCK_GUARD(&conn->mutex) {
if (conn->running) {
/*
* The connection attempt was canceled and the coroutine resumed
* before the connection thread finished its job. Report the
* attempt as failed, but leave the connection thread running,
* to reuse it for the next connection attempt.
*/
error_setg(errp, "Connection attempt cancelled by other operation");
return NULL;
} else {
error_propagate(errp, conn->err);
conn->err = NULL;
if (!conn->sioc) {
return NULL;
}
if (conn->do_negotiation) {
memcpy(info, &conn->updated_info, sizeof(*info));
if (conn->ioc) {
/* TLS channel now has own reference to parent */
object_unref(OBJECT(conn->sioc));
conn->sioc = NULL;
return g_steal_pointer(&conn->ioc);
}
}
assert(!conn->ioc);
return QIO_CHANNEL(g_steal_pointer(&conn->sioc));
}
}
abort(); /* unreachable */
}
/*
* nbd_co_establish_connection_cancel
* Cancel nbd_co_establish_connection() asynchronously.
*
* Note that this function neither directly stops the thread nor closes the
* socket, but rather safely wakes nbd_co_establish_connection() which is
* sleeping in yield()
*/
void nbd_co_establish_connection_cancel(NBDClientConnection *conn)
{
Coroutine *wait_co;
WITH_QEMU_LOCK_GUARD(&conn->mutex) {
wait_co = g_steal_pointer(&conn->wait_co);
}
if (wait_co) {
aio_co_wake(wait_co);
}
}

nbd/meson.build

@@ -1,5 +1,6 @@
block_ss.add(files(
'client.c',
'client-connection.c',
'common.c',
))
blockdev_ss.add(files(

scripts/block-coroutine-wrapper.py

@@ -98,12 +98,13 @@ def snake_to_camel(func_name: str) -> str:
def gen_wrapper(func: FuncDecl) -> str:
assert func.name.startswith('bdrv_')
assert not func.name.startswith('bdrv_co_')
assert not '_co_' in func.name
assert func.return_type == 'int'
assert func.args[0].type in ['BlockDriverState *', 'BdrvChild *']
name = 'bdrv_co_' + func.name[5:]
subsystem, subname = func.name.split('_', 1)
name = f'{subsystem}_co_{subname}'
bs = 'bs' if func.args[0].type == 'BlockDriverState *' else 'child->bs'
struct_name = snake_to_camel(name)
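
With the bdrv_-only restriction relaxed, the generator can now emit the nbd_do_establish_connection() wrapper declared in block/coroutines.h above. Roughly, its output looks like the following; this is a reconstruction of the generator's template, assuming the BdrvPollCo helpers from block/block-gen.h, so details may differ:

/* Reconstructed sketch of generated wrapper output; not text from the patch. */
typedef struct NbdDoEstablishConnection {
    BdrvPollCo poll_state;
    Error **errp;
} NbdDoEstablishConnection;

static void coroutine_fn nbd_do_establish_connection_entry(void *opaque)
{
    NbdDoEstablishConnection *s = opaque;

    s->poll_state.ret = nbd_co_do_establish_connection(s->poll_state.bs,
                                                       s->errp);
    s->poll_state.in_progress = false;

    aio_wait_kick();
}

int nbd_do_establish_connection(BlockDriverState *bs, Error **errp)
{
    if (qemu_in_coroutine()) {
        return nbd_co_do_establish_connection(bs, errp);
    } else {
        NbdDoEstablishConnection s = {
            .poll_state.bs = bs,
            .poll_state.in_progress = true,
            .errp = errp,
        };

        s.poll_state.co =
            qemu_coroutine_create(nbd_do_establish_connection_entry, &s);

        return bdrv_poll_co(&s.poll_state);
    }
}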

stubs/iothread-lock.c

@@ -3,7 +3,7 @@
bool qemu_mutex_iothread_locked(void)
{
return true;
return false;
}
void qemu_mutex_lock_iothread_impl(const char *file, int line)

stubs/iothread.c (deleted)

@@ -1,8 +0,0 @@
#include "qemu/osdep.h"
#include "block/aio.h"
#include "qemu/main-loop.h"
AioContext *qemu_get_current_aio_context(void)
{
return qemu_get_aio_context();
}

stubs/meson.build

@@ -16,7 +16,6 @@ stub_ss.add(files('fw_cfg.c'))
stub_ss.add(files('gdbstub.c'))
stub_ss.add(files('get-vm-name.c'))
stub_ss.add(when: 'CONFIG_LINUX_IO_URING', if_true: files('io_uring.c'))
stub_ss.add(files('iothread.c'))
stub_ss.add(files('iothread-lock.c'))
stub_ss.add(files('isa-bus.c'))
stub_ss.add(files('is-daemonized.c'))

tests/unit/iothread.c

@@ -30,13 +30,6 @@ struct IOThread {
bool stopping;
};
static __thread IOThread *my_iothread;
AioContext *qemu_get_current_aio_context(void)
{
return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
}
static void iothread_init_gcontext(IOThread *iothread)
{
GSource *source;
@@ -54,9 +47,9 @@ static void *iothread_run(void *opaque)
rcu_register_thread();
my_iothread = iothread;
qemu_mutex_lock(&iothread->init_done_lock);
iothread->ctx = aio_context_new(&error_abort);
qemu_set_current_aio_context(iothread->ctx);
/*
* We must connect the ctx to a GMainContext, because in older versions

tests/unit/test-aio.c

@@ -877,6 +877,42 @@ static void test_queue_chaining(void)
g_assert_cmpint(data_b.i, ==, data_b.max);
}
static void co_check_current_thread(void *opaque)
{
QemuThread *main_thread = opaque;
assert(qemu_thread_is_self(main_thread));
}
static void *test_aio_co_enter(void *co)
{
/*
* qemu_get_current_aio_context() should not be the main thread
* AioContext, because this is a worker thread that has not taken
* the BQL. So aio_co_enter will schedule the coroutine in the
* main thread AioContext.
*/
aio_co_enter(qemu_get_aio_context(), co);
return NULL;
}
static void test_worker_thread_co_enter(void)
{
QemuThread this_thread, worker_thread;
Coroutine *co;
qemu_thread_get_self(&this_thread);
co = qemu_coroutine_create(co_check_current_thread, &this_thread);
qemu_thread_create(&worker_thread, "test_acquire_thread",
test_aio_co_enter,
co, QEMU_THREAD_JOINABLE);
/* Test aio_co_enter from a worker thread. */
qemu_thread_join(&worker_thread);
g_assert(aio_poll(ctx, true));
g_assert(!aio_poll(ctx, false));
}
/* End of tests. */
int main(int argc, char **argv)
@@ -903,6 +939,7 @@ int main(int argc, char **argv)
g_test_add_func("/aio/timer/schedule", test_timer_schedule);
g_test_add_func("/aio/coroutine/queue-chaining", test_queue_chaining);
g_test_add_func("/aio/coroutine/worker-thread-co-enter", test_worker_thread_co_enter);
g_test_add_func("/aio-gsource/flush", test_source_flush);
g_test_add_func("/aio-gsource/bh/schedule", test_source_bh_schedule);

util/async.c

@@ -649,3 +649,23 @@ void aio_context_release(AioContext *ctx)
{
qemu_rec_mutex_unlock(&ctx->lock);
}
static __thread AioContext *my_aiocontext;
AioContext *qemu_get_current_aio_context(void)
{
if (my_aiocontext) {
return my_aiocontext;
}
if (qemu_mutex_iothread_locked()) {
/* Possibly in a vCPU thread. */
return qemu_get_aio_context();
}
return NULL;
}
void qemu_set_current_aio_context(AioContext *ctx)
{
assert(!my_aiocontext);
my_aiocontext = ctx;
}
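
This per-thread variable is what aio_co_enter() consults when deciding between entering a coroutine directly and scheduling it into another thread's context, which is exactly what the new test-aio.c case above exercises from a worker thread. For orientation, aio_co_enter() in util/async.c (not part of this diff, quoted approximately) reads:

void aio_co_enter(AioContext *ctx, struct Coroutine *co)
{
    /* A worker thread has no current AioContext, so it takes this path
     * and defers the coroutine to @ctx via a scheduled bottom half. */
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        qemu_aio_coroutine_enter(ctx, co);
    }
}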

util/main-loop.c

@@ -170,6 +170,7 @@ int qemu_init_main_loop(Error **errp)
if (!qemu_aio_context) {
return -EMFILE;
}
qemu_set_current_aio_context(qemu_aio_context);
qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL);
gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
src = aio_get_g_source(qemu_aio_context);

util/qemu-sockets.c

@@ -1164,6 +1164,25 @@ static int socket_get_fd(const char *fdstr, Error **errp)
return fd;
}
int socket_address_parse_named_fd(SocketAddress *addr, Error **errp)
{
int fd;
if (addr->type != SOCKET_ADDRESS_TYPE_FD) {
return 0;
}
fd = socket_get_fd(addr->u.fd.str, errp);
if (fd < 0) {
return fd;
}
g_free(addr->u.fd.str);
addr->u.fd.str = g_strdup_printf("%d", fd);
return 0;
}
int socket_connect(SocketAddress *addr, Error **errp)
{
int fd;