Merge remote-tracking branch 'quintela/migration-pull' into staging

This commit is contained in:
Anthony Liguori 2011-10-20 08:46:55 -05:00
commit a6e43daa73
13 changed files with 557 additions and 605 deletions

View File

@ -256,6 +256,7 @@ int ram_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
uint64_t bytes_transferred_last; uint64_t bytes_transferred_last;
double bwidth = 0; double bwidth = 0;
uint64_t expected_time = 0; uint64_t expected_time = 0;
int ret;
if (stage < 0) { if (stage < 0) {
cpu_physical_memory_set_dirty_tracking(0); cpu_physical_memory_set_dirty_tracking(0);
@ -263,8 +264,8 @@ int ram_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
} }
if (cpu_physical_sync_dirty_bitmap(0, TARGET_PHYS_ADDR_MAX) != 0) { if (cpu_physical_sync_dirty_bitmap(0, TARGET_PHYS_ADDR_MAX) != 0) {
qemu_file_set_error(f); qemu_file_set_error(f, -EINVAL);
return 0; return -EINVAL;
} }
if (stage == 1) { if (stage == 1) {
@ -300,7 +301,7 @@ int ram_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
bytes_transferred_last = bytes_transferred; bytes_transferred_last = bytes_transferred;
bwidth = qemu_get_clock_ns(rt_clock); bwidth = qemu_get_clock_ns(rt_clock);
while (!qemu_file_rate_limit(f)) { while ((ret = qemu_file_rate_limit(f)) == 0) {
int bytes_sent; int bytes_sent;
bytes_sent = ram_save_block(f); bytes_sent = ram_save_block(f);
@ -310,6 +311,10 @@ int ram_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
} }
} }
if (ret < 0) {
return ret;
}
bwidth = qemu_get_clock_ns(rt_clock) - bwidth; bwidth = qemu_get_clock_ns(rt_clock) - bwidth;
bwidth = (bytes_transferred - bytes_transferred_last) / bwidth; bwidth = (bytes_transferred - bytes_transferred_last) / bwidth;
@ -371,6 +376,7 @@ int ram_load(QEMUFile *f, void *opaque, int version_id)
{ {
ram_addr_t addr; ram_addr_t addr;
int flags; int flags;
int error;
if (version_id < 3 || version_id > 4) { if (version_id < 3 || version_id > 4) {
return -EINVAL; return -EINVAL;
@ -451,8 +457,9 @@ int ram_load(QEMUFile *f, void *opaque, int version_id)
qemu_get_buffer(f, host, TARGET_PAGE_SIZE); qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
} }
if (qemu_file_has_error(f)) { error = qemu_file_get_error(f);
return -EIO; if (error) {
return error;
} }
} while (!(flags & RAM_SAVE_FLAG_EOS)); } while (!(flags & RAM_SAVE_FLAG_EOS));

View File

@ -263,7 +263,7 @@ static int mig_save_device_bulk(Monitor *mon, QEMUFile *f,
error: error:
monitor_printf(mon, "Error reading sector %" PRId64 "\n", cur_sector); monitor_printf(mon, "Error reading sector %" PRId64 "\n", cur_sector);
qemu_file_set_error(f); qemu_file_set_error(f, -EIO);
g_free(blk->buf); g_free(blk->buf);
g_free(blk); g_free(blk);
return 0; return 0;
@ -383,6 +383,7 @@ static int mig_save_device_dirty(Monitor *mon, QEMUFile *f,
int64_t total_sectors = bmds->total_sectors; int64_t total_sectors = bmds->total_sectors;
int64_t sector; int64_t sector;
int nr_sectors; int nr_sectors;
int ret = -EIO;
for (sector = bmds->cur_dirty; sector < bmds->total_sectors;) { for (sector = bmds->cur_dirty; sector < bmds->total_sectors;) {
if (bmds_aio_inflight(bmds, sector)) { if (bmds_aio_inflight(bmds, sector)) {
@ -418,8 +419,8 @@ static int mig_save_device_dirty(Monitor *mon, QEMUFile *f,
block_mig_state.submitted++; block_mig_state.submitted++;
bmds_set_aio_inflight(bmds, sector, nr_sectors, 1); bmds_set_aio_inflight(bmds, sector, nr_sectors, 1);
} else { } else {
if (bdrv_read(bmds->bs, sector, blk->buf, ret = bdrv_read(bmds->bs, sector, blk->buf, nr_sectors);
nr_sectors) < 0) { if (ret < 0) {
goto error; goto error;
} }
blk_send(f, blk); blk_send(f, blk);
@ -439,7 +440,7 @@ static int mig_save_device_dirty(Monitor *mon, QEMUFile *f,
error: error:
monitor_printf(mon, "Error reading sector %" PRId64 "\n", sector); monitor_printf(mon, "Error reading sector %" PRId64 "\n", sector);
qemu_file_set_error(f); qemu_file_set_error(f, ret);
g_free(blk->buf); g_free(blk->buf);
g_free(blk); g_free(blk);
return 0; return 0;
@ -473,7 +474,7 @@ static void flush_blks(QEMUFile* f)
break; break;
} }
if (blk->ret < 0) { if (blk->ret < 0) {
qemu_file_set_error(f); qemu_file_set_error(f, blk->ret);
break; break;
} }
blk_send(f, blk); blk_send(f, blk);
@ -556,6 +557,8 @@ static void blk_mig_cleanup(Monitor *mon)
static int block_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque) static int block_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
{ {
int ret;
DPRINTF("Enter save live stage %d submitted %d transferred %d\n", DPRINTF("Enter save live stage %d submitted %d transferred %d\n",
stage, block_mig_state.submitted, block_mig_state.transferred); stage, block_mig_state.submitted, block_mig_state.transferred);
@ -579,9 +582,10 @@ static int block_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
flush_blks(f); flush_blks(f);
if (qemu_file_has_error(f)) { ret = qemu_file_get_error(f);
if (ret) {
blk_mig_cleanup(mon); blk_mig_cleanup(mon);
return 0; return ret;
} }
blk_mig_reset_dirty_cursor(); blk_mig_reset_dirty_cursor();
@ -607,9 +611,10 @@ static int block_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
flush_blks(f); flush_blks(f);
if (qemu_file_has_error(f)) { ret = qemu_file_get_error(f);
if (ret) {
blk_mig_cleanup(mon); blk_mig_cleanup(mon);
return 0; return ret;
} }
} }
@ -624,8 +629,9 @@ static int block_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
/* report completion */ /* report completion */
qemu_put_be64(f, (100 << BDRV_SECTOR_BITS) | BLK_MIG_FLAG_PROGRESS); qemu_put_be64(f, (100 << BDRV_SECTOR_BITS) | BLK_MIG_FLAG_PROGRESS);
if (qemu_file_has_error(f)) { ret = qemu_file_get_error(f);
return 0; if (ret) {
return ret;
} }
monitor_printf(mon, "Block migration completed\n"); monitor_printf(mon, "Block migration completed\n");
@ -646,6 +652,7 @@ static int block_load(QEMUFile *f, void *opaque, int version_id)
uint8_t *buf; uint8_t *buf;
int64_t total_sectors = 0; int64_t total_sectors = 0;
int nr_sectors; int nr_sectors;
int ret;
do { do {
addr = qemu_get_be64(f); addr = qemu_get_be64(f);
@ -654,7 +661,6 @@ static int block_load(QEMUFile *f, void *opaque, int version_id)
addr >>= BDRV_SECTOR_BITS; addr >>= BDRV_SECTOR_BITS;
if (flags & BLK_MIG_FLAG_DEVICE_BLOCK) { if (flags & BLK_MIG_FLAG_DEVICE_BLOCK) {
int ret;
/* get device name */ /* get device name */
len = qemu_get_byte(f); len = qemu_get_byte(f);
qemu_get_buffer(f, (uint8_t *)device_name, len); qemu_get_buffer(f, (uint8_t *)device_name, len);
@ -704,8 +710,9 @@ static int block_load(QEMUFile *f, void *opaque, int version_id)
fprintf(stderr, "Unknown flags\n"); fprintf(stderr, "Unknown flags\n");
return -EINVAL; return -EINVAL;
} }
if (qemu_file_has_error(f)) { ret = qemu_file_get_error(f);
return -EIO; if (ret != 0) {
return ret;
} }
} while (!(flags & BLK_MIG_FLAG_EOS)); } while (!(flags & BLK_MIG_FLAG_EOS));

View File

@ -27,7 +27,6 @@ typedef struct QEMUFileBuffered
BufferedCloseFunc *close; BufferedCloseFunc *close;
void *opaque; void *opaque;
QEMUFile *file; QEMUFile *file;
int has_error;
int freeze_output; int freeze_output;
size_t bytes_xfer; size_t bytes_xfer;
size_t xfer_limit; size_t xfer_limit;
@ -72,9 +71,11 @@ static void buffered_append(QEMUFileBuffered *s,
static void buffered_flush(QEMUFileBuffered *s) static void buffered_flush(QEMUFileBuffered *s)
{ {
size_t offset = 0; size_t offset = 0;
int error;
if (s->has_error) { error = qemu_file_get_error(s->file);
DPRINTF("flush when error, bailing\n"); if (error != 0) {
DPRINTF("flush when error, bailing: %s\n", strerror(-error));
return; return;
} }
@ -93,7 +94,7 @@ static void buffered_flush(QEMUFileBuffered *s)
if (ret <= 0) { if (ret <= 0) {
DPRINTF("error flushing data, %zd\n", ret); DPRINTF("error flushing data, %zd\n", ret);
s->has_error = 1; qemu_file_set_error(s->file, ret);
break; break;
} else { } else {
DPRINTF("flushed %zd byte(s)\n", ret); DPRINTF("flushed %zd byte(s)\n", ret);
@ -109,14 +110,15 @@ static void buffered_flush(QEMUFileBuffered *s)
static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size) static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
{ {
QEMUFileBuffered *s = opaque; QEMUFileBuffered *s = opaque;
int offset = 0; int offset = 0, error;
ssize_t ret; ssize_t ret;
DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos); DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
if (s->has_error) { error = qemu_file_get_error(s->file);
DPRINTF("flush when error, bailing\n"); if (error) {
return -EINVAL; DPRINTF("flush when error, bailing: %s\n", strerror(-error));
return error;
} }
DPRINTF("unfreezing output\n"); DPRINTF("unfreezing output\n");
@ -139,7 +141,7 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
if (ret <= 0) { if (ret <= 0) {
DPRINTF("error putting\n"); DPRINTF("error putting\n");
s->has_error = 1; qemu_file_set_error(s->file, ret);
offset = -EINVAL; offset = -EINVAL;
break; break;
} }
@ -173,10 +175,10 @@ static int buffered_close(void *opaque)
DPRINTF("closing\n"); DPRINTF("closing\n");
while (!s->has_error && s->buffer_size) { while (!qemu_file_get_error(s->file) && s->buffer_size) {
buffered_flush(s); buffered_flush(s);
if (s->freeze_output) if (s->freeze_output)
s->wait_for_unfreeze(s); s->wait_for_unfreeze(s->opaque);
} }
ret = s->close(s->opaque); ret = s->close(s->opaque);
@ -189,13 +191,21 @@ static int buffered_close(void *opaque)
return ret; return ret;
} }
/*
* The meaning of the return values is:
* 0: We can continue sending
* 1: Time to stop
* negative: There has been an error
*/
static int buffered_rate_limit(void *opaque) static int buffered_rate_limit(void *opaque)
{ {
QEMUFileBuffered *s = opaque; QEMUFileBuffered *s = opaque;
int ret;
if (s->has_error) ret = qemu_file_get_error(s->file);
return 0; if (ret) {
return ret;
}
if (s->freeze_output) if (s->freeze_output)
return 1; return 1;
@ -208,9 +218,9 @@ static int buffered_rate_limit(void *opaque)
static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate) static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
{ {
QEMUFileBuffered *s = opaque; QEMUFileBuffered *s = opaque;
if (s->has_error) if (qemu_file_get_error(s->file)) {
goto out; goto out;
}
if (new_rate > SIZE_MAX) { if (new_rate > SIZE_MAX) {
new_rate = SIZE_MAX; new_rate = SIZE_MAX;
} }
@ -232,7 +242,7 @@ static void buffered_rate_tick(void *opaque)
{ {
QEMUFileBuffered *s = opaque; QEMUFileBuffered *s = opaque;
if (s->has_error) { if (qemu_file_get_error(s->file)) {
buffered_close(s); buffered_close(s);
return; return;
} }

View File

@ -29,7 +29,7 @@ typedef struct {
DeviceState qdev; DeviceState qdev;
uint32_t chip_size; uint32_t chip_size;
char *filename; char *filename;
QEMUFile *file; FILE *file;
uint8_t *contents; uint8_t *contents;
} NvRamState; } NvRamState;
@ -70,9 +70,9 @@ static void nvram_writeb (void *opaque, target_phys_addr_t addr, uint32_t val)
s->contents[addr] = val; s->contents[addr] = val;
if (s->file) { if (s->file) {
qemu_fseek(s->file, addr, SEEK_SET); fseek(s->file, addr, SEEK_SET);
qemu_put_byte(s->file, (int)val); fputc(val, s->file);
qemu_fflush(s->file); fflush(s->file);
} }
} }
@ -108,15 +108,17 @@ static int nvram_post_load(void *opaque, int version_id)
/* Close file, as filename may has changed in load/store process */ /* Close file, as filename may has changed in load/store process */
if (s->file) { if (s->file) {
qemu_fclose(s->file); fclose(s->file);
} }
/* Write back nvram contents */ /* Write back nvram contents */
s->file = qemu_fopen(s->filename, "wb"); s->file = fopen(s->filename, "wb");
if (s->file) { if (s->file) {
/* Write back contents, as 'wb' mode cleaned the file */ /* Write back contents, as 'wb' mode cleaned the file */
qemu_put_buffer(s->file, s->contents, s->chip_size); if (fwrite(s->contents, s->chip_size, 1, s->file) != 1) {
qemu_fflush(s->file); printf("nvram_post_load: short write\n");
}
fflush(s->file);
} }
return 0; return 0;
@ -143,7 +145,7 @@ typedef struct {
static int nvram_sysbus_initfn(SysBusDevice *dev) static int nvram_sysbus_initfn(SysBusDevice *dev)
{ {
NvRamState *s = &FROM_SYSBUS(SysBusNvRamState, dev)->nvram; NvRamState *s = &FROM_SYSBUS(SysBusNvRamState, dev)->nvram;
QEMUFile *file; FILE *file;
int s_io; int s_io;
s->contents = g_malloc0(s->chip_size); s->contents = g_malloc0(s->chip_size);
@ -153,11 +155,13 @@ static int nvram_sysbus_initfn(SysBusDevice *dev)
sysbus_init_mmio(dev, s->chip_size, s_io); sysbus_init_mmio(dev, s->chip_size, s_io);
/* Read current file */ /* Read current file */
file = qemu_fopen(s->filename, "rb"); file = fopen(s->filename, "rb");
if (file) { if (file) {
/* Read nvram contents */ /* Read nvram contents */
qemu_get_buffer(file, s->contents, s->chip_size); if (fread(s->contents, s->chip_size, 1, file) != 1) {
qemu_fclose(file); printf("nvram_sysbus_initfn: short read\n");
}
fclose(file);
} }
nvram_post_load(s, 0); nvram_post_load(s, 0);

View File

@ -85,8 +85,8 @@ uint64_t qemu_get_be64(QEMUFile *f);
int qemu_file_rate_limit(QEMUFile *f); int qemu_file_rate_limit(QEMUFile *f);
int64_t qemu_file_set_rate_limit(QEMUFile *f, int64_t new_rate); int64_t qemu_file_set_rate_limit(QEMUFile *f, int64_t new_rate);
int64_t qemu_file_get_rate_limit(QEMUFile *f); int64_t qemu_file_get_rate_limit(QEMUFile *f);
int qemu_file_has_error(QEMUFile *f); int qemu_file_get_error(QEMUFile *f);
void qemu_file_set_error(QEMUFile *f); void qemu_file_set_error(QEMUFile *f, int error);
/* Try to send any outstanding data. This function is useful when output is /* Try to send any outstanding data. This function is useful when output is
* halted due to rate limiting or EAGAIN errors occur as it can be used to * halted due to rate limiting or EAGAIN errors occur as it can be used to

View File

@ -32,17 +32,17 @@
do { } while (0) do { } while (0)
#endif #endif
static int file_errno(FdMigrationState *s) static int file_errno(MigrationState *s)
{ {
return errno; return errno;
} }
static int file_write(FdMigrationState *s, const void * buf, size_t size) static int file_write(MigrationState *s, const void * buf, size_t size)
{ {
return write(s->fd, buf, size); return write(s->fd, buf, size);
} }
static int exec_close(FdMigrationState *s) static int exec_close(MigrationState *s)
{ {
int ret = 0; int ret = 0;
DPRINTF("exec_close\n"); DPRINTF("exec_close\n");
@ -61,22 +61,14 @@ static int exec_close(FdMigrationState *s)
return ret; return ret;
} }
MigrationState *exec_start_outgoing_migration(Monitor *mon, int exec_start_outgoing_migration(MigrationState *s, const char *command)
const char *command,
int64_t bandwidth_limit,
int detach,
int blk,
int inc)
{ {
FdMigrationState *s;
FILE *f; FILE *f;
s = g_malloc0(sizeof(*s));
f = popen(command, "w"); f = popen(command, "w");
if (f == NULL) { if (f == NULL) {
DPRINTF("Unable to popen exec target\n"); DPRINTF("Unable to popen exec target\n");
goto err_after_alloc; goto err_after_popen;
} }
s->fd = fileno(f); s->fd = fileno(f);
@ -92,29 +84,14 @@ MigrationState *exec_start_outgoing_migration(Monitor *mon,
s->close = exec_close; s->close = exec_close;
s->get_error = file_errno; s->get_error = file_errno;
s->write = file_write; s->write = file_write;
s->mig_state.cancel = migrate_fd_cancel;
s->mig_state.get_status = migrate_fd_get_status;
s->mig_state.release = migrate_fd_release;
s->mig_state.blk = blk;
s->mig_state.shared = inc;
s->state = MIG_STATE_ACTIVE;
s->mon = NULL;
s->bandwidth_limit = bandwidth_limit;
if (!detach) {
migrate_fd_monitor_suspend(s, mon);
}
migrate_fd_connect(s); migrate_fd_connect(s);
return &s->mig_state; return 0;
err_after_open: err_after_open:
pclose(f); pclose(f);
err_after_alloc: err_after_popen:
g_free(s); return -1;
return NULL;
} }
static void exec_accept_incoming_migration(void *opaque) static void exec_accept_incoming_migration(void *opaque)

View File

@ -30,17 +30,17 @@
do { } while (0) do { } while (0)
#endif #endif
static int fd_errno(FdMigrationState *s) static int fd_errno(MigrationState *s)
{ {
return errno; return errno;
} }
static int fd_write(FdMigrationState *s, const void * buf, size_t size) static int fd_write(MigrationState *s, const void * buf, size_t size)
{ {
return write(s->fd, buf, size); return write(s->fd, buf, size);
} }
static int fd_close(FdMigrationState *s) static int fd_close(MigrationState *s)
{ {
DPRINTF("fd_close\n"); DPRINTF("fd_close\n");
if (s->fd != -1) { if (s->fd != -1) {
@ -50,21 +50,12 @@ static int fd_close(FdMigrationState *s)
return 0; return 0;
} }
MigrationState *fd_start_outgoing_migration(Monitor *mon, int fd_start_outgoing_migration(MigrationState *s, const char *fdname)
const char *fdname,
int64_t bandwidth_limit,
int detach,
int blk,
int inc)
{ {
FdMigrationState *s; s->fd = monitor_get_fd(s->mon, fdname);
s = g_malloc0(sizeof(*s));
s->fd = monitor_get_fd(mon, fdname);
if (s->fd == -1) { if (s->fd == -1) {
DPRINTF("fd_migration: invalid file descriptor identifier\n"); DPRINTF("fd_migration: invalid file descriptor identifier\n");
goto err_after_alloc; goto err_after_get_fd;
} }
if (fcntl(s->fd, F_SETFL, O_NONBLOCK) == -1) { if (fcntl(s->fd, F_SETFL, O_NONBLOCK) == -1) {
@ -75,29 +66,14 @@ MigrationState *fd_start_outgoing_migration(Monitor *mon,
s->get_error = fd_errno; s->get_error = fd_errno;
s->write = fd_write; s->write = fd_write;
s->close = fd_close; s->close = fd_close;
s->mig_state.cancel = migrate_fd_cancel;
s->mig_state.get_status = migrate_fd_get_status;
s->mig_state.release = migrate_fd_release;
s->mig_state.blk = blk;
s->mig_state.shared = inc;
s->state = MIG_STATE_ACTIVE;
s->mon = NULL;
s->bandwidth_limit = bandwidth_limit;
if (!detach) {
migrate_fd_monitor_suspend(s, mon);
}
migrate_fd_connect(s); migrate_fd_connect(s);
return &s->mig_state; return 0;
err_after_open: err_after_open:
close(s->fd); close(s->fd);
err_after_alloc: err_after_get_fd:
g_free(s); return -1;
return NULL;
} }
static void fd_accept_incoming_migration(void *opaque) static void fd_accept_incoming_migration(void *opaque)

View File

@ -28,17 +28,17 @@
do { } while (0) do { } while (0)
#endif #endif
static int socket_errno(FdMigrationState *s) static int socket_errno(MigrationState *s)
{ {
return socket_error(); return socket_error();
} }
static int socket_write(FdMigrationState *s, const void * buf, size_t size) static int socket_write(MigrationState *s, const void * buf, size_t size)
{ {
return send(s->fd, buf, size, 0); return send(s->fd, buf, size, 0);
} }
static int tcp_close(FdMigrationState *s) static int tcp_close(MigrationState *s)
{ {
DPRINTF("tcp_close\n"); DPRINTF("tcp_close\n");
if (s->fd != -1) { if (s->fd != -1) {
@ -48,17 +48,16 @@ static int tcp_close(FdMigrationState *s)
return 0; return 0;
} }
static void tcp_wait_for_connect(void *opaque) static void tcp_wait_for_connect(void *opaque)
{ {
FdMigrationState *s = opaque; MigrationState *s = opaque;
int val, ret; int val, ret;
socklen_t valsize = sizeof(val); socklen_t valsize = sizeof(val);
DPRINTF("connect completed\n"); DPRINTF("connect completed\n");
do { do {
ret = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, (void *) &val, &valsize); ret = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, (void *) &val, &valsize);
} while (ret == -1 && (s->get_error(s)) == EINTR); } while (ret == -1 && (socket_error()) == EINTR);
if (ret < 0) { if (ret < 0) {
migrate_fd_error(s); migrate_fd_error(s);
@ -75,63 +74,46 @@ static void tcp_wait_for_connect(void *opaque)
} }
} }
MigrationState *tcp_start_outgoing_migration(Monitor *mon, int tcp_start_outgoing_migration(MigrationState *s, const char *host_port)
const char *host_port,
int64_t bandwidth_limit,
int detach,
int blk,
int inc)
{ {
struct sockaddr_in addr; struct sockaddr_in addr;
FdMigrationState *s;
int ret; int ret;
if (parse_host_port(&addr, host_port) < 0) ret = parse_host_port(&addr, host_port);
return NULL; if (ret < 0) {
return ret;
s = g_malloc0(sizeof(*s)); }
s->get_error = socket_errno; s->get_error = socket_errno;
s->write = socket_write; s->write = socket_write;
s->close = tcp_close; s->close = tcp_close;
s->mig_state.cancel = migrate_fd_cancel;
s->mig_state.get_status = migrate_fd_get_status;
s->mig_state.release = migrate_fd_release;
s->mig_state.blk = blk;
s->mig_state.shared = inc;
s->state = MIG_STATE_ACTIVE;
s->mon = NULL;
s->bandwidth_limit = bandwidth_limit;
s->fd = qemu_socket(PF_INET, SOCK_STREAM, 0); s->fd = qemu_socket(PF_INET, SOCK_STREAM, 0);
if (s->fd == -1) { if (s->fd == -1) {
g_free(s); DPRINTF("Unable to open socket");
return NULL; return -socket_error();
} }
socket_set_nonblock(s->fd); socket_set_nonblock(s->fd);
if (!detach) {
migrate_fd_monitor_suspend(s, mon);
}
do { do {
ret = connect(s->fd, (struct sockaddr *)&addr, sizeof(addr)); ret = connect(s->fd, (struct sockaddr *)&addr, sizeof(addr));
if (ret == -1) if (ret == -1) {
ret = -(s->get_error(s)); ret = -socket_error();
}
if (ret == -EINPROGRESS || ret == -EWOULDBLOCK) if (ret == -EINPROGRESS || ret == -EWOULDBLOCK) {
qemu_set_fd_handler2(s->fd, NULL, NULL, tcp_wait_for_connect, s); qemu_set_fd_handler2(s->fd, NULL, NULL, tcp_wait_for_connect, s);
return 0;
}
} while (ret == -EINTR); } while (ret == -EINTR);
if (ret < 0 && ret != -EINPROGRESS && ret != -EWOULDBLOCK) { if (ret < 0) {
DPRINTF("connect failed\n"); DPRINTF("connect failed\n");
migrate_fd_error(s); migrate_fd_error(s);
} else if (ret >= 0) return ret;
}
migrate_fd_connect(s); migrate_fd_connect(s);
return 0;
return &s->mig_state;
} }
static void tcp_accept_incoming_migration(void *opaque) static void tcp_accept_incoming_migration(void *opaque)
@ -174,23 +156,27 @@ int tcp_start_incoming_migration(const char *host_port)
int val; int val;
int s; int s;
DPRINTF("Attempting to start an incoming migration\n");
if (parse_host_port(&addr, host_port) < 0) { if (parse_host_port(&addr, host_port) < 0) {
fprintf(stderr, "invalid host/port combination: %s\n", host_port); fprintf(stderr, "invalid host/port combination: %s\n", host_port);
return -EINVAL; return -EINVAL;
} }
s = qemu_socket(PF_INET, SOCK_STREAM, 0); s = qemu_socket(PF_INET, SOCK_STREAM, 0);
if (s == -1) if (s == -1) {
return -socket_error(); return -socket_error();
}
val = 1; val = 1;
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, sizeof(val)); setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, sizeof(val));
if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
goto err; goto err;
}
if (listen(s, 1) == -1) if (listen(s, 1) == -1) {
goto err; goto err;
}
qemu_set_fd_handler2(s, NULL, tcp_accept_incoming_migration, NULL, qemu_set_fd_handler2(s, NULL, tcp_accept_incoming_migration, NULL,
(void *)(intptr_t)s); (void *)(intptr_t)s);

View File

@ -28,17 +28,17 @@
do { } while (0) do { } while (0)
#endif #endif
static int unix_errno(FdMigrationState *s) static int unix_errno(MigrationState *s)
{ {
return errno; return errno;
} }
static int unix_write(FdMigrationState *s, const void * buf, size_t size) static int unix_write(MigrationState *s, const void * buf, size_t size)
{ {
return write(s->fd, buf, size); return write(s->fd, buf, size);
} }
static int unix_close(FdMigrationState *s) static int unix_close(MigrationState *s)
{ {
DPRINTF("unix_close\n"); DPRINTF("unix_close\n");
if (s->fd != -1) { if (s->fd != -1) {
@ -50,14 +50,14 @@ static int unix_close(FdMigrationState *s)
static void unix_wait_for_connect(void *opaque) static void unix_wait_for_connect(void *opaque)
{ {
FdMigrationState *s = opaque; MigrationState *s = opaque;
int val, ret; int val, ret;
socklen_t valsize = sizeof(val); socklen_t valsize = sizeof(val);
DPRINTF("connect completed\n"); DPRINTF("connect completed\n");
do { do {
ret = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, (void *) &val, &valsize); ret = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, (void *) &val, &valsize);
} while (ret == -1 && (s->get_error(s)) == EINTR); } while (ret == -1 && errno == EINTR);
if (ret < 0) { if (ret < 0) {
migrate_fd_error(s); migrate_fd_error(s);
@ -74,72 +74,43 @@ static void unix_wait_for_connect(void *opaque)
} }
} }
MigrationState *unix_start_outgoing_migration(Monitor *mon, int unix_start_outgoing_migration(MigrationState *s, const char *path)
const char *path,
int64_t bandwidth_limit,
int detach,
int blk,
int inc)
{ {
FdMigrationState *s;
struct sockaddr_un addr; struct sockaddr_un addr;
int ret; int ret;
addr.sun_family = AF_UNIX; addr.sun_family = AF_UNIX;
snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path); snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path);
s = g_malloc0(sizeof(*s));
s->get_error = unix_errno; s->get_error = unix_errno;
s->write = unix_write; s->write = unix_write;
s->close = unix_close; s->close = unix_close;
s->mig_state.cancel = migrate_fd_cancel;
s->mig_state.get_status = migrate_fd_get_status;
s->mig_state.release = migrate_fd_release;
s->mig_state.blk = blk;
s->mig_state.shared = inc;
s->state = MIG_STATE_ACTIVE;
s->mon = NULL;
s->bandwidth_limit = bandwidth_limit;
s->fd = qemu_socket(PF_UNIX, SOCK_STREAM, 0); s->fd = qemu_socket(PF_UNIX, SOCK_STREAM, 0);
if (s->fd < 0) { if (s->fd == -1) {
DPRINTF("Unable to open socket"); DPRINTF("Unable to open socket");
goto err_after_alloc; return -errno;
} }
socket_set_nonblock(s->fd); socket_set_nonblock(s->fd);
do { do {
ret = connect(s->fd, (struct sockaddr *)&addr, sizeof(addr)); ret = connect(s->fd, (struct sockaddr *)&addr, sizeof(addr));
if (ret == -1) if (ret == -1) {
ret = -(s->get_error(s)); ret = -errno;
}
if (ret == -EINPROGRESS || ret == -EWOULDBLOCK) if (ret == -EINPROGRESS || ret == -EWOULDBLOCK) {
qemu_set_fd_handler2(s->fd, NULL, NULL, unix_wait_for_connect, s); qemu_set_fd_handler2(s->fd, NULL, NULL, unix_wait_for_connect, s);
return 0;
}
} while (ret == -EINTR); } while (ret == -EINTR);
if (ret < 0 && ret != -EINPROGRESS && ret != -EWOULDBLOCK) { if (ret < 0) {
DPRINTF("connect failed\n"); DPRINTF("connect failed\n");
goto err_after_open; migrate_fd_error(s);
return ret;
} }
if (!detach) {
migrate_fd_monitor_suspend(s, mon);
}
if (ret >= 0)
migrate_fd_connect(s); migrate_fd_connect(s);
return 0;
return &s->mig_state;
err_after_open:
close(s->fd);
err_after_alloc:
g_free(s);
return NULL;
} }
static void unix_accept_incoming_migration(void *opaque) static void unix_accept_incoming_migration(void *opaque)
@ -152,13 +123,13 @@ static void unix_accept_incoming_migration(void *opaque)
do { do {
c = qemu_accept(s, (struct sockaddr *)&addr, &addrlen); c = qemu_accept(s, (struct sockaddr *)&addr, &addrlen);
} while (c == -1 && socket_error() == EINTR); } while (c == -1 && errno == EINTR);
DPRINTF("accepted migration\n"); DPRINTF("accepted migration\n");
if (c == -1) { if (c == -1) {
fprintf(stderr, "could not accept migration connection\n"); fprintf(stderr, "could not accept migration connection\n");
return; goto out2;
} }
f = qemu_fopen_socket(c); f = qemu_fopen_socket(c);
@ -170,45 +141,49 @@ static void unix_accept_incoming_migration(void *opaque)
process_incoming_migration(f); process_incoming_migration(f);
qemu_fclose(f); qemu_fclose(f);
out: out:
close(c);
out2:
qemu_set_fd_handler2(s, NULL, NULL, NULL, NULL); qemu_set_fd_handler2(s, NULL, NULL, NULL, NULL);
close(s); close(s);
close(c);
} }
int unix_start_incoming_migration(const char *path) int unix_start_incoming_migration(const char *path)
{ {
struct sockaddr_un un; struct sockaddr_un addr;
int sock; int s;
int ret;
DPRINTF("Attempting to start an incoming migration\n"); DPRINTF("Attempting to start an incoming migration\n");
sock = qemu_socket(PF_UNIX, SOCK_STREAM, 0); s = qemu_socket(PF_UNIX, SOCK_STREAM, 0);
if (sock < 0) { if (s == -1) {
fprintf(stderr, "Could not open unix socket: %s\n", strerror(errno)); fprintf(stderr, "Could not open unix socket: %s\n", strerror(errno));
return -EINVAL; return -errno;
} }
memset(&un, 0, sizeof(un)); memset(&addr, 0, sizeof(addr));
un.sun_family = AF_UNIX; addr.sun_family = AF_UNIX;
snprintf(un.sun_path, sizeof(un.sun_path), "%s", path); snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path);
unlink(un.sun_path); unlink(addr.sun_path);
if (bind(sock, (struct sockaddr*) &un, sizeof(un)) < 0) { if (bind(s, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
fprintf(stderr, "bind(unix:%s): %s\n", un.sun_path, strerror(errno)); ret = -errno;
fprintf(stderr, "bind(unix:%s): %s\n", addr.sun_path, strerror(errno));
goto err; goto err;
} }
if (listen(sock, 1) < 0) { if (listen(s, 1) == -1) {
fprintf(stderr, "listen(unix:%s): %s\n", un.sun_path, strerror(errno)); fprintf(stderr, "listen(unix:%s): %s\n", addr.sun_path,
strerror(errno));
ret = -errno;
goto err; goto err;
} }
qemu_set_fd_handler2(sock, NULL, unix_accept_incoming_migration, NULL, qemu_set_fd_handler2(s, NULL, unix_accept_incoming_migration, NULL,
(void *)(intptr_t)sock); (void *)(intptr_t)s);
return 0; return 0;
err: err:
close(sock); close(s);
return ret;
return -EINVAL;
} }

View File

@ -31,14 +31,33 @@
do { } while (0) do { } while (0)
#endif #endif
/* Migration speed throttling */ enum {
static int64_t max_throttle = (32 << 20); MIG_STATE_ERROR,
MIG_STATE_SETUP,
MIG_STATE_CANCELLED,
MIG_STATE_ACTIVE,
MIG_STATE_COMPLETED,
};
static MigrationState *current_migration; #define MAX_THROTTLE (32 << 20) /* Migration speed throttling */
static NotifierList migration_state_notifiers = static NotifierList migration_state_notifiers =
NOTIFIER_LIST_INITIALIZER(migration_state_notifiers); NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
/* When we add fault tolerance, we could have several
migrations at once. For now we don't need to add
dynamic creation of migration */
static MigrationState *migrate_get_current(void)
{
static MigrationState current_migration = {
.state = MIG_STATE_SETUP,
.bandwidth_limit = MAX_THROTTLE,
};
return &current_migration;
}
int qemu_start_incoming_migration(const char *uri) int qemu_start_incoming_migration(const char *uri)
{ {
const char *p; const char *p;
@ -77,87 +96,6 @@ void process_incoming_migration(QEMUFile *f)
} }
} }
int do_migrate(Monitor *mon, const QDict *qdict, QObject **ret_data)
{
MigrationState *s = NULL;
const char *p;
int detach = qdict_get_try_bool(qdict, "detach", 0);
int blk = qdict_get_try_bool(qdict, "blk", 0);
int inc = qdict_get_try_bool(qdict, "inc", 0);
const char *uri = qdict_get_str(qdict, "uri");
if (current_migration &&
current_migration->get_status(current_migration) == MIG_STATE_ACTIVE) {
monitor_printf(mon, "migration already in progress\n");
return -1;
}
if (qemu_savevm_state_blocked(mon)) {
return -1;
}
if (strstart(uri, "tcp:", &p)) {
s = tcp_start_outgoing_migration(mon, p, max_throttle, detach,
blk, inc);
#if !defined(WIN32)
} else if (strstart(uri, "exec:", &p)) {
s = exec_start_outgoing_migration(mon, p, max_throttle, detach,
blk, inc);
} else if (strstart(uri, "unix:", &p)) {
s = unix_start_outgoing_migration(mon, p, max_throttle, detach,
blk, inc);
} else if (strstart(uri, "fd:", &p)) {
s = fd_start_outgoing_migration(mon, p, max_throttle, detach,
blk, inc);
#endif
} else {
monitor_printf(mon, "unknown migration protocol: %s\n", uri);
return -1;
}
if (s == NULL) {
monitor_printf(mon, "migration failed\n");
return -1;
}
if (current_migration) {
current_migration->release(current_migration);
}
current_migration = s;
notifier_list_notify(&migration_state_notifiers, NULL);
return 0;
}
int do_migrate_cancel(Monitor *mon, const QDict *qdict, QObject **ret_data)
{
MigrationState *s = current_migration;
if (s)
s->cancel(s);
return 0;
}
int do_migrate_set_speed(Monitor *mon, const QDict *qdict, QObject **ret_data)
{
int64_t d;
FdMigrationState *s;
d = qdict_get_int(qdict, "value");
if (d < 0) {
d = 0;
}
max_throttle = d;
s = migrate_to_fms(current_migration);
if (s && s->file) {
qemu_file_set_rate_limit(s->file, max_throttle);
}
return 0;
}
/* amount of nanoseconds we are willing to wait for migration to be down. /* amount of nanoseconds we are willing to wait for migration to be down.
* the choice of nanoseconds is because it is the maximum resolution that * the choice of nanoseconds is because it is the maximum resolution that
* get_clock() can achieve. It is an internal measure. All user-visible * get_clock() can achieve. It is an internal measure. All user-visible
@ -169,18 +107,6 @@ uint64_t migrate_max_downtime(void)
return max_downtime; return max_downtime;
} }
int do_migrate_set_downtime(Monitor *mon, const QDict *qdict,
QObject **ret_data)
{
double d;
d = qdict_get_double(qdict, "value") * 1e9;
d = MAX(0, MIN(UINT64_MAX, d));
max_downtime = (uint64_t)d;
return 0;
}
static void migrate_print_status(Monitor *mon, const char *name, static void migrate_print_status(Monitor *mon, const char *name,
const QDict *status_dict) const QDict *status_dict)
{ {
@ -228,10 +154,12 @@ static void migrate_put_status(QDict *qdict, const char *name,
void do_info_migrate(Monitor *mon, QObject **ret_data) void do_info_migrate(Monitor *mon, QObject **ret_data)
{ {
QDict *qdict; QDict *qdict;
MigrationState *s = current_migration; MigrationState *s = migrate_get_current();
if (s) { switch (s->state) {
switch (s->get_status(s)) { case MIG_STATE_SETUP:
/* no migration has happened ever */
break;
case MIG_STATE_ACTIVE: case MIG_STATE_ACTIVE:
qdict = qdict_new(); qdict = qdict_new();
qdict_put(qdict, "status", qstring_from_str("active")); qdict_put(qdict, "status", qstring_from_str("active"));
@ -258,11 +186,10 @@ void do_info_migrate(Monitor *mon, QObject **ret_data)
break; break;
} }
} }
}
/* shared migration helpers */ /* shared migration helpers */
void migrate_fd_monitor_suspend(FdMigrationState *s, Monitor *mon) static void migrate_fd_monitor_suspend(MigrationState *s, Monitor *mon)
{ {
s->mon = mon; s->mon = mon;
if (monitor_suspend(mon) == 0) { if (monitor_suspend(mon) == 0) {
@ -273,15 +200,7 @@ void migrate_fd_monitor_suspend(FdMigrationState *s, Monitor *mon)
} }
} }
void migrate_fd_error(FdMigrationState *s) static int migrate_fd_cleanup(MigrationState *s)
{
DPRINTF("setting error state\n");
s->state = MIG_STATE_ERROR;
notifier_list_notify(&migration_state_notifiers, NULL);
migrate_fd_cleanup(s);
}
int migrate_fd_cleanup(FdMigrationState *s)
{ {
int ret = 0; int ret = 0;
@ -307,19 +226,47 @@ int migrate_fd_cleanup(FdMigrationState *s)
return ret; return ret;
} }
void migrate_fd_put_notify(void *opaque) void migrate_fd_error(MigrationState *s)
{ {
FdMigrationState *s = opaque; DPRINTF("setting error state\n");
s->state = MIG_STATE_ERROR;
notifier_list_notify(&migration_state_notifiers, s);
migrate_fd_cleanup(s);
}
static void migrate_fd_completed(MigrationState *s)
{
DPRINTF("setting completed state\n");
if (migrate_fd_cleanup(s) < 0) {
s->state = MIG_STATE_ERROR;
} else {
s->state = MIG_STATE_COMPLETED;
runstate_set(RUN_STATE_POSTMIGRATE);
}
notifier_list_notify(&migration_state_notifiers, s);
}
static void migrate_fd_put_notify(void *opaque)
{
MigrationState *s = opaque;
qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
qemu_file_put_notify(s->file); qemu_file_put_notify(s->file);
if (qemu_file_get_error(s->file)) {
migrate_fd_error(s);
}
} }
ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) static ssize_t migrate_fd_put_buffer(void *opaque, const void *data,
size_t size)
{ {
FdMigrationState *s = opaque; MigrationState *s = opaque;
ssize_t ret; ssize_t ret;
if (s->state != MIG_STATE_ACTIVE) {
return -EIO;
}
do { do {
ret = s->write(s, data, size); ret = s->write(s, data, size);
} while (ret == -1 && ((s->get_error(s)) == EINTR)); } while (ret == -1 && ((s->get_error(s)) == EINTR));
@ -329,115 +276,61 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
if (ret == -EAGAIN) { if (ret == -EAGAIN) {
qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s); qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
} else if (ret < 0) {
s->state = MIG_STATE_ERROR;
notifier_list_notify(&migration_state_notifiers, NULL);
} }
return ret; return ret;
} }
void migrate_fd_connect(FdMigrationState *s) static void migrate_fd_put_ready(void *opaque)
{ {
MigrationState *s = opaque;
int ret; int ret;
s->file = qemu_fopen_ops_buffered(s,
s->bandwidth_limit,
migrate_fd_put_buffer,
migrate_fd_put_ready,
migrate_fd_wait_for_unfreeze,
migrate_fd_close);
DPRINTF("beginning savevm\n");
ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
s->mig_state.shared);
if (ret < 0) {
DPRINTF("failed, %d\n", ret);
migrate_fd_error(s);
return;
}
migrate_fd_put_ready(s);
}
void migrate_fd_put_ready(void *opaque)
{
FdMigrationState *s = opaque;
if (s->state != MIG_STATE_ACTIVE) { if (s->state != MIG_STATE_ACTIVE) {
DPRINTF("put_ready returning because of non-active state\n"); DPRINTF("put_ready returning because of non-active state\n");
return; return;
} }
DPRINTF("iterate\n"); DPRINTF("iterate\n");
if (qemu_savevm_state_iterate(s->mon, s->file) == 1) { ret = qemu_savevm_state_iterate(s->mon, s->file);
int state; if (ret < 0) {
migrate_fd_error(s);
} else if (ret == 1) {
int old_vm_running = runstate_is_running(); int old_vm_running = runstate_is_running();
DPRINTF("done iterating\n"); DPRINTF("done iterating\n");
vm_stop_force_state(RUN_STATE_FINISH_MIGRATE); vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) { if (qemu_savevm_state_complete(s->mon, s->file) < 0) {
if (old_vm_running) { migrate_fd_error(s);
vm_start();
}
state = MIG_STATE_ERROR;
} else { } else {
state = MIG_STATE_COMPLETED; migrate_fd_completed(s);
} }
if (migrate_fd_cleanup(s) < 0) { if (s->state != MIG_STATE_COMPLETED) {
if (old_vm_running) { if (old_vm_running) {
vm_start(); vm_start();
} }
state = MIG_STATE_ERROR;
} }
if (state == MIG_STATE_COMPLETED) {
runstate_set(RUN_STATE_POSTMIGRATE);
}
s->state = state;
notifier_list_notify(&migration_state_notifiers, NULL);
} }
} }
int migrate_fd_get_status(MigrationState *mig_state) static void migrate_fd_cancel(MigrationState *s)
{ {
FdMigrationState *s = migrate_to_fms(mig_state);
return s->state;
}
void migrate_fd_cancel(MigrationState *mig_state)
{
FdMigrationState *s = migrate_to_fms(mig_state);
if (s->state != MIG_STATE_ACTIVE) if (s->state != MIG_STATE_ACTIVE)
return; return;
DPRINTF("cancelling migration\n"); DPRINTF("cancelling migration\n");
s->state = MIG_STATE_CANCELLED; s->state = MIG_STATE_CANCELLED;
notifier_list_notify(&migration_state_notifiers, NULL); notifier_list_notify(&migration_state_notifiers, s);
qemu_savevm_state_cancel(s->mon, s->file); qemu_savevm_state_cancel(s->mon, s->file);
migrate_fd_cleanup(s); migrate_fd_cleanup(s);
} }
void migrate_fd_release(MigrationState *mig_state) static void migrate_fd_wait_for_unfreeze(void *opaque)
{ {
FdMigrationState *s = migrate_to_fms(mig_state); MigrationState *s = opaque;
DPRINTF("releasing state\n");
if (s->state == MIG_STATE_ACTIVE) {
s->state = MIG_STATE_CANCELLED;
notifier_list_notify(&migration_state_notifiers, NULL);
migrate_fd_cleanup(s);
}
g_free(s);
}
void migrate_fd_wait_for_unfreeze(void *opaque)
{
FdMigrationState *s = opaque;
int ret; int ret;
DPRINTF("wait for unfreeze\n"); DPRINTF("wait for unfreeze\n");
@ -452,11 +345,15 @@ void migrate_fd_wait_for_unfreeze(void *opaque)
ret = select(s->fd + 1, NULL, &wfds, NULL, NULL); ret = select(s->fd + 1, NULL, &wfds, NULL, NULL);
} while (ret == -1 && (s->get_error(s)) == EINTR); } while (ret == -1 && (s->get_error(s)) == EINTR);
if (ret == -1) {
qemu_file_set_error(s->file, -s->get_error(s));
}
} }
int migrate_fd_close(void *opaque) static int migrate_fd_close(void *opaque)
{ {
FdMigrationState *s = opaque; MigrationState *s = opaque;
if (s->mon) { if (s->mon) {
monitor_resume(s->mon); monitor_resume(s->mon);
@ -475,11 +372,129 @@ void remove_migration_state_change_notifier(Notifier *notify)
notifier_list_remove(&migration_state_notifiers, notify); notifier_list_remove(&migration_state_notifiers, notify);
} }
int get_migration_state(void) bool migration_has_finished(MigrationState *s)
{ {
if (current_migration) { return s->state == MIG_STATE_COMPLETED;
return migrate_fd_get_status(current_migration); }
void migrate_fd_connect(MigrationState *s)
{
int ret;
s->state = MIG_STATE_ACTIVE;
s->file = qemu_fopen_ops_buffered(s,
s->bandwidth_limit,
migrate_fd_put_buffer,
migrate_fd_put_ready,
migrate_fd_wait_for_unfreeze,
migrate_fd_close);
DPRINTF("beginning savevm\n");
ret = qemu_savevm_state_begin(s->mon, s->file, s->blk, s->shared);
if (ret < 0) {
DPRINTF("failed, %d\n", ret);
migrate_fd_error(s);
return;
}
migrate_fd_put_ready(s);
}
static MigrationState *migrate_init(Monitor *mon, int detach, int blk, int inc)
{
MigrationState *s = migrate_get_current();
int64_t bandwidth_limit = s->bandwidth_limit;
memset(s, 0, sizeof(*s));
s->bandwidth_limit = bandwidth_limit;
s->blk = blk;
s->shared = inc;
s->mon = NULL;
s->bandwidth_limit = bandwidth_limit;
s->state = MIG_STATE_SETUP;
if (!detach) {
migrate_fd_monitor_suspend(s, mon);
}
return s;
}
int do_migrate(Monitor *mon, const QDict *qdict, QObject **ret_data)
{
MigrationState *s = migrate_get_current();
const char *p;
int detach = qdict_get_try_bool(qdict, "detach", 0);
int blk = qdict_get_try_bool(qdict, "blk", 0);
int inc = qdict_get_try_bool(qdict, "inc", 0);
const char *uri = qdict_get_str(qdict, "uri");
int ret;
if (s->state == MIG_STATE_ACTIVE) {
monitor_printf(mon, "migration already in progress\n");
return -1;
}
if (qemu_savevm_state_blocked(mon)) {
return -1;
}
s = migrate_init(mon, detach, blk, inc);
if (strstart(uri, "tcp:", &p)) {
ret = tcp_start_outgoing_migration(s, p);
#if !defined(WIN32)
} else if (strstart(uri, "exec:", &p)) {
ret = exec_start_outgoing_migration(s, p);
} else if (strstart(uri, "unix:", &p)) {
ret = unix_start_outgoing_migration(s, p);
} else if (strstart(uri, "fd:", &p)) {
ret = fd_start_outgoing_migration(s, p);
#endif
} else { } else {
return MIG_STATE_ERROR; monitor_printf(mon, "unknown migration protocol: %s\n", uri);
ret = -EINVAL;
} }
if (ret < 0) {
monitor_printf(mon, "migration failed: %s\n", strerror(-ret));
return ret;
}
notifier_list_notify(&migration_state_notifiers, s);
return 0;
}
int do_migrate_cancel(Monitor *mon, const QDict *qdict, QObject **ret_data)
{
migrate_fd_cancel(migrate_get_current());
return 0;
}
int do_migrate_set_speed(Monitor *mon, const QDict *qdict, QObject **ret_data)
{
int64_t d;
MigrationState *s;
d = qdict_get_int(qdict, "value");
if (d < 0) {
d = 0;
}
s = migrate_get_current();
s->bandwidth_limit = d;
qemu_file_set_rate_limit(s->file, s->bandwidth_limit);
return 0;
}
int do_migrate_set_downtime(Monitor *mon, const QDict *qdict,
QObject **ret_data)
{
double d;
d = qdict_get_double(qdict, "value") * 1e9;
d = MAX(0, MIN(UINT64_MAX, d));
max_downtime = (uint64_t)d;
return 0;
} }

View File

@ -18,37 +18,21 @@
#include "qemu-common.h" #include "qemu-common.h"
#include "notify.h" #include "notify.h"
#define MIG_STATE_ERROR -1
#define MIG_STATE_COMPLETED 0
#define MIG_STATE_CANCELLED 1
#define MIG_STATE_ACTIVE 2
typedef struct MigrationState MigrationState; typedef struct MigrationState MigrationState;
struct MigrationState struct MigrationState
{ {
/* FIXME: add more accessors to print migration info */
void (*cancel)(MigrationState *s);
int (*get_status)(MigrationState *s);
void (*release)(MigrationState *s);
int blk;
int shared;
};
typedef struct FdMigrationState FdMigrationState;
struct FdMigrationState
{
MigrationState mig_state;
int64_t bandwidth_limit; int64_t bandwidth_limit;
QEMUFile *file; QEMUFile *file;
int fd; int fd;
Monitor *mon; Monitor *mon;
int state; int state;
int (*get_error)(struct FdMigrationState*); int (*get_error)(MigrationState *s);
int (*close)(struct FdMigrationState*); int (*close)(MigrationState *s);
int (*write)(struct FdMigrationState*, const void *, size_t); int (*write)(MigrationState *s, const void *buff, size_t size);
void *opaque; void *opaque;
int blk;
int shared;
}; };
void process_incoming_migration(QEMUFile *f); void process_incoming_migration(QEMUFile *f);
@ -72,72 +56,27 @@ void do_info_migrate(Monitor *mon, QObject **ret_data);
int exec_start_incoming_migration(const char *host_port); int exec_start_incoming_migration(const char *host_port);
MigrationState *exec_start_outgoing_migration(Monitor *mon, int exec_start_outgoing_migration(MigrationState *s, const char *host_port);
const char *host_port,
int64_t bandwidth_limit,
int detach,
int blk,
int inc);
int tcp_start_incoming_migration(const char *host_port); int tcp_start_incoming_migration(const char *host_port);
MigrationState *tcp_start_outgoing_migration(Monitor *mon, int tcp_start_outgoing_migration(MigrationState *s, const char *host_port);
const char *host_port,
int64_t bandwidth_limit,
int detach,
int blk,
int inc);
int unix_start_incoming_migration(const char *path); int unix_start_incoming_migration(const char *path);
MigrationState *unix_start_outgoing_migration(Monitor *mon, int unix_start_outgoing_migration(MigrationState *s, const char *path);
const char *path,
int64_t bandwidth_limit,
int detach,
int blk,
int inc);
int fd_start_incoming_migration(const char *path); int fd_start_incoming_migration(const char *path);
MigrationState *fd_start_outgoing_migration(Monitor *mon, int fd_start_outgoing_migration(MigrationState *s, const char *fdname);
const char *fdname,
int64_t bandwidth_limit,
int detach,
int blk,
int inc);
void migrate_fd_monitor_suspend(FdMigrationState *s, Monitor *mon); void migrate_fd_error(MigrationState *s);
void migrate_fd_error(FdMigrationState *s); void migrate_fd_connect(MigrationState *s);
int migrate_fd_cleanup(FdMigrationState *s);
void migrate_fd_put_notify(void *opaque);
ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
void migrate_fd_connect(FdMigrationState *s);
void migrate_fd_put_ready(void *opaque);
int migrate_fd_get_status(MigrationState *mig_state);
void migrate_fd_cancel(MigrationState *mig_state);
void migrate_fd_release(MigrationState *mig_state);
void migrate_fd_wait_for_unfreeze(void *opaque);
int migrate_fd_close(void *opaque);
static inline FdMigrationState *migrate_to_fms(MigrationState *mig_state)
{
return container_of(mig_state, FdMigrationState, mig_state);
}
void add_migration_state_change_notifier(Notifier *notify); void add_migration_state_change_notifier(Notifier *notify);
void remove_migration_state_change_notifier(Notifier *notify); void remove_migration_state_change_notifier(Notifier *notify);
int get_migration_state(void); bool migration_has_finished(MigrationState *);
uint64_t ram_bytes_remaining(void); uint64_t ram_bytes_remaining(void);
uint64_t ram_bytes_transferred(void); uint64_t ram_bytes_transferred(void);

236
savevm.c
View File

@ -173,7 +173,7 @@ struct QEMUFile {
int buf_size; /* 0 when writing */ int buf_size; /* 0 when writing */
uint8_t buf[IO_BUF_SIZE]; uint8_t buf[IO_BUF_SIZE];
int has_error; int last_error;
}; };
typedef struct QEMUFileStdio typedef struct QEMUFileStdio
@ -425,14 +425,14 @@ QEMUFile *qemu_fopen_ops(void *opaque, QEMUFilePutBufferFunc *put_buffer,
return f; return f;
} }
int qemu_file_has_error(QEMUFile *f) int qemu_file_get_error(QEMUFile *f)
{ {
return f->has_error; return f->last_error;
} }
void qemu_file_set_error(QEMUFile *f) void qemu_file_set_error(QEMUFile *f, int ret)
{ {
f->has_error = 1; f->last_error = ret;
} }
void qemu_fflush(QEMUFile *f) void qemu_fflush(QEMUFile *f)
@ -447,7 +447,7 @@ void qemu_fflush(QEMUFile *f)
if (len > 0) if (len > 0)
f->buf_offset += f->buf_index; f->buf_offset += f->buf_index;
else else
f->has_error = 1; f->last_error = -EINVAL;
f->buf_index = 0; f->buf_index = 0;
} }
} }
@ -455,6 +455,7 @@ void qemu_fflush(QEMUFile *f)
static void qemu_fill_buffer(QEMUFile *f) static void qemu_fill_buffer(QEMUFile *f)
{ {
int len; int len;
int pending;
if (!f->get_buffer) if (!f->get_buffer)
return; return;
@ -462,13 +463,20 @@ static void qemu_fill_buffer(QEMUFile *f)
if (f->is_write) if (f->is_write)
abort(); abort();
len = f->get_buffer(f->opaque, f->buf, f->buf_offset, IO_BUF_SIZE); pending = f->buf_size - f->buf_index;
if (len > 0) { if (pending > 0) {
memmove(f->buf, f->buf + f->buf_index, pending);
}
f->buf_index = 0; f->buf_index = 0;
f->buf_size = len; f->buf_size = pending;
len = f->get_buffer(f->opaque, f->buf + pending, f->buf_offset,
IO_BUF_SIZE - pending);
if (len > 0) {
f->buf_size += len;
f->buf_offset += len; f->buf_offset += len;
} else if (len != -EAGAIN) } else if (len != -EAGAIN)
f->has_error = 1; f->last_error = len;
} }
int qemu_fclose(QEMUFile *f) int qemu_fclose(QEMUFile *f)
@ -490,13 +498,13 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size)
{ {
int l; int l;
if (!f->has_error && f->is_write == 0 && f->buf_index > 0) { if (!f->last_error && f->is_write == 0 && f->buf_index > 0) {
fprintf(stderr, fprintf(stderr,
"Attempted to write to buffer while read buffer is not empty\n"); "Attempted to write to buffer while read buffer is not empty\n");
abort(); abort();
} }
while (!f->has_error && size > 0) { while (!f->last_error && size > 0) {
l = IO_BUF_SIZE - f->buf_index; l = IO_BUF_SIZE - f->buf_index;
if (l > size) if (l > size)
l = size; l = size;
@ -512,7 +520,7 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size)
void qemu_put_byte(QEMUFile *f, int v) void qemu_put_byte(QEMUFile *f, int v)
{ {
if (!f->has_error && f->is_write == 0 && f->buf_index > 0) { if (!f->last_error && f->is_write == 0 && f->buf_index > 0) {
fprintf(stderr, fprintf(stderr,
"Attempted to write to buffer while read buffer is not empty\n"); "Attempted to write to buffer while read buffer is not empty\n");
abort(); abort();
@ -524,56 +532,86 @@ void qemu_put_byte(QEMUFile *f, int v)
qemu_fflush(f); qemu_fflush(f);
} }
int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size1) static void qemu_file_skip(QEMUFile *f, int size)
{ {
int size, l; if (f->buf_index + size <= f->buf_size) {
f->buf_index += size;
if (f->is_write)
abort();
size = size1;
while (size > 0) {
l = f->buf_size - f->buf_index;
if (l == 0) {
qemu_fill_buffer(f);
l = f->buf_size - f->buf_index;
if (l == 0)
break;
} }
if (l > size)
l = size;
memcpy(buf, f->buf + f->buf_index, l);
f->buf_index += l;
buf += l;
size -= l;
}
return size1 - size;
} }
static int qemu_peek_byte(QEMUFile *f) static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset)
{ {
if (f->is_write) int pending;
abort(); int index;
if (f->buf_index >= f->buf_size) { if (f->is_write) {
abort();
}
index = f->buf_index + offset;
pending = f->buf_size - index;
if (pending < size) {
qemu_fill_buffer(f); qemu_fill_buffer(f);
if (f->buf_index >= f->buf_size) index = f->buf_index + offset;
pending = f->buf_size - index;
}
if (pending <= 0) {
return 0; return 0;
} }
return f->buf[f->buf_index]; if (size > pending) {
size = pending;
}
memcpy(buf, f->buf + index, size);
return size;
}
int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size)
{
int pending = size;
int done = 0;
while (pending > 0) {
int res;
res = qemu_peek_buffer(f, buf, pending, 0);
if (res == 0) {
return done;
}
qemu_file_skip(f, res);
buf += res;
pending -= res;
done += res;
}
return done;
}
static int qemu_peek_byte(QEMUFile *f, int offset)
{
int index = f->buf_index + offset;
if (f->is_write) {
abort();
}
if (index >= f->buf_size) {
qemu_fill_buffer(f);
index = f->buf_index + offset;
if (index >= f->buf_size) {
return 0;
}
}
return f->buf[index];
} }
int qemu_get_byte(QEMUFile *f) int qemu_get_byte(QEMUFile *f)
{ {
if (f->is_write) int result;
abort();
if (f->buf_index >= f->buf_size) { result = qemu_peek_byte(f, 0);
qemu_fill_buffer(f); qemu_file_skip(f, 1);
if (f->buf_index >= f->buf_size) return result;
return 0;
}
return f->buf[f->buf_index++];
} }
int64_t qemu_ftell(QEMUFile *f) int64_t qemu_ftell(QEMUFile *f)
@ -1466,6 +1504,7 @@ int qemu_savevm_state_begin(Monitor *mon, QEMUFile *f, int blk_enable,
int shared) int shared)
{ {
SaveStateEntry *se; SaveStateEntry *se;
int ret;
QTAILQ_FOREACH(se, &savevm_handlers, entry) { QTAILQ_FOREACH(se, &savevm_handlers, entry) {
if(se->set_params == NULL) { if(se->set_params == NULL) {
@ -1495,17 +1534,27 @@ int qemu_savevm_state_begin(Monitor *mon, QEMUFile *f, int blk_enable,
qemu_put_be32(f, se->instance_id); qemu_put_be32(f, se->instance_id);
qemu_put_be32(f, se->version_id); qemu_put_be32(f, se->version_id);
se->save_live_state(mon, f, QEMU_VM_SECTION_START, se->opaque); ret = se->save_live_state(mon, f, QEMU_VM_SECTION_START, se->opaque);
} if (ret < 0) {
qemu_savevm_state_cancel(mon, f);
if (qemu_file_has_error(f)) { return ret;
}
}
ret = qemu_file_get_error(f);
if (ret != 0) {
qemu_savevm_state_cancel(mon, f); qemu_savevm_state_cancel(mon, f);
return -EIO;
} }
return 0; return ret;
} }
/*
* this funtion has three return values:
* negative: there was one error, and we have -errno.
* 0 : We haven't finished, caller have to go again
* 1 : We have finished, we can go to complete phase
*/
int qemu_savevm_state_iterate(Monitor *mon, QEMUFile *f) int qemu_savevm_state_iterate(Monitor *mon, QEMUFile *f)
{ {
SaveStateEntry *se; SaveStateEntry *se;
@ -1520,7 +1569,7 @@ int qemu_savevm_state_iterate(Monitor *mon, QEMUFile *f)
qemu_put_be32(f, se->section_id); qemu_put_be32(f, se->section_id);
ret = se->save_live_state(mon, f, QEMU_VM_SECTION_PART, se->opaque); ret = se->save_live_state(mon, f, QEMU_VM_SECTION_PART, se->opaque);
if (!ret) { if (ret <= 0) {
/* Do not proceed to the next vmstate before this one reported /* Do not proceed to the next vmstate before this one reported
completion of the current stage. This serializes the migration completion of the current stage. This serializes the migration
and reduces the probability that a faster changing state is and reduces the probability that a faster changing state is
@ -1528,21 +1577,20 @@ int qemu_savevm_state_iterate(Monitor *mon, QEMUFile *f)
break; break;
} }
} }
if (ret != 0) {
if (ret) return ret;
return 1;
if (qemu_file_has_error(f)) {
qemu_savevm_state_cancel(mon, f);
return -EIO;
} }
ret = qemu_file_get_error(f);
return 0; if (ret != 0) {
qemu_savevm_state_cancel(mon, f);
}
return ret;
} }
int qemu_savevm_state_complete(Monitor *mon, QEMUFile *f) int qemu_savevm_state_complete(Monitor *mon, QEMUFile *f)
{ {
SaveStateEntry *se; SaveStateEntry *se;
int ret;
cpu_synchronize_all_states(); cpu_synchronize_all_states();
@ -1554,7 +1602,10 @@ int qemu_savevm_state_complete(Monitor *mon, QEMUFile *f)
qemu_put_byte(f, QEMU_VM_SECTION_END); qemu_put_byte(f, QEMU_VM_SECTION_END);
qemu_put_be32(f, se->section_id); qemu_put_be32(f, se->section_id);
se->save_live_state(mon, f, QEMU_VM_SECTION_END, se->opaque); ret = se->save_live_state(mon, f, QEMU_VM_SECTION_END, se->opaque);
if (ret < 0) {
return ret;
}
} }
QTAILQ_FOREACH(se, &savevm_handlers, entry) { QTAILQ_FOREACH(se, &savevm_handlers, entry) {
@ -1580,10 +1631,7 @@ int qemu_savevm_state_complete(Monitor *mon, QEMUFile *f)
qemu_put_byte(f, QEMU_VM_EOF); qemu_put_byte(f, QEMU_VM_EOF);
if (qemu_file_has_error(f)) return qemu_file_get_error(f);
return -EIO;
return 0;
} }
void qemu_savevm_state_cancel(Monitor *mon, QEMUFile *f) void qemu_savevm_state_cancel(Monitor *mon, QEMUFile *f)
@ -1619,8 +1667,9 @@ static int qemu_savevm_state(Monitor *mon, QEMUFile *f)
ret = qemu_savevm_state_complete(mon, f); ret = qemu_savevm_state_complete(mon, f);
out: out:
if (qemu_file_has_error(f)) if (ret == 0) {
ret = -EIO; ret = qemu_file_get_error(f);
}
return ret; return ret;
} }
@ -1659,29 +1708,36 @@ static const VMStateDescription *vmstate_get_subsection(const VMStateSubsection
static int vmstate_subsection_load(QEMUFile *f, const VMStateDescription *vmsd, static int vmstate_subsection_load(QEMUFile *f, const VMStateDescription *vmsd,
void *opaque) void *opaque)
{ {
const VMStateSubsection *sub = vmsd->subsections; while (qemu_peek_byte(f, 0) == QEMU_VM_SUBSECTION) {
if (!sub || !sub->needed) {
return 0;
}
while (qemu_peek_byte(f) == QEMU_VM_SUBSECTION) {
char idstr[256]; char idstr[256];
int ret; int ret;
uint8_t version_id, len; uint8_t version_id, len, size;
const VMStateDescription *sub_vmsd; const VMStateDescription *sub_vmsd;
qemu_get_byte(f); /* subsection */ len = qemu_peek_byte(f, 1);
len = qemu_get_byte(f); if (len < strlen(vmsd->name) + 1) {
qemu_get_buffer(f, (uint8_t *)idstr, len); /* subsection name has be be "section_name/a" */
idstr[len] = 0; return 0;
version_id = qemu_get_be32(f); }
size = qemu_peek_buffer(f, (uint8_t *)idstr, len, 2);
if (size != len) {
return 0;
}
idstr[size] = 0;
sub_vmsd = vmstate_get_subsection(sub, idstr); if (strncmp(vmsd->name, idstr, strlen(vmsd->name)) != 0) {
/* it don't have a valid subsection name */
return 0;
}
sub_vmsd = vmstate_get_subsection(vmsd->subsections, idstr);
if (sub_vmsd == NULL) { if (sub_vmsd == NULL) {
return -ENOENT; return -ENOENT;
} }
assert(!sub_vmsd->subsections); qemu_file_skip(f, 1); /* subsection */
qemu_file_skip(f, 1); /* len */
qemu_file_skip(f, len); /* idstr */
version_id = qemu_get_be32(f);
ret = vmstate_load_state(f, sub_vmsd, opaque, version_id); ret = vmstate_load_state(f, sub_vmsd, opaque, version_id);
if (ret) { if (ret) {
return ret; return ret;
@ -1705,7 +1761,6 @@ static void vmstate_subsection_save(QEMUFile *f, const VMStateDescription *vmsd,
qemu_put_byte(f, len); qemu_put_byte(f, len);
qemu_put_buffer(f, (uint8_t *)vmsd->name, len); qemu_put_buffer(f, (uint8_t *)vmsd->name, len);
qemu_put_be32(f, vmsd->version_id); qemu_put_be32(f, vmsd->version_id);
assert(!vmsd->subsections);
vmstate_save_state(f, vmsd, opaque); vmstate_save_state(f, vmsd, opaque);
} }
sub++; sub++;
@ -1831,8 +1886,9 @@ out:
g_free(le); g_free(le);
} }
if (qemu_file_has_error(f)) if (ret == 0) {
ret = -EIO; ret = qemu_file_get_error(f);
}
return ret; return ret;
} }

View File

@ -447,9 +447,9 @@ void do_info_spice(Monitor *mon, QObject **ret_data)
static void migration_state_notifier(Notifier *notifier, void *data) static void migration_state_notifier(Notifier *notifier, void *data)
{ {
int state = get_migration_state(); MigrationState *s = data;
if (state == MIG_STATE_COMPLETED) { if (migration_has_finished(s)) {
#if SPICE_SERVER_VERSION >= 0x000701 /* 0.7.1 */ #if SPICE_SERVER_VERSION >= 0x000701 /* 0.7.1 */
spice_server_migrate_switch(spice_server); spice_server_migrate_switch(spice_server);
#endif #endif