block/rbd: migrate from aio to coroutines

Signed-off-by: Peter Lieven <pl@kamp.de>
Reviewed-by: Ilya Dryomov <idryomov@gmail.com>
Message-Id: <20210702172356.11574-5-idryomov@gmail.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
This commit is contained in:
Peter Lieven 2021-07-02 19:23:54 +02:00 committed by Kevin Wolf
parent 6d9214189e
commit c3e5fac534
1 changed files with 90 additions and 162 deletions

View File

@ -78,22 +78,6 @@ typedef enum {
RBD_AIO_FLUSH RBD_AIO_FLUSH
} RBDAIOCmd; } RBDAIOCmd;
typedef struct RBDAIOCB {
BlockAIOCB common;
int64_t ret;
QEMUIOVector *qiov;
RBDAIOCmd cmd;
int error;
struct BDRVRBDState *s;
} RBDAIOCB;
typedef struct RADOSCB {
RBDAIOCB *acb;
struct BDRVRBDState *s;
int64_t size;
int64_t ret;
} RADOSCB;
typedef struct BDRVRBDState { typedef struct BDRVRBDState {
rados_t cluster; rados_t cluster;
rados_ioctx_t io_ctx; rados_ioctx_t io_ctx;
@ -105,6 +89,13 @@ typedef struct BDRVRBDState {
uint64_t object_size; uint64_t object_size;
} BDRVRBDState; } BDRVRBDState;
typedef struct RBDTask {
BlockDriverState *bs;
Coroutine *co;
bool complete;
int64_t ret;
} RBDTask;
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx, static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
BlockdevOptionsRbd *opts, bool cache, BlockdevOptionsRbd *opts, bool cache,
const char *keypairs, const char *secretid, const char *keypairs, const char *secretid,
@ -337,13 +328,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
return ret; return ret;
} }
static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
{
RBDAIOCB *acb = rcb->acb;
iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
acb->qiov->size - offs);
}
#ifdef LIBRBD_SUPPORTS_ENCRYPTION #ifdef LIBRBD_SUPPORTS_ENCRYPTION
static int qemu_rbd_convert_luks_options( static int qemu_rbd_convert_luks_options(
RbdEncryptionOptionsLUKSBase *luks_opts, RbdEncryptionOptionsLUKSBase *luks_opts,
@ -733,46 +717,6 @@ exit:
return ret; return ret;
} }
/*
* This aio completion is being called from rbd_finish_bh() and runs in qemu
* BH context.
*/
static void qemu_rbd_complete_aio(RADOSCB *rcb)
{
RBDAIOCB *acb = rcb->acb;
int64_t r;
r = rcb->ret;
if (acb->cmd != RBD_AIO_READ) {
if (r < 0) {
acb->ret = r;
acb->error = 1;
} else if (!acb->error) {
acb->ret = rcb->size;
}
} else {
if (r < 0) {
qemu_rbd_memset(rcb, 0);
acb->ret = r;
acb->error = 1;
} else if (r < rcb->size) {
qemu_rbd_memset(rcb, r);
if (!acb->error) {
acb->ret = rcb->size;
}
} else if (!acb->error) {
acb->ret = r;
}
}
g_free(rcb);
acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
qemu_aio_unref(acb);
}
static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp) static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
{ {
const char **vals; const char **vals;
@ -1122,89 +1066,59 @@ static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
return 0; return 0;
} }
static const AIOCBInfo rbd_aiocb_info = { static void qemu_rbd_finish_bh(void *opaque)
.aiocb_size = sizeof(RBDAIOCB),
};
static void rbd_finish_bh(void *opaque)
{ {
RADOSCB *rcb = opaque; RBDTask *task = opaque;
qemu_rbd_complete_aio(rcb); task->complete = 1;
aio_co_wake(task->co);
} }
/* /*
* This is the callback function for rbd_aio_read and _write * This is the completion callback function for all rbd aio calls
* started from qemu_rbd_start_co().
* *
* Note: this function is being called from a non qemu thread so * Note: this function is being called from a non qemu thread so
* we need to be careful about what we do here. Generally we only * we need to be careful about what we do here. Generally we only
* schedule a BH, and do the rest of the io completion handling * schedule a BH, and do the rest of the io completion handling
* from rbd_finish_bh() which runs in a qemu context. * from qemu_rbd_finish_bh() which runs in a qemu context.
*/ */
static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
{ {
RBDAIOCB *acb = rcb->acb; task->ret = rbd_aio_get_return_value(c);
rcb->ret = rbd_aio_get_return_value(c);
rbd_aio_release(c); rbd_aio_release(c);
aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs), qemu_rbd_finish_bh, task);
rbd_finish_bh, rcb);
} }
static BlockAIOCB *rbd_start_aio(BlockDriverState *bs, static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
int64_t off, uint64_t offset,
QEMUIOVector *qiov, uint64_t bytes,
int64_t size, QEMUIOVector *qiov,
BlockCompletionFunc *cb, int flags,
void *opaque, RBDAIOCmd cmd)
RBDAIOCmd cmd)
{ {
RBDAIOCB *acb; BDRVRBDState *s = bs->opaque;
RADOSCB *rcb = NULL; RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
rbd_completion_t c; rbd_completion_t c;
int r; int r;
BDRVRBDState *s = bs->opaque; assert(!qiov || qiov->size == bytes);
acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); r = rbd_aio_create_completion(&task,
acb->cmd = cmd; (rbd_callback_t) qemu_rbd_completion_cb, &c);
acb->qiov = qiov;
assert(!qiov || qiov->size == size);
rcb = g_new(RADOSCB, 1);
acb->ret = 0;
acb->error = 0;
acb->s = s;
rcb->acb = acb;
rcb->s = acb->s;
rcb->size = size;
r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
if (r < 0) { if (r < 0) {
goto failed; return r;
} }
switch (cmd) { switch (cmd) {
case RBD_AIO_WRITE:
/*
* RBD APIs don't allow us to write more than actual size, so in order
* to support growing images, we resize the image before write
* operations that exceed the current size.
*/
if (off + size > s->image_size) {
r = qemu_rbd_resize(bs, off + size);
if (r < 0) {
goto failed_completion;
}
}
r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
break;
case RBD_AIO_READ: case RBD_AIO_READ:
r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c); r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
break;
case RBD_AIO_WRITE:
r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
break; break;
case RBD_AIO_DISCARD: case RBD_AIO_DISCARD:
r = rbd_aio_discard(s->image, off, size, c); r = rbd_aio_discard(s->image, offset, bytes, c);
break; break;
case RBD_AIO_FLUSH: case RBD_AIO_FLUSH:
r = rbd_aio_flush(s->image, c); r = rbd_aio_flush(s->image, c);
@ -1214,44 +1128,69 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
} }
if (r < 0) { if (r < 0) {
goto failed_completion; error_report("rbd request failed early: cmd %d offset %" PRIu64
" bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
bytes, flags, r, strerror(-r));
rbd_aio_release(c);
return r;
} }
return &acb->common;
failed_completion: while (!task.complete) {
rbd_aio_release(c); qemu_coroutine_yield();
failed: }
g_free(rcb);
qemu_aio_unref(acb); if (task.ret < 0) {
return NULL; error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
bytes, flags, task.ret, strerror(-task.ret));
return task.ret;
}
/* zero pad short reads */
if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
}
return 0;
} }
static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs, static int
uint64_t offset, uint64_t bytes, coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
QEMUIOVector *qiov, int flags, uint64_t bytes, QEMUIOVector *qiov,
BlockCompletionFunc *cb, int flags)
void *opaque)
{ {
return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque, return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
RBD_AIO_READ);
} }
static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs, static int
uint64_t offset, uint64_t bytes, coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
QEMUIOVector *qiov, int flags, uint64_t bytes, QEMUIOVector *qiov,
BlockCompletionFunc *cb, int flags)
void *opaque)
{ {
return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque, BDRVRBDState *s = bs->opaque;
RBD_AIO_WRITE); /*
* RBD APIs don't allow us to write more than actual size, so in order
* to support growing images, we resize the image before write
* operations that exceed the current size.
*/
if (offset + bytes > s->image_size) {
int r = qemu_rbd_resize(bs, offset + bytes);
if (r < 0) {
return r;
}
}
return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
} }
static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs, static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
BlockCompletionFunc *cb,
void *opaque)
{ {
return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH); return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
}
static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
int64_t offset, int count)
{
return qemu_rbd_start_co(bs, offset, count, NULL, 0, RBD_AIO_DISCARD);
} }
static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
@ -1450,16 +1389,6 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
return snap_count; return snap_count;
} }
static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
int64_t offset,
int bytes,
BlockCompletionFunc *cb,
void *opaque)
{
return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
RBD_AIO_DISCARD);
}
static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs, static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
Error **errp) Error **errp)
{ {
@ -1540,11 +1469,10 @@ static BlockDriver bdrv_rbd = {
.bdrv_co_truncate = qemu_rbd_co_truncate, .bdrv_co_truncate = qemu_rbd_co_truncate,
.protocol_name = "rbd", .protocol_name = "rbd",
.bdrv_aio_preadv = qemu_rbd_aio_preadv, .bdrv_co_preadv = qemu_rbd_co_preadv,
.bdrv_aio_pwritev = qemu_rbd_aio_pwritev, .bdrv_co_pwritev = qemu_rbd_co_pwritev,
.bdrv_co_flush_to_disk = qemu_rbd_co_flush,
.bdrv_aio_flush = qemu_rbd_aio_flush, .bdrv_co_pdiscard = qemu_rbd_co_pdiscard,
.bdrv_aio_pdiscard = qemu_rbd_aio_pdiscard,
.bdrv_snapshot_create = qemu_rbd_snap_create, .bdrv_snapshot_create = qemu_rbd_snap_create,
.bdrv_snapshot_delete = qemu_rbd_snap_remove, .bdrv_snapshot_delete = qemu_rbd_snap_remove,