sheepdog: use coroutines

This makes the sheepdog block driver support bdrv_co_readv/writev
instead of bdrv_aio_readv/writev.

With this patch, Sheepdog network I/O becomes fully asynchronous.  The
block driver yields back when send/recv returns EAGAIN, and is resumed
when the sheepdog network connection is ready for the operation.

Signed-off-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
This commit is contained in:
MORITA Kazutaka 2011-08-12 21:33:15 +09:00 committed by Kevin Wolf
parent ab0997e0af
commit 2df4624662
1 changed files with 93 additions and 57 deletions

View File

@ -274,7 +274,7 @@ struct SheepdogAIOCB {
int ret; int ret;
enum AIOCBState aiocb_type; enum AIOCBState aiocb_type;
QEMUBH *bh; Coroutine *coroutine;
void (*aio_done_func)(SheepdogAIOCB *); void (*aio_done_func)(SheepdogAIOCB *);
int canceled; int canceled;
@ -295,6 +295,10 @@ typedef struct BDRVSheepdogState {
char *port; char *port;
int fd; int fd;
CoMutex lock;
Coroutine *co_send;
Coroutine *co_recv;
uint32_t aioreq_seq_num; uint32_t aioreq_seq_num;
QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head; QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
} BDRVSheepdogState; } BDRVSheepdogState;
@ -346,19 +350,16 @@ static const char * sd_strerror(int err)
/* /*
* Sheepdog I/O handling: * Sheepdog I/O handling:
* *
* 1. In the sd_aio_readv/writev, read/write requests are added to the * 1. In sd_co_rw_vector, we send the I/O requests to the server and
* QEMU Bottom Halves. * link the requests to the outstanding_list in the
* BDRVSheepdogState. The function exits without waiting for
* receiving the response.
* *
* 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O * 2. We receive the response in aio_read_response, the fd handler to
* requests to the server and link the requests to the
* outstanding_list in the BDRVSheepdogState. we exits the
* function without waiting for receiving the response.
*
* 3. We receive the response in aio_read_response, the fd handler to
* the sheepdog connection. If metadata update is needed, we send * the sheepdog connection. If metadata update is needed, we send
* the write request to the vdi object in sd_write_done, the write * the write request to the vdi object in sd_write_done, the write
* completion function. The AIOCB callback is not called until all * completion function. We switch back to sd_co_readv/writev after
* the requests belonging to the AIOCB are finished. * all the requests belonging to the AIOCB are finished.
*/ */
static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb, static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
@ -398,7 +399,7 @@ static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
static void sd_finish_aiocb(SheepdogAIOCB *acb) static void sd_finish_aiocb(SheepdogAIOCB *acb)
{ {
if (!acb->canceled) { if (!acb->canceled) {
acb->common.cb(acb->common.opaque, acb->ret); qemu_coroutine_enter(acb->coroutine, NULL);
} }
qemu_aio_release(acb); qemu_aio_release(acb);
} }
@ -411,7 +412,8 @@ static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
* Sheepdog cannot cancel the requests which are already sent to * Sheepdog cannot cancel the requests which are already sent to
* the servers, so we just complete the request with -EIO here. * the servers, so we just complete the request with -EIO here.
*/ */
acb->common.cb(acb->common.opaque, -EIO); acb->ret = -EIO;
qemu_coroutine_enter(acb->coroutine, NULL);
acb->canceled = 1; acb->canceled = 1;
} }
@ -435,24 +437,12 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
acb->aio_done_func = NULL; acb->aio_done_func = NULL;
acb->canceled = 0; acb->canceled = 0;
acb->bh = NULL; acb->coroutine = qemu_coroutine_self();
acb->ret = 0; acb->ret = 0;
QLIST_INIT(&acb->aioreq_head); QLIST_INIT(&acb->aioreq_head);
return acb; return acb;
} }
static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb)
{
if (acb->bh) {
error_report("bug: %d %d", acb->aiocb_type, acb->aiocb_type);
return -EIO;
}
acb->bh = qemu_bh_new(cb, acb);
qemu_bh_schedule(acb->bh);
return 0;
}
#ifdef _WIN32 #ifdef _WIN32
struct msghdr { struct msghdr {
@ -635,7 +625,13 @@ static int do_readv_writev(int sockfd, struct iovec *iov, int len,
again: again:
ret = do_send_recv(sockfd, iov, len, iov_offset, write); ret = do_send_recv(sockfd, iov, len, iov_offset, write);
if (ret < 0) { if (ret < 0) {
if (errno == EINTR || errno == EAGAIN) { if (errno == EINTR) {
goto again;
}
if (errno == EAGAIN) {
if (qemu_in_coroutine()) {
qemu_coroutine_yield();
}
goto again; goto again;
} }
error_report("failed to recv a rsp, %s", strerror(errno)); error_report("failed to recv a rsp, %s", strerror(errno));
@ -793,14 +789,14 @@ static void aio_read_response(void *opaque)
unsigned long idx; unsigned long idx;
if (QLIST_EMPTY(&s->outstanding_aio_head)) { if (QLIST_EMPTY(&s->outstanding_aio_head)) {
return; goto out;
} }
/* read a header */ /* read a header */
ret = do_read(fd, &rsp, sizeof(rsp)); ret = do_read(fd, &rsp, sizeof(rsp));
if (ret) { if (ret) {
error_report("failed to get the header, %s", strerror(errno)); error_report("failed to get the header, %s", strerror(errno));
return; goto out;
} }
/* find the right aio_req from the outstanding_aio list */ /* find the right aio_req from the outstanding_aio list */
@ -811,7 +807,7 @@ static void aio_read_response(void *opaque)
} }
if (!aio_req) { if (!aio_req) {
error_report("cannot find aio_req %x", rsp.id); error_report("cannot find aio_req %x", rsp.id);
return; goto out;
} }
acb = aio_req->aiocb; acb = aio_req->aiocb;
@ -847,7 +843,7 @@ static void aio_read_response(void *opaque)
aio_req->iov_offset); aio_req->iov_offset);
if (ret) { if (ret) {
error_report("failed to get the data, %s", strerror(errno)); error_report("failed to get the data, %s", strerror(errno));
return; goto out;
} }
break; break;
} }
@ -861,10 +857,30 @@ static void aio_read_response(void *opaque)
if (!rest) { if (!rest) {
/* /*
* We've finished all requests which belong to the AIOCB, so * We've finished all requests which belong to the AIOCB, so
* we can call the callback now. * we can switch back to sd_co_readv/writev now.
*/ */
acb->aio_done_func(acb); acb->aio_done_func(acb);
} }
out:
s->co_recv = NULL;
}
static void co_read_response(void *opaque)
{
BDRVSheepdogState *s = opaque;
if (!s->co_recv) {
s->co_recv = qemu_coroutine_create(aio_read_response);
}
qemu_coroutine_enter(s->co_recv, opaque);
}
static void co_write_request(void *opaque)
{
BDRVSheepdogState *s = opaque;
qemu_coroutine_enter(s->co_send, NULL);
} }
static int aio_flush_request(void *opaque) static int aio_flush_request(void *opaque)
@ -924,7 +940,7 @@ static int get_sheep_fd(BDRVSheepdogState *s)
return -1; return -1;
} }
qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request, qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request,
NULL, s); NULL, s);
return fd; return fd;
} }
@ -1091,6 +1107,10 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
hdr.id = aio_req->id; hdr.id = aio_req->id;
qemu_co_mutex_lock(&s->lock);
s->co_send = qemu_coroutine_self();
qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
aio_flush_request, NULL, s);
set_cork(s->fd, 1); set_cork(s->fd, 1);
/* send a header */ /* send a header */
@ -1109,6 +1129,9 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
} }
set_cork(s->fd, 0); set_cork(s->fd, 0);
qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
aio_flush_request, NULL, s);
qemu_co_mutex_unlock(&s->lock);
return 0; return 0;
} }
@ -1225,6 +1248,7 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
bs->total_sectors = s->inode.vdi_size / SECTOR_SIZE; bs->total_sectors = s->inode.vdi_size / SECTOR_SIZE;
strncpy(s->name, vdi, sizeof(s->name)); strncpy(s->name, vdi, sizeof(s->name));
qemu_co_mutex_init(&s->lock);
g_free(buf); g_free(buf);
return 0; return 0;
out: out:
@ -1491,7 +1515,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
/* /*
* This function is called after writing data objects. If we need to * This function is called after writing data objects. If we need to
* update metadata, this sends a write request to the vdi object. * update metadata, this sends a write request to the vdi object.
* Otherwise, this calls the AIOCB callback. * Otherwise, this switches back to sd_co_readv/writev.
*/ */
static void sd_write_done(SheepdogAIOCB *acb) static void sd_write_done(SheepdogAIOCB *acb)
{ {
@ -1587,8 +1611,11 @@ out:
* waiting the response. The responses are received in the * waiting the response. The responses are received in the
* `aio_read_response' function which is called from the main loop as * `aio_read_response' function which is called from the main loop as
* a fd handler. * a fd handler.
*
* Returns 1 when we need to wait a response, 0 when there is no sent
* request and -errno in error cases.
*/ */
static void sd_readv_writev_bh_cb(void *p) static int sd_co_rw_vector(void *p)
{ {
SheepdogAIOCB *acb = p; SheepdogAIOCB *acb = p;
int ret = 0; int ret = 0;
@ -1600,9 +1627,6 @@ static void sd_readv_writev_bh_cb(void *p)
SheepdogInode *inode = &s->inode; SheepdogInode *inode = &s->inode;
AIOReq *aio_req; AIOReq *aio_req;
qemu_bh_delete(acb->bh);
acb->bh = NULL;
if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) { if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
/* /*
* In the case we open the snapshot VDI, Sheepdog creates the * In the case we open the snapshot VDI, Sheepdog creates the
@ -1684,42 +1708,47 @@ static void sd_readv_writev_bh_cb(void *p)
} }
out: out:
if (QLIST_EMPTY(&acb->aioreq_head)) { if (QLIST_EMPTY(&acb->aioreq_head)) {
sd_finish_aiocb(acb); return acb->ret;
} }
return 1;
} }
static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs, int64_t sector_num, static int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
QEMUIOVector *qiov, int nb_sectors, int nb_sectors, QEMUIOVector *qiov)
BlockDriverCompletionFunc *cb,
void *opaque)
{ {
SheepdogAIOCB *acb; SheepdogAIOCB *acb;
int ret;
if (bs->growable && sector_num + nb_sectors > bs->total_sectors) { if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
/* TODO: shouldn't block here */ /* TODO: shouldn't block here */
if (sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE) < 0) { if (sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE) < 0) {
return NULL; return -EIO;
} }
bs->total_sectors = sector_num + nb_sectors; bs->total_sectors = sector_num + nb_sectors;
} }
acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque); acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
acb->aio_done_func = sd_write_done; acb->aio_done_func = sd_write_done;
acb->aiocb_type = AIOCB_WRITE_UDATA; acb->aiocb_type = AIOCB_WRITE_UDATA;
sd_schedule_bh(sd_readv_writev_bh_cb, acb); ret = sd_co_rw_vector(acb);
return &acb->common; if (ret <= 0) {
qemu_aio_release(acb);
return ret;
}
qemu_coroutine_yield();
return acb->ret;
} }
static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num, static int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
QEMUIOVector *qiov, int nb_sectors, int nb_sectors, QEMUIOVector *qiov)
BlockDriverCompletionFunc *cb,
void *opaque)
{ {
SheepdogAIOCB *acb; SheepdogAIOCB *acb;
int i; int i, ret;
acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque); acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
acb->aiocb_type = AIOCB_READ_UDATA; acb->aiocb_type = AIOCB_READ_UDATA;
acb->aio_done_func = sd_finish_aiocb; acb->aio_done_func = sd_finish_aiocb;
@ -1731,8 +1760,15 @@ static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num,
memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len); memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
} }
sd_schedule_bh(sd_readv_writev_bh_cb, acb); ret = sd_co_rw_vector(acb);
return &acb->common; if (ret <= 0) {
qemu_aio_release(acb);
return ret;
}
qemu_coroutine_yield();
return acb->ret;
} }
static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
@ -2062,8 +2098,8 @@ BlockDriver bdrv_sheepdog = {
.bdrv_getlength = sd_getlength, .bdrv_getlength = sd_getlength,
.bdrv_truncate = sd_truncate, .bdrv_truncate = sd_truncate,
.bdrv_aio_readv = sd_aio_readv, .bdrv_co_readv = sd_co_readv,
.bdrv_aio_writev = sd_aio_writev, .bdrv_co_writev = sd_co_writev,
.bdrv_snapshot_create = sd_snapshot_create, .bdrv_snapshot_create = sd_snapshot_create,
.bdrv_snapshot_goto = sd_snapshot_goto, .bdrv_snapshot_goto = sd_snapshot_goto,