nbd patches for 2019-10-22

- add ability for NBD client reconnect
 -----BEGIN PGP SIGNATURE-----
 
 iQEzBAABCAAdFiEEccLMIrHEYCkn0vOqp6FrSiUnQ2oFAl2vsoQACgkQp6FrSiUn
 Q2ogtQf/bSt3RuugySwRFxrw2pdDluIZOdfeOq4ytOaEQPJYiOzi28Vfs8ZnvfEq
 XLShJXoJJA51dMtlqJbuq4Iw9G4YGcn5pax08MjocdIGF+TwCnaSf0KumwefT4yr
 je0EdWAE1E9OtVlbNdjmCUpJ4oFFEsKj95wo8iNlYT2H8Trk7Y1DmmhsMJF/15qf
 MupqYINYrQUMDqGoMYHx/k8Iwhb2jZzWwGjD9dGKCDjUtw/DYNSgsj5YFOrll4+z
 uynB2D784PAr+qFd4N3QSgG/wCayNIXspBGfDK1HWahZV9KSvSq46uVf1mWYE6HS
 pa0keH+LNVW9TXjzsOmaToVy52E9Vw==
 =3ifl
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/ericb/tags/pull-nbd-2019-10-22' into staging

nbd patches for 2019-10-22

- add ability for NBD client reconnect

# gpg: Signature made Wed 23 Oct 2019 02:53:08 BST
# gpg:                using RSA key 71C2CC22B1C4602927D2F3AAA7A16B4A2527436A
# gpg: Good signature from "Eric Blake <eblake@redhat.com>" [full]
# gpg:                 aka "Eric Blake (Free Software Programmer) <ebb9@byu.net>" [full]
# gpg:                 aka "[jpeg image of size 6874]" [full]
# Primary key fingerprint: 71C2 CC22 B1C4 6029 27D2  F3AA A7A1 6B4A 2527 436A

* remotes/ericb/tags/pull-nbd-2019-10-22:
  iotests: test nbd reconnect
  block/nbd: nbd reconnect
  qemu-coroutine-sleep: introduce qemu_co_sleep_wake

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2019-10-23 16:06:13 +01:00
commit f78398bfe5
7 changed files with 447 additions and 78 deletions

View File

@ -1,6 +1,7 @@
/* /*
* QEMU Block driver for NBD * QEMU Block driver for NBD
* *
* Copyright (c) 2019 Virtuozzo International GmbH.
* Copyright (C) 2016 Red Hat, Inc. * Copyright (C) 2016 Red Hat, Inc.
* Copyright (C) 2008 Bull S.A.S. * Copyright (C) 2008 Bull S.A.S.
* Author: Laurent Vivier <Laurent.Vivier@bull.net> * Author: Laurent Vivier <Laurent.Vivier@bull.net>
@ -55,6 +56,8 @@ typedef struct {
} NBDClientRequest; } NBDClientRequest;
typedef enum NBDClientState { typedef enum NBDClientState {
NBD_CLIENT_CONNECTING_WAIT,
NBD_CLIENT_CONNECTING_NOWAIT,
NBD_CLIENT_CONNECTED, NBD_CLIENT_CONNECTED,
NBD_CLIENT_QUIT NBD_CLIENT_QUIT
} NBDClientState; } NBDClientState;
@ -67,8 +70,14 @@ typedef struct BDRVNBDState {
CoMutex send_mutex; CoMutex send_mutex;
CoQueue free_sema; CoQueue free_sema;
Coroutine *connection_co; Coroutine *connection_co;
QemuCoSleepState *connection_co_sleep_ns_state;
bool drained;
bool wait_drained_end;
int in_flight; int in_flight;
NBDClientState state; NBDClientState state;
int connect_status;
Error *connect_err;
bool wait_in_flight;
NBDClientRequest requests[MAX_NBD_REQUESTS]; NBDClientRequest requests[MAX_NBD_REQUESTS];
NBDReply reply; NBDReply reply;
@ -83,10 +92,21 @@ typedef struct BDRVNBDState {
char *x_dirty_bitmap; char *x_dirty_bitmap;
} BDRVNBDState; } BDRVNBDState;
/* @ret will be used for reconnect in future */ static int nbd_client_connect(BlockDriverState *bs, Error **errp);
static void nbd_channel_error(BDRVNBDState *s, int ret) static void nbd_channel_error(BDRVNBDState *s, int ret)
{ {
s->state = NBD_CLIENT_QUIT; if (ret == -EIO) {
if (s->state == NBD_CLIENT_CONNECTED) {
s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
NBD_CLIENT_CONNECTING_NOWAIT;
}
} else {
if (s->state == NBD_CLIENT_CONNECTED) {
qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
s->state = NBD_CLIENT_QUIT;
}
} }
static void nbd_recv_coroutines_wake_all(BDRVNBDState *s) static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
@ -129,7 +149,13 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
{ {
BDRVNBDState *s = (BDRVNBDState *)bs->opaque; BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context); /*
* s->connection_co is either yielded from nbd_receive_reply or from
* nbd_co_reconnect_loop()
*/
if (s->state == NBD_CLIENT_CONNECTED) {
qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
}
bdrv_inc_in_flight(bs); bdrv_inc_in_flight(bs);
@ -140,24 +166,150 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs); aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
} }
static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
s->drained = true;
if (s->connection_co_sleep_ns_state) {
qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
}
}
static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
s->drained = false;
if (s->wait_drained_end) {
s->wait_drained_end = false;
aio_co_wake(s->connection_co);
}
}
static void nbd_teardown_connection(BlockDriverState *bs) static void nbd_teardown_connection(BlockDriverState *bs)
{ {
BDRVNBDState *s = (BDRVNBDState *)bs->opaque; BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
assert(s->ioc); if (s->state == NBD_CLIENT_CONNECTED) {
/* finish any pending coroutines */
/* finish any pending coroutines */ assert(s->ioc);
qio_channel_shutdown(s->ioc, qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
QIO_CHANNEL_SHUTDOWN_BOTH, }
NULL); s->state = NBD_CLIENT_QUIT;
if (s->connection_co) {
if (s->connection_co_sleep_ns_state) {
qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
}
}
BDRV_POLL_WHILE(bs, s->connection_co); BDRV_POLL_WHILE(bs, s->connection_co);
}
nbd_client_detach_aio_context(bs); static bool nbd_client_connecting(BDRVNBDState *s)
object_unref(OBJECT(s->sioc)); {
s->sioc = NULL; return s->state == NBD_CLIENT_CONNECTING_WAIT ||
object_unref(OBJECT(s->ioc)); s->state == NBD_CLIENT_CONNECTING_NOWAIT;
s->ioc = NULL; }
static bool nbd_client_connecting_wait(BDRVNBDState *s)
{
return s->state == NBD_CLIENT_CONNECTING_WAIT;
}
static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
Error *local_err = NULL;
if (!nbd_client_connecting(s)) {
return;
}
/* Wait for completion of all in-flight requests */
qemu_co_mutex_lock(&s->send_mutex);
while (s->in_flight > 0) {
qemu_co_mutex_unlock(&s->send_mutex);
nbd_recv_coroutines_wake_all(s);
s->wait_in_flight = true;
qemu_coroutine_yield();
s->wait_in_flight = false;
qemu_co_mutex_lock(&s->send_mutex);
}
qemu_co_mutex_unlock(&s->send_mutex);
if (!nbd_client_connecting(s)) {
return;
}
/*
* Now we are sure that nobody is accessing the channel, and no one will
* try until we set the state to CONNECTED.
*/
/* Finalize previous connection if any */
if (s->ioc) {
nbd_client_detach_aio_context(s->bs);
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
}
s->connect_status = nbd_client_connect(s->bs, &local_err);
error_free(s->connect_err);
s->connect_err = NULL;
error_propagate(&s->connect_err, local_err);
if (s->connect_status < 0) {
/* failed attempt */
return;
}
/* successfully connected */
s->state = NBD_CLIENT_CONNECTED;
qemu_co_queue_restart_all(&s->free_sema);
}
static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
{
uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND;
uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
nbd_reconnect_attempt(s);
while (nbd_client_connecting(s)) {
if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
{
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
qemu_co_queue_restart_all(&s->free_sema);
}
qemu_co_sleep_ns_wakeable(QEMU_CLOCK_REALTIME, timeout,
&s->connection_co_sleep_ns_state);
if (s->drained) {
bdrv_dec_in_flight(s->bs);
s->wait_drained_end = true;
while (s->drained) {
/*
* We may be entered once from nbd_client_attach_aio_context_bh
* and then from nbd_client_co_drain_end. So here is a loop.
*/
qemu_coroutine_yield();
}
bdrv_inc_in_flight(s->bs);
}
if (timeout < max_timeout) {
timeout *= 2;
}
nbd_reconnect_attempt(s);
}
} }
static coroutine_fn void nbd_connection_entry(void *opaque) static coroutine_fn void nbd_connection_entry(void *opaque)
@ -177,16 +329,26 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
* Therefore we keep an additional in_flight reference all the time and * Therefore we keep an additional in_flight reference all the time and
* only drop it temporarily here. * only drop it temporarily here.
*/ */
if (nbd_client_connecting(s)) {
nbd_co_reconnect_loop(s);
}
if (s->state != NBD_CLIENT_CONNECTED) {
continue;
}
assert(s->reply.handle == 0); assert(s->reply.handle == 0);
ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err); ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
if (local_err) { if (local_err) {
trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err)); trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
error_free(local_err); error_free(local_err);
local_err = NULL;
} }
if (ret <= 0) { if (ret <= 0) {
nbd_channel_error(s, ret ? ret : -EIO); nbd_channel_error(s, ret ? ret : -EIO);
break; continue;
} }
/* /*
@ -201,7 +363,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
(nbd_reply_is_structured(&s->reply) && !s->info.structured_reply)) (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
{ {
nbd_channel_error(s, -EINVAL); nbd_channel_error(s, -EINVAL);
break; continue;
} }
/* /*
@ -220,10 +382,19 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
qemu_coroutine_yield(); qemu_coroutine_yield();
} }
qemu_co_queue_restart_all(&s->free_sema);
nbd_recv_coroutines_wake_all(s); nbd_recv_coroutines_wake_all(s);
bdrv_dec_in_flight(s->bs); bdrv_dec_in_flight(s->bs);
s->connection_co = NULL; s->connection_co = NULL;
if (s->ioc) {
nbd_client_detach_aio_context(s->bs);
object_unref(OBJECT(s->sioc));
s->sioc = NULL;
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
}
aio_wait_kick(); aio_wait_kick();
} }
@ -235,7 +406,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
int rc, i = -1; int rc, i = -1;
qemu_co_mutex_lock(&s->send_mutex); qemu_co_mutex_lock(&s->send_mutex);
while (s->in_flight == MAX_NBD_REQUESTS) { while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
qemu_co_queue_wait(&s->free_sema, &s->send_mutex); qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
} }
@ -286,7 +457,11 @@ err:
s->requests[i].coroutine = NULL; s->requests[i].coroutine = NULL;
s->in_flight--; s->in_flight--;
} }
qemu_co_queue_next(&s->free_sema); if (s->in_flight == 0 && s->wait_in_flight) {
aio_co_wake(s->connection_co);
} else {
qemu_co_queue_next(&s->free_sema);
}
} }
qemu_co_mutex_unlock(&s->send_mutex); qemu_co_mutex_unlock(&s->send_mutex);
return rc; return rc;
@ -666,10 +841,15 @@ static coroutine_fn int nbd_co_receive_one_chunk(
} else { } else {
/* For assert at loop start in nbd_connection_entry */ /* For assert at loop start in nbd_connection_entry */
*reply = s->reply; *reply = s->reply;
s->reply.handle = 0;
} }
s->reply.handle = 0;
if (s->connection_co) { if (s->connection_co && !s->wait_in_flight) {
/*
* We must check s->wait_in_flight, because we may entered by
* nbd_recv_coroutines_wake_all(), in this case we should not
* wake connection_co here, it will woken by last request.
*/
aio_co_wake(s->connection_co); aio_co_wake(s->connection_co);
} }
@ -781,7 +961,11 @@ break_loop:
qemu_co_mutex_lock(&s->send_mutex); qemu_co_mutex_lock(&s->send_mutex);
s->in_flight--; s->in_flight--;
qemu_co_queue_next(&s->free_sema); if (s->in_flight == 0 && s->wait_in_flight) {
aio_co_wake(s->connection_co);
} else {
qemu_co_queue_next(&s->free_sema);
}
qemu_co_mutex_unlock(&s->send_mutex); qemu_co_mutex_unlock(&s->send_mutex);
return false; return false;
@ -927,20 +1111,26 @@ static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
} else { } else {
assert(request->type != NBD_CMD_WRITE); assert(request->type != NBD_CMD_WRITE);
} }
ret = nbd_co_send_request(bs, request, write_qiov);
if (ret < 0) {
return ret;
}
ret = nbd_co_receive_return_code(s, request->handle, do {
&request_ret, &local_err); ret = nbd_co_send_request(bs, request, write_qiov);
if (local_err) { if (ret < 0) {
trace_nbd_co_request_fail(request->from, request->len, request->handle, continue;
request->flags, request->type, }
nbd_cmd_lookup(request->type),
ret, error_get_pretty(local_err)); ret = nbd_co_receive_return_code(s, request->handle,
error_free(local_err); &request_ret, &local_err);
} if (local_err) {
trace_nbd_co_request_fail(request->from, request->len,
request->handle, request->flags,
request->type,
nbd_cmd_lookup(request->type),
ret, error_get_pretty(local_err));
error_free(local_err);
local_err = NULL;
}
} while (ret < 0 && nbd_client_connecting_wait(s));
return ret ? ret : request_ret; return ret ? ret : request_ret;
} }
@ -981,20 +1171,24 @@ static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
request.len -= slop; request.len -= slop;
} }
ret = nbd_co_send_request(bs, &request, NULL); do {
if (ret < 0) { ret = nbd_co_send_request(bs, &request, NULL);
return ret; if (ret < 0) {
} continue;
}
ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
&request_ret, &local_err);
if (local_err) {
trace_nbd_co_request_fail(request.from, request.len, request.handle,
request.flags, request.type,
nbd_cmd_lookup(request.type),
ret, error_get_pretty(local_err));
error_free(local_err);
local_err = NULL;
}
} while (ret < 0 && nbd_client_connecting_wait(s));
ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
&request_ret, &local_err);
if (local_err) {
trace_nbd_co_request_fail(request.from, request.len, request.handle,
request.flags, request.type,
nbd_cmd_lookup(request.type),
ret, error_get_pretty(local_err));
error_free(local_err);
}
return ret ? ret : request_ret; return ret ? ret : request_ret;
} }
@ -1131,20 +1325,25 @@ static int coroutine_fn nbd_client_co_block_status(
if (s->info.min_block) { if (s->info.min_block) {
assert(QEMU_IS_ALIGNED(request.len, s->info.min_block)); assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
} }
ret = nbd_co_send_request(bs, &request, NULL); do {
if (ret < 0) { ret = nbd_co_send_request(bs, &request, NULL);
return ret; if (ret < 0) {
} continue;
}
ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
&extent, &request_ret,
&local_err);
if (local_err) {
trace_nbd_co_request_fail(request.from, request.len, request.handle,
request.flags, request.type,
nbd_cmd_lookup(request.type),
ret, error_get_pretty(local_err));
error_free(local_err);
local_err = NULL;
}
} while (ret < 0 && nbd_client_connecting_wait(s));
ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
&extent, &request_ret, &local_err);
if (local_err) {
trace_nbd_co_request_fail(request.from, request.len, request.handle,
request.flags, request.type,
nbd_cmd_lookup(request.type),
ret, error_get_pretty(local_err));
error_free(local_err);
}
if (ret < 0 || request_ret < 0) { if (ret < 0 || request_ret < 0) {
return ret ? ret : request_ret; return ret ? ret : request_ret;
} }
@ -1175,9 +1374,9 @@ static void nbd_client_close(BlockDriverState *bs)
BDRVNBDState *s = (BDRVNBDState *)bs->opaque; BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
NBDRequest request = { .type = NBD_CMD_DISC }; NBDRequest request = { .type = NBD_CMD_DISC };
assert(s->ioc); if (s->ioc) {
nbd_send_request(s->ioc, &request);
nbd_send_request(s->ioc, &request); }
nbd_teardown_connection(bs); nbd_teardown_connection(bs);
} }
@ -1821,6 +2020,8 @@ static BlockDriver bdrv_nbd = {
.bdrv_getlength = nbd_getlength, .bdrv_getlength = nbd_getlength,
.bdrv_detach_aio_context = nbd_client_detach_aio_context, .bdrv_detach_aio_context = nbd_client_detach_aio_context,
.bdrv_attach_aio_context = nbd_client_attach_aio_context, .bdrv_attach_aio_context = nbd_client_attach_aio_context,
.bdrv_co_drain_begin = nbd_client_co_drain_begin,
.bdrv_co_drain_end = nbd_client_co_drain_end,
.bdrv_refresh_filename = nbd_refresh_filename, .bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status, .bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname, .bdrv_dirname = nbd_dirname,
@ -1844,6 +2045,8 @@ static BlockDriver bdrv_nbd_tcp = {
.bdrv_getlength = nbd_getlength, .bdrv_getlength = nbd_getlength,
.bdrv_detach_aio_context = nbd_client_detach_aio_context, .bdrv_detach_aio_context = nbd_client_detach_aio_context,
.bdrv_attach_aio_context = nbd_client_attach_aio_context, .bdrv_attach_aio_context = nbd_client_attach_aio_context,
.bdrv_co_drain_begin = nbd_client_co_drain_begin,
.bdrv_co_drain_end = nbd_client_co_drain_end,
.bdrv_refresh_filename = nbd_refresh_filename, .bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status, .bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname, .bdrv_dirname = nbd_dirname,
@ -1867,6 +2070,8 @@ static BlockDriver bdrv_nbd_unix = {
.bdrv_getlength = nbd_getlength, .bdrv_getlength = nbd_getlength,
.bdrv_detach_aio_context = nbd_client_detach_aio_context, .bdrv_detach_aio_context = nbd_client_detach_aio_context,
.bdrv_attach_aio_context = nbd_client_attach_aio_context, .bdrv_attach_aio_context = nbd_client_attach_aio_context,
.bdrv_co_drain_begin = nbd_client_co_drain_begin,
.bdrv_co_drain_end = nbd_client_co_drain_end,
.bdrv_refresh_filename = nbd_refresh_filename, .bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status, .bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname, .bdrv_dirname = nbd_dirname,

View File

@ -273,10 +273,29 @@ void qemu_co_rwlock_wrlock(CoRwlock *lock);
*/ */
void qemu_co_rwlock_unlock(CoRwlock *lock); void qemu_co_rwlock_unlock(CoRwlock *lock);
typedef struct QemuCoSleepState QemuCoSleepState;
/** /**
* Yield the coroutine for a given duration * Yield the coroutine for a given duration. During this yield, @sleep_state
* (if not NULL) is set to an opaque pointer, which may be used for
* qemu_co_sleep_wake(). Be careful, the pointer is set back to zero when the
* timer fires. Don't save the obtained value to other variables and don't call
* qemu_co_sleep_wake from another aio context.
*/ */
void coroutine_fn qemu_co_sleep_ns(QEMUClockType type, int64_t ns); void coroutine_fn qemu_co_sleep_ns_wakeable(QEMUClockType type, int64_t ns,
QemuCoSleepState **sleep_state);
static inline void coroutine_fn qemu_co_sleep_ns(QEMUClockType type, int64_t ns)
{
qemu_co_sleep_ns_wakeable(type, ns, NULL);
}
/**
* Wake a coroutine if it is sleeping in qemu_co_sleep_ns. The timer will be
* deleted. @sleep_state must be the variable whose address was given to
* qemu_co_sleep_ns() and should be checked to be non-NULL before calling
* qemu_co_sleep_wake().
*/
void qemu_co_sleep_wake(QemuCoSleepState *sleep_state);
/** /**
* Yield until a file descriptor becomes readable * Yield until a file descriptor becomes readable

95
tests/qemu-iotests/264 Executable file
View File

@ -0,0 +1,95 @@
#!/usr/bin/env python
#
# Test nbd reconnect
#
# Copyright (c) 2019 Virtuozzo International GmbH.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import time
import iotests
from iotests import qemu_img_create, qemu_io_silent_check, file_path, \
qemu_nbd_popen, log
disk_a, disk_b, nbd_sock = file_path('disk_a', 'disk_b', 'nbd-sock')
nbd_uri = 'nbd+unix:///?socket=' + nbd_sock
size = 5 * 1024 * 1024
wait_limit = 3
wait_step = 0.2
qemu_img_create('-f', iotests.imgfmt, disk_a, str(size))
qemu_img_create('-f', iotests.imgfmt, disk_b, str(size))
srv = qemu_nbd_popen('-k', nbd_sock, '-f', iotests.imgfmt, disk_b)
# Wait for NBD server availability
t = 0
ok = False
while t < wait_limit:
ok = qemu_io_silent_check('-f', 'raw', '-c', 'read 0 512', nbd_uri)
if ok:
break
time.sleep(wait_step)
t += wait_step
assert ok
vm = iotests.VM().add_drive(disk_a)
vm.launch()
vm.hmp_qemu_io('drive0', 'write 0 {}'.format(size))
vm.qmp_log('blockdev-add', filters=[iotests.filter_qmp_testfiles],
**{'node_name': 'backup0',
'driver': 'raw',
'file': {'driver': 'nbd',
'server': {'type': 'unix', 'path': nbd_sock},
'reconnect-delay': 10}})
vm.qmp_log('blockdev-backup', device='drive0', sync='full', target='backup0',
speed=(1 * 1024 * 1024))
# Wait for some progress
t = 0
while t < wait_limit:
jobs = vm.qmp('query-block-jobs')['return']
if jobs and jobs[0]['offset'] > 0:
break
time.sleep(wait_step)
t += wait_step
if jobs and jobs[0]['offset'] > 0:
log('Backup job is started')
log('Kill NBD server')
srv.kill()
srv.wait()
jobs = vm.qmp('query-block-jobs')['return']
if jobs and jobs[0]['offset'] < jobs[0]['len']:
log('Backup job is still in progress')
vm.qmp_log('block-job-set-speed', device='drive0', speed=0)
# Emulate server down time for 1 second
time.sleep(1)
log('Start NBD server')
srv = qemu_nbd_popen('-k', nbd_sock, '-f', iotests.imgfmt, disk_b)
e = vm.event_wait('BLOCK_JOB_COMPLETED')
log('Backup completed: {}'.format(e['data']['offset']))
vm.qmp_log('blockdev-del', node_name='backup0')
srv.kill()
vm.shutdown()

View File

@ -0,0 +1,13 @@
{"execute": "blockdev-add", "arguments": {"driver": "raw", "file": {"driver": "nbd", "reconnect-delay": 10, "server": {"path": "TEST_DIR/PID-nbd-sock", "type": "unix"}}, "node-name": "backup0"}}
{"return": {}}
{"execute": "blockdev-backup", "arguments": {"device": "drive0", "speed": 1048576, "sync": "full", "target": "backup0"}}
{"return": {}}
Backup job is started
Kill NBD server
Backup job is still in progress
{"execute": "block-job-set-speed", "arguments": {"device": "drive0", "speed": 0}}
{"return": {}}
Start NBD server
Backup completed: 5242880
{"execute": "blockdev-del", "arguments": {"node-name": "backup0"}}
{"return": {}}

View File

@ -276,6 +276,7 @@
260 rw quick 260 rw quick
262 rw quick migration 262 rw quick migration
263 rw quick 263 rw quick
264 rw
265 rw auto quick 265 rw auto quick
266 rw quick 266 rw quick
267 rw auto quick snapshot 267 rw auto quick snapshot

View File

@ -165,6 +165,13 @@ def qemu_io_silent(*args):
(-exitcode, ' '.join(args))) (-exitcode, ' '.join(args)))
return exitcode return exitcode
def qemu_io_silent_check(*args):
'''Run qemu-io and return the true if subprocess returned 0'''
args = qemu_io_args + list(args)
exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
stderr=subprocess.STDOUT)
return exitcode == 0
def get_virtio_scsi_device(): def get_virtio_scsi_device():
if qemu_default_machine == 's390-ccw-virtio': if qemu_default_machine == 's390-ccw-virtio':
return 'virtio-scsi-ccw' return 'virtio-scsi-ccw'
@ -230,6 +237,10 @@ def qemu_nbd_early_pipe(*args):
else: else:
return exitcode, subp.communicate()[0] return exitcode, subp.communicate()[0]
def qemu_nbd_popen(*args):
'''Run qemu-nbd in daemon mode and return the parent's exit code'''
return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args))
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
'''Return True if two image files are identical''' '''Return True if two image files are identical'''
return qemu_img('compare', '-f', fmt1, return qemu_img('compare', '-f', fmt1,

View File

@ -17,31 +17,56 @@
#include "qemu/timer.h" #include "qemu/timer.h"
#include "block/aio.h" #include "block/aio.h"
static void co_sleep_cb(void *opaque) static const char *qemu_co_sleep_ns__scheduled = "qemu_co_sleep_ns";
{
Coroutine *co = opaque;
struct QemuCoSleepState {
Coroutine *co;
QEMUTimer *ts;
QemuCoSleepState **user_state_pointer;
};
void qemu_co_sleep_wake(QemuCoSleepState *sleep_state)
{
/* Write of schedule protected by barrier write in aio_co_schedule */ /* Write of schedule protected by barrier write in aio_co_schedule */
atomic_set(&co->scheduled, NULL); const char *scheduled = atomic_cmpxchg(&sleep_state->co->scheduled,
aio_co_wake(co); qemu_co_sleep_ns__scheduled, NULL);
assert(scheduled == qemu_co_sleep_ns__scheduled);
if (sleep_state->user_state_pointer) {
*sleep_state->user_state_pointer = NULL;
}
timer_del(sleep_state->ts);
aio_co_wake(sleep_state->co);
} }
void coroutine_fn qemu_co_sleep_ns(QEMUClockType type, int64_t ns) static void co_sleep_cb(void *opaque)
{
qemu_co_sleep_wake(opaque);
}
void coroutine_fn qemu_co_sleep_ns_wakeable(QEMUClockType type, int64_t ns,
QemuCoSleepState **sleep_state)
{ {
AioContext *ctx = qemu_get_current_aio_context(); AioContext *ctx = qemu_get_current_aio_context();
QEMUTimer *ts; QemuCoSleepState state = {
Coroutine *co = qemu_coroutine_self(); .co = qemu_coroutine_self(),
.ts = aio_timer_new(ctx, type, SCALE_NS, co_sleep_cb, &state),
.user_state_pointer = sleep_state,
};
const char *scheduled = atomic_cmpxchg(&co->scheduled, NULL, __func__); const char *scheduled = atomic_cmpxchg(&state.co->scheduled, NULL,
qemu_co_sleep_ns__scheduled);
if (scheduled) { if (scheduled) {
fprintf(stderr, fprintf(stderr,
"%s: Co-routine was already scheduled in '%s'\n", "%s: Co-routine was already scheduled in '%s'\n",
__func__, scheduled); __func__, scheduled);
abort(); abort();
} }
ts = aio_timer_new(ctx, type, SCALE_NS, co_sleep_cb, co);
timer_mod(ts, qemu_clock_get_ns(type) + ns); if (sleep_state) {
*sleep_state = &state;
}
timer_mod(state.ts, qemu_clock_get_ns(type) + ns);
qemu_coroutine_yield(); qemu_coroutine_yield();
timer_del(ts); timer_free(state.ts);
timer_free(ts);
} }