From 334d15d504633a4610b8f4de4e01d8bba24c2655 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Mon, 20 Jan 2020 15:52:52 +0100 Subject: [PATCH 01/18] migration-test: Use g_free() instead of free() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan Quintela Reviewed-by: Thomas Huth Reviewed-by: Philippe Mathieu-Daudé --- tests/qtest/migration-test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c index 26e2e77289..b6a74a05ce 100644 --- a/tests/qtest/migration-test.c +++ b/tests/qtest/migration-test.c @@ -1291,7 +1291,7 @@ static void test_multifd_tcp(void) wait_for_serial("dest_serial"); wait_for_migration_complete(from); test_migrate_end(from, to, true); - free(uri); + g_free(uri); } int main(int argc, char **argv) From 3d4095b222d97393b1c2c6e514951ec7798f1c43 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 18 Dec 2019 05:12:36 +0100 Subject: [PATCH 02/18] multifd: Make sure that we don't do any IO after an error Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index d2208b5534..f95d656c26 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -3445,7 +3445,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) { RAMState **temp = opaque; RAMState *rs = *temp; - int ret; + int ret = 0; int i; int64_t t0; int done = 0; @@ -3524,12 +3524,14 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) ram_control_after_iterate(f, RAM_CONTROL_ROUND); out: - multifd_send_sync_main(rs); - qemu_put_be64(f, RAM_SAVE_FLAG_EOS); - qemu_fflush(f); - ram_counters.transferred += 8; + if (ret >= 0) { + multifd_send_sync_main(rs); + qemu_put_be64(f, RAM_SAVE_FLAG_EOS); + qemu_fflush(f); + ram_counters.transferred += 8; - ret = qemu_file_get_error(f); + ret = qemu_file_get_error(f); + } if (ret < 0) { return ret; } @@ -3581,9 +3583,11 @@ static int ram_save_complete(QEMUFile *f, void *opaque) ram_control_after_iterate(f, RAM_CONTROL_FINISH); } - multifd_send_sync_main(rs); - qemu_put_be64(f, RAM_SAVE_FLAG_EOS); - qemu_fflush(f); + if (ret >= 0) { + multifd_send_sync_main(rs); + qemu_put_be64(f, RAM_SAVE_FLAG_EOS); + qemu_fflush(f); + } return ret; } From a555b8092abc6f1bbe4b64c516679cbd68fcfbd8 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 18 Dec 2019 05:11:31 +0100 Subject: [PATCH 03/18] qemu-file: Don't do IO after shutdown Be sure that we are not doing neither read/write after shutdown of the QEMUFile. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/qemu-file.c | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 26fb25ddc1..bbb2b63927 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -53,6 +53,8 @@ struct QEMUFile { int last_error; Error *last_error_obj; + /* has the file has been shutdown */ + bool shutdown; }; /* @@ -61,10 +63,18 @@ struct QEMUFile { */ int qemu_file_shutdown(QEMUFile *f) { + int ret; + + f->shutdown = true; if (!f->ops->shut_down) { return -ENOSYS; } - return f->ops->shut_down(f->opaque, true, true, NULL); + ret = f->ops->shut_down(f->opaque, true, true, NULL); + + if (!f->last_error) { + qemu_file_set_error(f, -EIO); + } + return ret; } /* @@ -214,6 +224,9 @@ void qemu_fflush(QEMUFile *f) return; } + if (f->shutdown) { + return; + } if (f->iovcnt > 0) { expect = iov_size(f->iov, f->iovcnt); ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos, @@ -328,6 +341,10 @@ static ssize_t qemu_fill_buffer(QEMUFile *f) f->buf_index = 0; f->buf_size = pending; + if (f->shutdown) { + return 0; + } + len = f->ops->get_buffer(f->opaque, f->buf + pending, f->pos, IO_BUF_SIZE - pending, &local_error); if (len > 0) { @@ -642,6 +659,9 @@ int64_t qemu_ftell(QEMUFile *f) int qemu_file_rate_limit(QEMUFile *f) { + if (f->shutdown) { + return 1; + } if (qemu_file_get_error(f)) { return 1; } From b69a0227a803256ad270283872d40ff768f4d56d Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 22 Jan 2020 11:36:12 +0100 Subject: [PATCH 04/18] migration: Don't send data if we have stopped If we do a cancel, we got out without one error, but we can't do the rest of the output as in a normal situation. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/migration/ram.c b/migration/ram.c index f95d656c26..3fd7fdffcf 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -3524,7 +3524,8 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) ram_control_after_iterate(f, RAM_CONTROL_ROUND); out: - if (ret >= 0) { + if (ret >= 0 + && migration_is_setup_or_active(migrate_get_current()->state)) { multifd_send_sync_main(rs); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); qemu_fflush(f); From d795f47466ead5f189f2efbcf9080dc9c0b92443 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 18 Dec 2019 05:13:40 +0100 Subject: [PATCH 05/18] migration-test: Make sure that multifd and cancel works Test that this sequence works: - launch source - launch target - start migration - cancel migration - relaunch target - do migration again Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- tests/qtest/migration-test.c | 112 ++++++++++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 1 deletion(-) diff --git a/tests/qtest/migration-test.c b/tests/qtest/migration-test.c index b6a74a05ce..cf27ebbc9d 100644 --- a/tests/qtest/migration-test.c +++ b/tests/qtest/migration-test.c @@ -424,6 +424,14 @@ static void migrate_recover(QTestState *who, const char *uri) qobject_unref(rsp); } +static void migrate_cancel(QTestState *who) +{ + QDict *rsp; + + rsp = wait_command(who, "{ 'execute': 'migrate_cancel' }"); + qobject_unref(rsp); +} + static void migrate_set_capability(QTestState *who, const char *capability, bool value) { @@ -456,6 +464,8 @@ static void migrate_postcopy_start(QTestState *from, QTestState *to) typedef struct { bool hide_stderr; bool use_shmem; + /* only launch the target process */ + bool only_target; char *opts_source; char *opts_target; } MigrateStart; @@ -571,7 +581,9 @@ static int test_migrate_start(QTestState **from, QTestState **to, arch_source, shmem_opts, args->opts_source, ignore_stderr); g_free(arch_source); - *from = qtest_init(cmd_source); + if (!args->only_target) { + *from = qtest_init(cmd_source); + } g_free(cmd_source); cmd_target = g_strdup_printf("-accel kvm -accel tcg%s%s " @@ -1294,6 +1306,103 @@ static void test_multifd_tcp(void) g_free(uri); } +/* + * This test does: + * source target + * migrate_incoming + * migrate + * migrate_cancel + * launch another target + * migrate + * + * And see that it works + */ + +static void test_multifd_tcp_cancel(void) +{ + MigrateStart *args = migrate_start_new(); + QTestState *from, *to, *to2; + QDict *rsp; + char *uri; + + args->hide_stderr = true; + + if (test_migrate_start(&from, &to, "defer", args)) { + return; + } + + /* + * We want to pick a speed slow enough that the test completes + * quickly, but that it doesn't complete precopy even on a slow + * machine, so also set the downtime. + */ + /* 1 ms should make it not converge*/ + migrate_set_parameter_int(from, "downtime-limit", 1); + /* 300MB/s */ + migrate_set_parameter_int(from, "max-bandwidth", 30000000); + + migrate_set_parameter_int(from, "multifd-channels", 16); + migrate_set_parameter_int(to, "multifd-channels", 16); + + migrate_set_capability(from, "multifd", "true"); + migrate_set_capability(to, "multifd", "true"); + + /* Start incoming migration from the 1st socket */ + rsp = wait_command(to, "{ 'execute': 'migrate-incoming'," + " 'arguments': { 'uri': 'tcp:127.0.0.1:0' }}"); + qobject_unref(rsp); + + /* Wait for the first serial output from the source */ + wait_for_serial("src_serial"); + + uri = migrate_get_socket_address(to, "socket-address"); + + migrate_qmp(from, uri, "{}"); + + wait_for_migration_pass(from); + + migrate_cancel(from); + + args = migrate_start_new(); + args->only_target = true; + + if (test_migrate_start(&from, &to2, "defer", args)) { + return; + } + + migrate_set_parameter_int(to2, "multifd-channels", 16); + + migrate_set_capability(to2, "multifd", "true"); + + /* Start incoming migration from the 1st socket */ + rsp = wait_command(to2, "{ 'execute': 'migrate-incoming'," + " 'arguments': { 'uri': 'tcp:127.0.0.1:0' }}"); + qobject_unref(rsp); + + uri = migrate_get_socket_address(to2, "socket-address"); + + wait_for_migration_status(from, "cancelled", NULL); + + /* 300ms it should converge */ + migrate_set_parameter_int(from, "downtime-limit", 300); + /* 1GB/s */ + migrate_set_parameter_int(from, "max-bandwidth", 1000000000); + + migrate_qmp(from, uri, "{}"); + + wait_for_migration_pass(from); + + if (!got_stop) { + qtest_qmp_eventwait(from, "STOP"); + } + qtest_qmp_eventwait(to2, "RESUME"); + + wait_for_serial("dest_serial"); + wait_for_migration_complete(from); + test_migrate_end(from, to2, true); + g_free(uri); +} + int main(int argc, char **argv) { char template[] = "/tmp/migration-test-XXXXXX"; @@ -1359,6 +1468,7 @@ int main(int argc, char **argv) qtest_add_func("/migration/auto_converge", test_migrate_auto_converge); qtest_add_func("/migration/multifd/tcp", test_multifd_tcp); + qtest_add_func("/migration/multifd/tcp/cancel", test_multifd_tcp_cancel); ret = g_test_run(); From 392d87e21325fdb01210176faa07472b4985ccf0 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Tue, 21 Jan 2020 15:39:23 +0100 Subject: [PATCH 06/18] migration: Create migration_is_running() This function returns true if we are in the middle of a migration. It is like migration_is_setup_or_active() with CANCELLING and COLO. Adapt all callers that are needed. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/migration.c | 29 ++++++++++++++++++++++++----- migration/migration.h | 1 + migration/savevm.c | 4 +--- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/migration/migration.c b/migration/migration.c index efd5350e84..77768fb2c7 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -829,6 +829,27 @@ bool migration_is_setup_or_active(int state) } } +bool migration_is_running(int state) +{ + switch (state) { + case MIGRATION_STATUS_ACTIVE: + case MIGRATION_STATUS_POSTCOPY_ACTIVE: + case MIGRATION_STATUS_POSTCOPY_PAUSED: + case MIGRATION_STATUS_POSTCOPY_RECOVER: + case MIGRATION_STATUS_SETUP: + case MIGRATION_STATUS_PRE_SWITCHOVER: + case MIGRATION_STATUS_DEVICE: + case MIGRATION_STATUS_WAIT_UNPLUG: + case MIGRATION_STATUS_CANCELLING: + case MIGRATION_STATUS_COLO: + return true; + + default: + return false; + + } +} + static void populate_time_info(MigrationInfo *info, MigrationState *s) { info->has_status = true; @@ -1077,7 +1098,7 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params, MigrationCapabilityStatusList *cap; bool cap_list[MIGRATION_CAPABILITY__MAX]; - if (migration_is_setup_or_active(s->state)) { + if (migration_is_running(s->state)) { error_setg(errp, QERR_MIGRATION_ACTIVE); return; } @@ -1590,7 +1611,7 @@ static void migrate_fd_cancel(MigrationState *s) do { old_state = s->state; - if (!migration_is_setup_or_active(old_state)) { + if (!migration_is_running(old_state)) { break; } /* If the migration is paused, kick it out of the pause */ @@ -1888,9 +1909,7 @@ static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc, return true; } - if (migration_is_setup_or_active(s->state) || - s->state == MIGRATION_STATUS_CANCELLING || - s->state == MIGRATION_STATUS_COLO) { + if (migration_is_running(s->state)) { error_setg(errp, QERR_MIGRATION_ACTIVE); return false; } diff --git a/migration/migration.h b/migration/migration.h index aa9ff6f27b..44b1d56929 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -279,6 +279,7 @@ void migrate_fd_error(MigrationState *s, const Error *error); void migrate_fd_connect(MigrationState *s, Error *error_in); bool migration_is_setup_or_active(int state); +bool migration_is_running(int state); void migrate_init(MigrationState *s); bool migration_is_blocked(Error **errp); diff --git a/migration/savevm.c b/migration/savevm.c index adfdca26ac..f19cb9ec7a 100644 --- a/migration/savevm.c +++ b/migration/savevm.c @@ -1531,9 +1531,7 @@ static int qemu_savevm_state(QEMUFile *f, Error **errp) MigrationState *ms = migrate_get_current(); MigrationStatus status; - if (migration_is_setup_or_active(ms->state) || - ms->state == MIGRATION_STATUS_CANCELLING || - ms->state == MIGRATION_STATUS_COLO) { + if (migration_is_running(ms->state)) { error_setg(errp, QERR_MIGRATION_ACTIVE); return -EINVAL; } From 9c4d333c092e9c26d38f740ff3616deb42f21681 Mon Sep 17 00:00:00 2001 From: Zhimin Feng Date: Fri, 10 Jan 2020 16:50:19 +0800 Subject: [PATCH 07/18] migration/multifd: fix nullptr access in multifd_send_terminate_threads If the multifd_send_threads is not created when migration is failed, multifd_save_cleanup would be called twice. In this senario, the multifd_send_state is accessed after it has been released, the result is that the source VM is crashing down. Here is the coredump stack: Program received signal SIGSEGV, Segmentation fault. 0x00005629333a78ef in multifd_send_terminate_threads (err=err@entry=0x0) at migration/ram.c:1012 1012 MultiFDSendParams *p = &multifd_send_state->params[i]; #0 0x00005629333a78ef in multifd_send_terminate_threads (err=err@entry=0x0) at migration/ram.c:1012 #1 0x00005629333ab8a9 in multifd_save_cleanup () at migration/ram.c:1028 #2 0x00005629333abaea in multifd_new_send_channel_async (task=0x562935450e70, opaque=) at migration/ram.c:1202 #3 0x000056293373a562 in qio_task_complete (task=task@entry=0x562935450e70) at io/task.c:196 #4 0x000056293373a6e0 in qio_task_thread_result (opaque=0x562935450e70) at io/task.c:111 #5 0x00007f475d4d75a7 in g_idle_dispatch () from /usr/lib64/libglib-2.0.so.0 #6 0x00007f475d4da9a9 in g_main_context_dispatch () from /usr/lib64/libglib-2.0.so.0 #7 0x0000562933785b33 in glib_pollfds_poll () at util/main-loop.c:219 #8 os_host_main_loop_wait (timeout=) at util/main-loop.c:242 #9 main_loop_wait (nonblocking=nonblocking@entry=0) at util/main-loop.c:518 #10 0x00005629334c5acf in main_loop () at vl.c:1810 #11 0x000056293334d7bb in main (argc=, argv=, envp=) at vl.c:4471 If the multifd_send_threads is not created when migration is failed. In this senario, we don't call multifd_save_cleanup in multifd_new_send_channel_async. Signed-off-by: Zhimin Feng Reviewed-by: Juan Quintela Signed-off-by: Juan Quintela --- migration/ram.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/migration/ram.c b/migration/ram.c index 3fd7fdffcf..82c7edb083 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -1233,7 +1233,15 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) trace_multifd_new_send_channel_async(p->id); if (qio_task_propagate_error(task, &local_err)) { migrate_set_error(migrate_get_current(), local_err); - multifd_save_cleanup(); + /* Error happen, we need to tell who pay attention to me */ + qemu_sem_post(&multifd_send_state->channels_ready); + qemu_sem_post(&p->sem_sync); + /* + * Although multifd_send_thread is not created, but main migration + * thread neet to judge whether it is running, so we need to mark + * its status. + */ + p->quit = true; } else { p->c = QIO_CHANNEL(sioc); qio_channel_set_delay(p->c, false); From 41aa4e9fd847a2145d336406d1cf7bbb7aa8803a Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 22 Jan 2020 15:58:57 +0100 Subject: [PATCH 08/18] ram_addr: Split RAMBlock definition We need some of the fields without having to poison everything else. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- MAINTAINERS | 1 + include/exec/ram_addr.h | 40 +------------------------- include/exec/ramblock.h | 64 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 39 deletions(-) create mode 100644 include/exec/ramblock.h diff --git a/MAINTAINERS b/MAINTAINERS index efd3f3875f..c45e886d88 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -1975,6 +1975,7 @@ F: ioport.c F: include/exec/memop.h F: include/exec/memory.h F: include/exec/ram_addr.h +F: include/exec/ramblock.h F: memory.c F: include/exec/memory-internal.h F: exec.c diff --git a/include/exec/ram_addr.h b/include/exec/ram_addr.h index 5adebb0bc7..5e59a3d8d7 100644 --- a/include/exec/ram_addr.h +++ b/include/exec/ram_addr.h @@ -24,45 +24,7 @@ #include "hw/xen/xen.h" #include "sysemu/tcg.h" #include "exec/ramlist.h" - -struct RAMBlock { - struct rcu_head rcu; - struct MemoryRegion *mr; - uint8_t *host; - uint8_t *colo_cache; /* For colo, VM's ram cache */ - ram_addr_t offset; - ram_addr_t used_length; - ram_addr_t max_length; - void (*resized)(const char*, uint64_t length, void *host); - uint32_t flags; - /* Protected by iothread lock. */ - char idstr[256]; - /* RCU-enabled, writes protected by the ramlist lock */ - QLIST_ENTRY(RAMBlock) next; - QLIST_HEAD(, RAMBlockNotifier) ramblock_notifiers; - int fd; - size_t page_size; - /* dirty bitmap used during migration */ - unsigned long *bmap; - /* bitmap of already received pages in postcopy */ - unsigned long *receivedmap; - - /* - * bitmap to track already cleared dirty bitmap. When the bit is - * set, it means the corresponding memory chunk needs a log-clear. - * Set this up to non-NULL to enable the capability to postpone - * and split clearing of dirty bitmap on the remote node (e.g., - * KVM). The bitmap will be set only when doing global sync. - * - * NOTE: this bitmap is different comparing to the other bitmaps - * in that one bit can represent multiple guest pages (which is - * decided by the `clear_bmap_shift' variable below). On - * destination side, this should always be NULL, and the variable - * `clear_bmap_shift' is meaningless. - */ - unsigned long *clear_bmap; - uint8_t clear_bmap_shift; -}; +#include "exec/ramblock.h" /** * clear_bmap_size: calculate clear bitmap size diff --git a/include/exec/ramblock.h b/include/exec/ramblock.h new file mode 100644 index 0000000000..07d50864d8 --- /dev/null +++ b/include/exec/ramblock.h @@ -0,0 +1,64 @@ +/* + * Declarations for cpu physical memory functions + * + * Copyright 2011 Red Hat, Inc. and/or its affiliates + * + * Authors: + * Avi Kivity + * + * This work is licensed under the terms of the GNU GPL, version 2 or + * later. See the COPYING file in the top-level directory. + * + */ + +/* + * This header is for use by exec.c and memory.c ONLY. Do not include it. + * The functions declared here will be removed soon. + */ + +#ifndef QEMU_EXEC_RAMBLOCK_H +#define QEMU_EXEC_RAMBLOCK_H + +#ifndef CONFIG_USER_ONLY +#include "cpu-common.h" + +struct RAMBlock { + struct rcu_head rcu; + struct MemoryRegion *mr; + uint8_t *host; + uint8_t *colo_cache; /* For colo, VM's ram cache */ + ram_addr_t offset; + ram_addr_t used_length; + ram_addr_t max_length; + void (*resized)(const char*, uint64_t length, void *host); + uint32_t flags; + /* Protected by iothread lock. */ + char idstr[256]; + /* RCU-enabled, writes protected by the ramlist lock */ + QLIST_ENTRY(RAMBlock) next; + QLIST_HEAD(, RAMBlockNotifier) ramblock_notifiers; + int fd; + size_t page_size; + /* dirty bitmap used during migration */ + unsigned long *bmap; + /* bitmap of already received pages in postcopy */ + unsigned long *receivedmap; + + /* + * bitmap to track already cleared dirty bitmap. When the bit is + * set, it means the corresponding memory chunk needs a log-clear. + * Set this up to non-NULL to enable the capability to postpone + * and split clearing of dirty bitmap on the remote node (e.g., + * KVM). The bitmap will be set only when doing global sync. + * + * NOTE: this bitmap is different comparing to the other bitmaps + * in that one bit can represent multiple guest pages (which is + * decided by the `clear_bmap_shift' variable below). On + * destination side, this should always be NULL, and the variable + * `clear_bmap_shift' is meaningless. + */ + unsigned long *clear_bmap; + uint8_t clear_bmap_shift; +}; +#endif +#endif From df94d32bb14faa0a329782b5354fe0ab5b1e1ef5 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 22 Jan 2020 16:01:09 +0100 Subject: [PATCH 09/18] multifd: multifd_send_pages only needs the qemufile Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 82c7edb083..602e9ca5a0 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -929,7 +929,7 @@ struct { * false. */ -static int multifd_send_pages(RAMState *rs) +static int multifd_send_pages(QEMUFile *f) { int i; static int next_channel; @@ -965,7 +965,7 @@ static int multifd_send_pages(RAMState *rs) multifd_send_state->pages = p->pages; p->pages = pages; transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len; - qemu_file_update_transfer(rs->f, transferred); + qemu_file_update_transfer(f, transferred); ram_counters.multifd_bytes += transferred; ram_counters.transferred += transferred;; qemu_mutex_unlock(&p->mutex); @@ -993,7 +993,7 @@ static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset) } } - if (multifd_send_pages(rs) < 0) { + if (multifd_send_pages(rs->f) < 0) { return -1; } @@ -1090,7 +1090,7 @@ static void multifd_send_sync_main(RAMState *rs) return; } if (multifd_send_state->pages->used) { - if (multifd_send_pages(rs) < 0) { + if (multifd_send_pages(rs->f) < 0) { error_report("%s: multifd_send_pages fail", __func__); return; } From 67a4c8910c7351114f2bffe6b0eceac6ad9c72af Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 22 Jan 2020 16:03:01 +0100 Subject: [PATCH 10/18] multifd: multifd_queue_page only needs the qemufile Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 602e9ca5a0..ad9cc3e7bd 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -974,7 +974,7 @@ static int multifd_send_pages(QEMUFile *f) return 1; } -static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset) +static int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) { MultiFDPages_t *pages = multifd_send_state->pages; @@ -993,12 +993,12 @@ static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset) } } - if (multifd_send_pages(rs->f) < 0) { + if (multifd_send_pages(f) < 0) { return -1; } if (pages->block != block) { - return multifd_queue_page(rs, block, offset); + return multifd_queue_page(f, block, offset); } return 1; @@ -2136,7 +2136,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) static int ram_save_multifd_page(RAMState *rs, RAMBlock *block, ram_addr_t offset) { - if (multifd_queue_page(rs, block, offset) < 0) { + if (multifd_queue_page(rs->f, block, offset) < 0) { return -1; } ram_counters.normal++; From 99f2c6fb46d9df44aa13ec043d807505ae4b5559 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 22 Jan 2020 16:04:53 +0100 Subject: [PATCH 11/18] multifd: multifd_send_sync_main only needs the qemufile Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index ad9cc3e7bd..152e9cf46d 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -1082,7 +1082,7 @@ void multifd_save_cleanup(void) multifd_send_state = NULL; } -static void multifd_send_sync_main(RAMState *rs) +static void multifd_send_sync_main(QEMUFile *f) { int i; @@ -1090,7 +1090,7 @@ static void multifd_send_sync_main(RAMState *rs) return; } if (multifd_send_state->pages->used) { - if (multifd_send_pages(rs->f) < 0) { + if (multifd_send_pages(f) < 0) { error_report("%s: multifd_send_pages fail", __func__); return; } @@ -1111,7 +1111,7 @@ static void multifd_send_sync_main(RAMState *rs) p->packet_num = multifd_send_state->packet_num++; p->flags |= MULTIFD_FLAG_SYNC; p->pending_job++; - qemu_file_update_transfer(rs->f, p->packet_len); + qemu_file_update_transfer(f, p->packet_len); ram_counters.multifd_bytes += p->packet_len; ram_counters.transferred += p->packet_len; qemu_mutex_unlock(&p->mutex); @@ -3434,7 +3434,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque) ram_control_before_iterate(f, RAM_CONTROL_SETUP); ram_control_after_iterate(f, RAM_CONTROL_SETUP); - multifd_send_sync_main(*rsp); + multifd_send_sync_main(f); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); qemu_fflush(f); @@ -3534,7 +3534,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) out: if (ret >= 0 && migration_is_setup_or_active(migrate_get_current()->state)) { - multifd_send_sync_main(rs); + multifd_send_sync_main(rs->f); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); qemu_fflush(f); ram_counters.transferred += 8; @@ -3593,7 +3593,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque) } if (ret >= 0) { - multifd_send_sync_main(rs); + multifd_send_sync_main(rs->f); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); qemu_fflush(f); } From a6703e4d336b29668b3f5f84da82619dafe33f10 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 22 Jan 2020 17:46:30 +0100 Subject: [PATCH 12/18] multifd: Use qemu_target_page_size() We will make it cpu independent. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 152e9cf46d..8f04b5ab3a 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -882,14 +882,14 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) for (i = 0; i < p->pages->used; i++) { uint64_t offset = be64_to_cpu(packet->offset[i]); - if (offset > (block->used_length - TARGET_PAGE_SIZE)) { + if (offset > (block->used_length - qemu_target_page_size())) { error_setg(errp, "multifd: offset too long %" PRIu64 " (max " RAM_ADDR_FMT ")", offset, block->max_length); return -1; } p->pages->iov[i].iov_base = block->host + offset; - p->pages->iov[i].iov_len = TARGET_PAGE_SIZE; + p->pages->iov[i].iov_len = qemu_target_page_size(); } return 0; @@ -964,7 +964,8 @@ static int multifd_send_pages(QEMUFile *f) p->packet_num = multifd_send_state->packet_num++; multifd_send_state->pages = p->pages; p->pages = pages; - transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len; + transferred = ((uint64_t) pages->used) * qemu_target_page_size() + + p->packet_len; qemu_file_update_transfer(f, transferred); ram_counters.multifd_bytes += transferred; ram_counters.transferred += transferred;; @@ -985,7 +986,7 @@ static int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) if (pages->block == block) { pages->offset[pages->used] = offset; pages->iov[pages->used].iov_base = block->host + offset; - pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE; + pages->iov[pages->used].iov_len = qemu_target_page_size(); pages->used++; if (pages->used < pages->allocated) { From 857a4bbb86d45fe9cd008480d3d6460a998cbb66 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 22 Jan 2020 19:12:28 +0100 Subject: [PATCH 13/18] migration: Make checkpatch happy with comments Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 8f04b5ab3a..12b76b7841 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -1320,10 +1320,12 @@ static void multifd_recv_terminate_threads(Error *err) qemu_mutex_lock(&p->mutex); p->quit = true; - /* We could arrive here for two reasons: - - normal quit, i.e. everything went fine, just finished - - error quit: We close the channels so the channel threads - finish the qio_channel_read_all_eof() */ + /* + * We could arrive here for two reasons: + * - normal quit, i.e. everything went fine, just finished + * - error quit: We close the channels so the channel threads + * finish the qio_channel_read_all_eof() + */ if (p->c) { qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); } From 00f4b572e60ecfac0fde9193257efc61a76aba7e Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 12 Jun 2019 11:33:27 +0200 Subject: [PATCH 14/18] multifd: Make multifd_save_setup() get an Error parameter Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/migration.c | 4 +++- migration/ram.c | 2 +- migration/ram.h | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/migration/migration.c b/migration/migration.c index 77768fb2c7..d478f832ea 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -3367,6 +3367,7 @@ static void *migration_thread(void *opaque) void migrate_fd_connect(MigrationState *s, Error *error_in) { + Error *local_err = NULL; int64_t rate_limit; bool resume = s->state == MIGRATION_STATUS_POSTCOPY_PAUSED; @@ -3415,7 +3416,8 @@ void migrate_fd_connect(MigrationState *s, Error *error_in) return; } - if (multifd_save_setup() != 0) { + if (multifd_save_setup(&local_err) != 0) { + error_report_err(local_err); migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, MIGRATION_STATUS_FAILED); migrate_fd_cleanup(s); diff --git a/migration/ram.c b/migration/ram.c index 12b76b7841..78483247ad 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -1252,7 +1252,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) } } -int multifd_save_setup(void) +int multifd_save_setup(Error **errp) { int thread_count; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); diff --git a/migration/ram.h b/migration/ram.h index bd0eee79b6..da22a417ea 100644 --- a/migration/ram.h +++ b/migration/ram.h @@ -41,7 +41,7 @@ int xbzrle_cache_resize(int64_t new_size, Error **errp); uint64_t ram_bytes_remaining(void); uint64_t ram_bytes_total(void); -int multifd_save_setup(void); +int multifd_save_setup(Error **errp); void multifd_save_cleanup(void); int multifd_load_setup(void); int multifd_load_cleanup(Error **errp); From b673eab4e2ca749669405640bb9e3f5140f89e1b Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 12 Jun 2019 11:44:19 +0200 Subject: [PATCH 15/18] multifd: Make multifd_load_setup() get an Error parameter We need to change the full chain to pass the Error parameter. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/migration.c | 35 +++++++++++++++++++++++++++++------ migration/migration.h | 2 +- migration/ram.c | 2 +- migration/ram.h | 2 +- migration/rdma.c | 2 +- 5 files changed, 33 insertions(+), 10 deletions(-) diff --git a/migration/migration.c b/migration/migration.c index d478f832ea..adc7d08e93 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -518,13 +518,23 @@ fail: exit(EXIT_FAILURE); } -static void migration_incoming_setup(QEMUFile *f) +/** + * @migration_incoming_setup: Setup incoming migration + * + * Returns 0 for no error or 1 for error + * + * @f: file for main migration channel + * @errp: where to put errors + */ +static int migration_incoming_setup(QEMUFile *f, Error **errp) { MigrationIncomingState *mis = migration_incoming_get_current(); + Error *local_err = NULL; - if (multifd_load_setup() != 0) { + if (multifd_load_setup(&local_err) != 0) { /* We haven't been able to create multifd threads nothing better to do */ + error_report_err(local_err); exit(EXIT_FAILURE); } @@ -532,6 +542,7 @@ static void migration_incoming_setup(QEMUFile *f) mis->from_src_file = f; } qemu_file_set_blocking(f, false); + return 0; } void migration_incoming_process(void) @@ -572,19 +583,27 @@ static bool postcopy_try_recover(QEMUFile *f) return false; } -void migration_fd_process_incoming(QEMUFile *f) +void migration_fd_process_incoming(QEMUFile *f, Error **errp) { + Error *local_err = NULL; + if (postcopy_try_recover(f)) { return; } - migration_incoming_setup(f); + if (migration_incoming_setup(f, &local_err)) { + if (local_err) { + error_propagate(errp, local_err); + } + return; + } migration_incoming_process(); } void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp) { MigrationIncomingState *mis = migration_incoming_get_current(); + Error *local_err = NULL; bool start_migration; if (!mis->from_src_file) { @@ -596,7 +615,12 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp) return; } - migration_incoming_setup(f); + if (migration_incoming_setup(f, &local_err)) { + if (local_err) { + error_propagate(errp, local_err); + } + return; + } /* * Common migration only needs one channel, so we can start @@ -604,7 +628,6 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp) */ start_migration = !migrate_use_multifd(); } else { - Error *local_err = NULL; /* Multiple connections */ assert(migrate_use_multifd()); start_migration = multifd_recv_new_channel(ioc, &local_err); diff --git a/migration/migration.h b/migration/migration.h index 44b1d56929..8473ddfc88 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -265,7 +265,7 @@ struct MigrationState void migrate_set_state(int *state, int old_state, int new_state); -void migration_fd_process_incoming(QEMUFile *f); +void migration_fd_process_incoming(QEMUFile *f, Error **errp); void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp); void migration_incoming_process(void); diff --git a/migration/ram.c b/migration/ram.c index 78483247ad..3abd41ad33 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -1474,7 +1474,7 @@ static void *multifd_recv_thread(void *opaque) return NULL; } -int multifd_load_setup(void) +int multifd_load_setup(Error **errp) { int thread_count; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); diff --git a/migration/ram.h b/migration/ram.h index da22a417ea..42be471d52 100644 --- a/migration/ram.h +++ b/migration/ram.h @@ -43,7 +43,7 @@ uint64_t ram_bytes_total(void); int multifd_save_setup(Error **errp); void multifd_save_cleanup(void); -int multifd_load_setup(void); +int multifd_load_setup(Error **errp); int multifd_load_cleanup(Error **errp); bool multifd_recv_all_channels_created(void); bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp); diff --git a/migration/rdma.c b/migration/rdma.c index e241dcb992..2379b8345b 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -4004,7 +4004,7 @@ static void rdma_accept_incoming_migration(void *opaque) } rdma->migration_started_on_destination = 1; - migration_fd_process_incoming(f); + migration_fd_process_incoming(f, errp); } void rdma_start_incoming_migration(const char *host_port, Error **errp) From d32ca5ad7988328f95db6a26beb374c55154c77b Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 22 Jan 2020 16:16:07 +0100 Subject: [PATCH 16/18] multifd: Split multifd code into its own file Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/Makefile.objs | 1 + migration/migration.c | 1 + migration/multifd.c | 899 ++++++++++++++++++++++++++++++++++++ migration/multifd.h | 139 ++++++ migration/ram.c | 988 +--------------------------------------- migration/ram.h | 7 - 6 files changed, 1041 insertions(+), 994 deletions(-) create mode 100644 migration/multifd.c create mode 100644 migration/multifd.h diff --git a/migration/Makefile.objs b/migration/Makefile.objs index a4f3bafd86..d3623d5f9b 100644 --- a/migration/Makefile.objs +++ b/migration/Makefile.objs @@ -7,6 +7,7 @@ common-obj-y += qemu-file-channel.o common-obj-y += xbzrle.o postcopy-ram.o common-obj-y += qjson.o common-obj-y += block-dirty-bitmap.o +common-obj-y += multifd.o common-obj-$(CONFIG_RDMA) += rdma.o diff --git a/migration/migration.c b/migration/migration.c index adc7d08e93..3a21a4686c 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -53,6 +53,7 @@ #include "monitor/monitor.h" #include "net/announce.h" #include "qemu/queue.h" +#include "multifd.h" #define MAX_THROTTLE (32 << 20) /* Migration transfer speed throttling */ diff --git a/migration/multifd.c b/migration/multifd.c new file mode 100644 index 0000000000..b3e8ae9bcc --- /dev/null +++ b/migration/multifd.c @@ -0,0 +1,899 @@ +/* + * Multifd common code + * + * Copyright (c) 2019-2020 Red Hat Inc + * + * Authors: + * Juan Quintela + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qemu/rcu.h" +#include "exec/target_page.h" +#include "sysemu/sysemu.h" +#include "exec/ramblock.h" +#include "qemu/error-report.h" +#include "qapi/error.h" +#include "ram.h" +#include "migration.h" +#include "socket.h" +#include "qemu-file.h" +#include "trace.h" +#include "multifd.h" + +/* Multiple fd's */ + +#define MULTIFD_MAGIC 0x11223344U +#define MULTIFD_VERSION 1 + +typedef struct { + uint32_t magic; + uint32_t version; + unsigned char uuid[16]; /* QemuUUID */ + uint8_t id; + uint8_t unused1[7]; /* Reserved for future use */ + uint64_t unused2[4]; /* Reserved for future use */ +} __attribute__((packed)) MultiFDInit_t; + +static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) +{ + MultiFDInit_t msg = {}; + int ret; + + msg.magic = cpu_to_be32(MULTIFD_MAGIC); + msg.version = cpu_to_be32(MULTIFD_VERSION); + msg.id = p->id; + memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); + + ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); + if (ret != 0) { + return -1; + } + return 0; +} + +static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) +{ + MultiFDInit_t msg; + int ret; + + ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); + if (ret != 0) { + return -1; + } + + msg.magic = be32_to_cpu(msg.magic); + msg.version = be32_to_cpu(msg.version); + + if (msg.magic != MULTIFD_MAGIC) { + error_setg(errp, "multifd: received packet magic %x " + "expected %x", msg.magic, MULTIFD_MAGIC); + return -1; + } + + if (msg.version != MULTIFD_VERSION) { + error_setg(errp, "multifd: received packet version %d " + "expected %d", msg.version, MULTIFD_VERSION); + return -1; + } + + if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { + char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); + char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); + + error_setg(errp, "multifd: received uuid '%s' and expected " + "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); + g_free(uuid); + g_free(msg_uuid); + return -1; + } + + if (msg.id > migrate_multifd_channels()) { + error_setg(errp, "multifd: received channel version %d " + "expected %d", msg.version, MULTIFD_VERSION); + return -1; + } + + return msg.id; +} + +static MultiFDPages_t *multifd_pages_init(size_t size) +{ + MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); + + pages->allocated = size; + pages->iov = g_new0(struct iovec, size); + pages->offset = g_new0(ram_addr_t, size); + + return pages; +} + +static void multifd_pages_clear(MultiFDPages_t *pages) +{ + pages->used = 0; + pages->allocated = 0; + pages->packet_num = 0; + pages->block = NULL; + g_free(pages->iov); + pages->iov = NULL; + g_free(pages->offset); + pages->offset = NULL; + g_free(pages); +} + +static void multifd_send_fill_packet(MultiFDSendParams *p) +{ + MultiFDPacket_t *packet = p->packet; + int i; + + packet->flags = cpu_to_be32(p->flags); + packet->pages_alloc = cpu_to_be32(p->pages->allocated); + packet->pages_used = cpu_to_be32(p->pages->used); + packet->next_packet_size = cpu_to_be32(p->next_packet_size); + packet->packet_num = cpu_to_be64(p->packet_num); + + if (p->pages->block) { + strncpy(packet->ramblock, p->pages->block->idstr, 256); + } + + for (i = 0; i < p->pages->used; i++) { + /* there are architectures where ram_addr_t is 32 bit */ + uint64_t temp = p->pages->offset[i]; + + packet->offset[i] = cpu_to_be64(temp); + } +} + +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) +{ + MultiFDPacket_t *packet = p->packet; + uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + RAMBlock *block; + int i; + + packet->magic = be32_to_cpu(packet->magic); + if (packet->magic != MULTIFD_MAGIC) { + error_setg(errp, "multifd: received packet " + "magic %x and expected magic %x", + packet->magic, MULTIFD_MAGIC); + return -1; + } + + packet->version = be32_to_cpu(packet->version); + if (packet->version != MULTIFD_VERSION) { + error_setg(errp, "multifd: received packet " + "version %d and expected version %d", + packet->version, MULTIFD_VERSION); + return -1; + } + + p->flags = be32_to_cpu(packet->flags); + + packet->pages_alloc = be32_to_cpu(packet->pages_alloc); + /* + * If we received a packet that is 100 times bigger than expected + * just stop migration. It is a magic number. + */ + if (packet->pages_alloc > pages_max * 100) { + error_setg(errp, "multifd: received packet " + "with size %d and expected a maximum size of %d", + packet->pages_alloc, pages_max * 100) ; + return -1; + } + /* + * We received a packet that is bigger than expected but inside + * reasonable limits (see previous comment). Just reallocate. + */ + if (packet->pages_alloc > p->pages->allocated) { + multifd_pages_clear(p->pages); + p->pages = multifd_pages_init(packet->pages_alloc); + } + + p->pages->used = be32_to_cpu(packet->pages_used); + if (p->pages->used > packet->pages_alloc) { + error_setg(errp, "multifd: received packet " + "with %d pages and expected maximum pages are %d", + p->pages->used, packet->pages_alloc) ; + return -1; + } + + p->next_packet_size = be32_to_cpu(packet->next_packet_size); + p->packet_num = be64_to_cpu(packet->packet_num); + + if (p->pages->used == 0) { + return 0; + } + + /* make sure that ramblock is 0 terminated */ + packet->ramblock[255] = 0; + block = qemu_ram_block_by_name(packet->ramblock); + if (!block) { + error_setg(errp, "multifd: unknown ram block %s", + packet->ramblock); + return -1; + } + + for (i = 0; i < p->pages->used; i++) { + uint64_t offset = be64_to_cpu(packet->offset[i]); + + if (offset > (block->used_length - qemu_target_page_size())) { + error_setg(errp, "multifd: offset too long %" PRIu64 + " (max " RAM_ADDR_FMT ")", + offset, block->max_length); + return -1; + } + p->pages->iov[i].iov_base = block->host + offset; + p->pages->iov[i].iov_len = qemu_target_page_size(); + } + + return 0; +} + +struct { + MultiFDSendParams *params; + /* array of pages to sent */ + MultiFDPages_t *pages; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* send channels ready */ + QemuSemaphore channels_ready; + /* + * Have we already run terminate threads. There is a race when it + * happens that we got one error while we are exiting. + * We will use atomic operations. Only valid values are 0 and 1. + */ + int exiting; +} *multifd_send_state; + +/* + * How we use multifd_send_state->pages and channel->pages? + * + * We create a pages for each channel, and a main one. Each time that + * we need to send a batch of pages we interchange the ones between + * multifd_send_state and the channel that is sending it. There are + * two reasons for that: + * - to not have to do so many mallocs during migration + * - to make easier to know what to free at the end of migration + * + * This way we always know who is the owner of each "pages" struct, + * and we don't need any locking. It belongs to the migration thread + * or to the channel thread. Switching is safe because the migration + * thread is using the channel mutex when changing it, and the channel + * have to had finish with its own, otherwise pending_job can't be + * false. + */ + +static int multifd_send_pages(QEMUFile *f) +{ + int i; + static int next_channel; + MultiFDSendParams *p = NULL; /* make happy gcc */ + MultiFDPages_t *pages = multifd_send_state->pages; + uint64_t transferred; + + if (atomic_read(&multifd_send_state->exiting)) { + return -1; + } + + qemu_sem_wait(&multifd_send_state->channels_ready); + for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { + p = &multifd_send_state->params[i]; + + qemu_mutex_lock(&p->mutex); + if (p->quit) { + error_report("%s: channel %d has already quit!", __func__, i); + qemu_mutex_unlock(&p->mutex); + return -1; + } + if (!p->pending_job) { + p->pending_job++; + next_channel = (i + 1) % migrate_multifd_channels(); + break; + } + qemu_mutex_unlock(&p->mutex); + } + assert(!p->pages->used); + assert(!p->pages->block); + + p->packet_num = multifd_send_state->packet_num++; + multifd_send_state->pages = p->pages; + p->pages = pages; + transferred = ((uint64_t) pages->used) * qemu_target_page_size() + + p->packet_len; + qemu_file_update_transfer(f, transferred); + ram_counters.multifd_bytes += transferred; + ram_counters.transferred += transferred;; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + + return 1; +} + +int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) +{ + MultiFDPages_t *pages = multifd_send_state->pages; + + if (!pages->block) { + pages->block = block; + } + + if (pages->block == block) { + pages->offset[pages->used] = offset; + pages->iov[pages->used].iov_base = block->host + offset; + pages->iov[pages->used].iov_len = qemu_target_page_size(); + pages->used++; + + if (pages->used < pages->allocated) { + return 1; + } + } + + if (multifd_send_pages(f) < 0) { + return -1; + } + + if (pages->block != block) { + return multifd_queue_page(f, block, offset); + } + + return 1; +} + +static void multifd_send_terminate_threads(Error *err) +{ + int i; + + trace_multifd_send_terminate_threads(err != NULL); + + if (err) { + MigrationState *s = migrate_get_current(); + migrate_set_error(s, err); + if (s->state == MIGRATION_STATUS_SETUP || + s->state == MIGRATION_STATUS_PRE_SWITCHOVER || + s->state == MIGRATION_STATUS_DEVICE || + s->state == MIGRATION_STATUS_ACTIVE) { + migrate_set_state(&s->state, s->state, + MIGRATION_STATUS_FAILED); + } + } + + /* + * We don't want to exit each threads twice. Depending on where + * we get the error, or if there are two independent errors in two + * threads at the same time, we can end calling this function + * twice. + */ + if (atomic_xchg(&multifd_send_state->exiting, 1)) { + return; + } + + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + qemu_mutex_lock(&p->mutex); + p->quit = true; + qemu_sem_post(&p->sem); + qemu_mutex_unlock(&p->mutex); + } +} + +void multifd_save_cleanup(void) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + multifd_send_terminate_threads(NULL); + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + if (p->running) { + qemu_thread_join(&p->thread); + } + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + socket_send_channel_destroy(p->c); + p->c = NULL; + qemu_mutex_destroy(&p->mutex); + qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->sem_sync); + g_free(p->name); + p->name = NULL; + multifd_pages_clear(p->pages); + p->pages = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; + } + qemu_sem_destroy(&multifd_send_state->channels_ready); + g_free(multifd_send_state->params); + multifd_send_state->params = NULL; + multifd_pages_clear(multifd_send_state->pages); + multifd_send_state->pages = NULL; + g_free(multifd_send_state); + multifd_send_state = NULL; +} + +void multifd_send_sync_main(QEMUFile *f) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + if (multifd_send_state->pages->used) { + if (multifd_send_pages(f) < 0) { + error_report("%s: multifd_send_pages fail", __func__); + return; + } + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + trace_multifd_send_sync_main_signal(p->id); + + qemu_mutex_lock(&p->mutex); + + if (p->quit) { + error_report("%s: channel %d has already quit", __func__, i); + qemu_mutex_unlock(&p->mutex); + return; + } + + p->packet_num = multifd_send_state->packet_num++; + p->flags |= MULTIFD_FLAG_SYNC; + p->pending_job++; + qemu_file_update_transfer(f, p->packet_len); + ram_counters.multifd_bytes += p->packet_len; + ram_counters.transferred += p->packet_len; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + trace_multifd_send_sync_main_wait(p->id); + qemu_sem_wait(&p->sem_sync); + } + trace_multifd_send_sync_main(multifd_send_state->packet_num); +} + +static void *multifd_send_thread(void *opaque) +{ + MultiFDSendParams *p = opaque; + Error *local_err = NULL; + int ret = 0; + uint32_t flags = 0; + + trace_multifd_send_thread_start(p->id); + rcu_register_thread(); + + if (multifd_send_initial_packet(p, &local_err) < 0) { + ret = -1; + goto out; + } + /* initial packet */ + p->num_packets = 1; + + while (true) { + qemu_sem_wait(&p->sem); + + if (atomic_read(&multifd_send_state->exiting)) { + break; + } + qemu_mutex_lock(&p->mutex); + + if (p->pending_job) { + uint32_t used = p->pages->used; + uint64_t packet_num = p->packet_num; + flags = p->flags; + + p->next_packet_size = used * qemu_target_page_size(); + multifd_send_fill_packet(p); + p->flags = 0; + p->num_packets++; + p->num_pages += used; + p->pages->used = 0; + p->pages->block = NULL; + qemu_mutex_unlock(&p->mutex); + + trace_multifd_send(p->id, packet_num, used, flags, + p->next_packet_size); + + ret = qio_channel_write_all(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret != 0) { + break; + } + + if (used) { + ret = qio_channel_writev_all(p->c, p->pages->iov, + used, &local_err); + if (ret != 0) { + break; + } + } + + qemu_mutex_lock(&p->mutex); + p->pending_job--; + qemu_mutex_unlock(&p->mutex); + + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&p->sem_sync); + } + qemu_sem_post(&multifd_send_state->channels_ready); + } else if (p->quit) { + qemu_mutex_unlock(&p->mutex); + break; + } else { + qemu_mutex_unlock(&p->mutex); + /* sometimes there are spurious wakeups */ + } + } + +out: + if (local_err) { + trace_multifd_send_error(p->id); + multifd_send_terminate_threads(local_err); + } + + /* + * Error happen, I will exit, but I can't just leave, tell + * who pay attention to me. + */ + if (ret != 0) { + qemu_sem_post(&p->sem_sync); + qemu_sem_post(&multifd_send_state->channels_ready); + } + + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + + rcu_unregister_thread(); + trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); + + return NULL; +} + +static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) +{ + MultiFDSendParams *p = opaque; + QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); + Error *local_err = NULL; + + trace_multifd_new_send_channel_async(p->id); + if (qio_task_propagate_error(task, &local_err)) { + migrate_set_error(migrate_get_current(), local_err); + /* Error happen, we need to tell who pay attention to me */ + qemu_sem_post(&multifd_send_state->channels_ready); + qemu_sem_post(&p->sem_sync); + /* + * Although multifd_send_thread is not created, but main migration + * thread neet to judge whether it is running, so we need to mark + * its status. + */ + p->quit = true; + } else { + p->c = QIO_CHANNEL(sioc); + qio_channel_set_delay(p->c, false); + p->running = true; + qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, + QEMU_THREAD_JOINABLE); + } +} + +int multifd_save_setup(Error **errp) +{ + int thread_count; + uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + uint8_t i; + + if (!migrate_use_multifd()) { + return 0; + } + thread_count = migrate_multifd_channels(); + multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); + multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); + multifd_send_state->pages = multifd_pages_init(page_count); + qemu_sem_init(&multifd_send_state->channels_ready, 0); + atomic_set(&multifd_send_state->exiting, 0); + + for (i = 0; i < thread_count; i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + qemu_mutex_init(&p->mutex); + qemu_sem_init(&p->sem, 0); + qemu_sem_init(&p->sem_sync, 0); + p->quit = false; + p->pending_job = 0; + p->id = i; + p->pages = multifd_pages_init(page_count); + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(uint64_t) * page_count; + p->packet = g_malloc0(p->packet_len); + p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); + p->packet->version = cpu_to_be32(MULTIFD_VERSION); + p->name = g_strdup_printf("multifdsend_%d", i); + socket_send_channel_create(multifd_new_send_channel_async, p); + } + return 0; +} + +struct { + MultiFDRecvParams *params; + /* number of created threads */ + int count; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; + /* global number of generated multifd packets */ + uint64_t packet_num; +} *multifd_recv_state; + +static void multifd_recv_terminate_threads(Error *err) +{ + int i; + + trace_multifd_recv_terminate_threads(err != NULL); + + if (err) { + MigrationState *s = migrate_get_current(); + migrate_set_error(s, err); + if (s->state == MIGRATION_STATUS_SETUP || + s->state == MIGRATION_STATUS_ACTIVE) { + migrate_set_state(&s->state, s->state, + MIGRATION_STATUS_FAILED); + } + } + + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + qemu_mutex_lock(&p->mutex); + p->quit = true; + /* + * We could arrive here for two reasons: + * - normal quit, i.e. everything went fine, just finished + * - error quit: We close the channels so the channel threads + * finish the qio_channel_read_all_eof() + */ + if (p->c) { + qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); + } + qemu_mutex_unlock(&p->mutex); + } +} + +int multifd_load_cleanup(Error **errp) +{ + int i; + int ret = 0; + + if (!migrate_use_multifd()) { + return 0; + } + multifd_recv_terminate_threads(NULL); + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + if (p->running) { + p->quit = true; + /* + * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, + * however try to wakeup it without harm in cleanup phase. + */ + qemu_sem_post(&p->sem_sync); + qemu_thread_join(&p->thread); + } + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + object_unref(OBJECT(p->c)); + p->c = NULL; + qemu_mutex_destroy(&p->mutex); + qemu_sem_destroy(&p->sem_sync); + g_free(p->name); + p->name = NULL; + multifd_pages_clear(p->pages); + p->pages = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; + } + qemu_sem_destroy(&multifd_recv_state->sem_sync); + g_free(multifd_recv_state->params); + multifd_recv_state->params = NULL; + g_free(multifd_recv_state); + multifd_recv_state = NULL; + + return ret; +} + +void multifd_recv_sync_main(void) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + trace_multifd_recv_sync_main_wait(p->id); + qemu_sem_wait(&multifd_recv_state->sem_sync); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + qemu_mutex_lock(&p->mutex); + if (multifd_recv_state->packet_num < p->packet_num) { + multifd_recv_state->packet_num = p->packet_num; + } + qemu_mutex_unlock(&p->mutex); + trace_multifd_recv_sync_main_signal(p->id); + qemu_sem_post(&p->sem_sync); + } + trace_multifd_recv_sync_main(multifd_recv_state->packet_num); +} + +static void *multifd_recv_thread(void *opaque) +{ + MultiFDRecvParams *p = opaque; + Error *local_err = NULL; + int ret; + + trace_multifd_recv_thread_start(p->id); + rcu_register_thread(); + + while (true) { + uint32_t used; + uint32_t flags; + + if (p->quit) { + break; + } + + ret = qio_channel_read_all_eof(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret == 0) { /* EOF */ + break; + } + if (ret == -1) { /* Error */ + break; + } + + qemu_mutex_lock(&p->mutex); + ret = multifd_recv_unfill_packet(p, &local_err); + if (ret) { + qemu_mutex_unlock(&p->mutex); + break; + } + + used = p->pages->used; + flags = p->flags; + trace_multifd_recv(p->id, p->packet_num, used, flags, + p->next_packet_size); + p->num_packets++; + p->num_pages += used; + qemu_mutex_unlock(&p->mutex); + + if (used) { + ret = qio_channel_readv_all(p->c, p->pages->iov, + used, &local_err); + if (ret != 0) { + break; + } + } + + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&multifd_recv_state->sem_sync); + qemu_sem_wait(&p->sem_sync); + } + } + + if (local_err) { + multifd_recv_terminate_threads(local_err); + } + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + + rcu_unregister_thread(); + trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); + + return NULL; +} + +int multifd_load_setup(Error **errp) +{ + int thread_count; + uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + uint8_t i; + + if (!migrate_use_multifd()) { + return 0; + } + thread_count = migrate_multifd_channels(); + multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); + multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); + atomic_set(&multifd_recv_state->count, 0); + qemu_sem_init(&multifd_recv_state->sem_sync, 0); + + for (i = 0; i < thread_count; i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + qemu_mutex_init(&p->mutex); + qemu_sem_init(&p->sem_sync, 0); + p->quit = false; + p->id = i; + p->pages = multifd_pages_init(page_count); + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(uint64_t) * page_count; + p->packet = g_malloc0(p->packet_len); + p->name = g_strdup_printf("multifdrecv_%d", i); + } + return 0; +} + +bool multifd_recv_all_channels_created(void) +{ + int thread_count = migrate_multifd_channels(); + + if (!migrate_use_multifd()) { + return true; + } + + return thread_count == atomic_read(&multifd_recv_state->count); +} + +/* + * Try to receive all multifd channels to get ready for the migration. + * - Return true and do not set @errp when correctly receving all channels; + * - Return false and do not set @errp when correctly receiving the current one; + * - Return false and set @errp when failing to receive the current channel. + */ +bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) +{ + MultiFDRecvParams *p; + Error *local_err = NULL; + int id; + + id = multifd_recv_initial_packet(ioc, &local_err); + if (id < 0) { + multifd_recv_terminate_threads(local_err); + error_propagate_prepend(errp, local_err, + "failed to receive packet" + " via multifd channel %d: ", + atomic_read(&multifd_recv_state->count)); + return false; + } + trace_multifd_recv_new_channel(id); + + p = &multifd_recv_state->params[id]; + if (p->c != NULL) { + error_setg(&local_err, "multifd: received id '%d' already setup'", + id); + multifd_recv_terminate_threads(local_err); + error_propagate(errp, local_err); + return false; + } + p->c = ioc; + object_ref(OBJECT(ioc)); + /* initial packet */ + p->num_packets = 1; + + p->running = true; + qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, + QEMU_THREAD_JOINABLE); + atomic_inc(&multifd_recv_state->count); + return atomic_read(&multifd_recv_state->count) == + migrate_multifd_channels(); +} + diff --git a/migration/multifd.h b/migration/multifd.h new file mode 100644 index 0000000000..d8b0205977 --- /dev/null +++ b/migration/multifd.h @@ -0,0 +1,139 @@ +/* + * Multifd common functions + * + * Copyright (c) 2019-2020 Red Hat Inc + * + * Authors: + * Juan Quintela + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#ifndef QEMU_MIGRATION_MULTIFD_H +#define QEMU_MIGRATION_MULTIFD_H + +int multifd_save_setup(Error **errp); +void multifd_save_cleanup(void); +int multifd_load_setup(Error **errp); +int multifd_load_cleanup(Error **errp); +bool multifd_recv_all_channels_created(void); +bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp); +void multifd_recv_sync_main(void); +void multifd_send_sync_main(QEMUFile *f); +int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset); + +#define MULTIFD_FLAG_SYNC (1 << 0) + +/* This value needs to be a multiple of qemu_target_page_size() */ +#define MULTIFD_PACKET_SIZE (512 * 1024) + +typedef struct { + uint32_t magic; + uint32_t version; + uint32_t flags; + /* maximum number of allocated pages */ + uint32_t pages_alloc; + uint32_t pages_used; + /* size of the next packet that contains pages */ + uint32_t next_packet_size; + uint64_t packet_num; + uint64_t unused[4]; /* Reserved for future use */ + char ramblock[256]; + uint64_t offset[]; +} __attribute__((packed)) MultiFDPacket_t; + +typedef struct { + /* number of used pages */ + uint32_t used; + /* number of allocated pages */ + uint32_t allocated; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* offset of each page */ + ram_addr_t *offset; + /* pointer to each page */ + struct iovec *iov; + RAMBlock *block; +} MultiFDPages_t; + +typedef struct { + /* this fields are not changed once the thread is created */ + /* channel number */ + uint8_t id; + /* channel thread name */ + char *name; + /* channel thread id */ + QemuThread thread; + /* communication channel */ + QIOChannel *c; + /* sem where to wait for more work */ + QemuSemaphore sem; + /* this mutex protects the following parameters */ + QemuMutex mutex; + /* is this channel thread running */ + bool running; + /* should this thread finish */ + bool quit; + /* thread has work to do */ + int pending_job; + /* array of pages to sent */ + MultiFDPages_t *pages; + /* packet allocated len */ + uint32_t packet_len; + /* pointer to the packet */ + MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; + /* size of the next packet that contains pages */ + uint32_t next_packet_size; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* thread local variables */ + /* packets sent through this channel */ + uint64_t num_packets; + /* pages sent through this channel */ + uint64_t num_pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; +} MultiFDSendParams; + +typedef struct { + /* this fields are not changed once the thread is created */ + /* channel number */ + uint8_t id; + /* channel thread name */ + char *name; + /* channel thread id */ + QemuThread thread; + /* communication channel */ + QIOChannel *c; + /* this mutex protects the following parameters */ + QemuMutex mutex; + /* is this channel thread running */ + bool running; + /* should this thread finish */ + bool quit; + /* array of pages to receive */ + MultiFDPages_t *pages; + /* packet allocated len */ + uint32_t packet_len; + /* pointer to the packet */ + MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* thread local variables */ + /* size of the next packet that contains pages */ + uint32_t next_packet_size; + /* packets sent through this channel */ + uint64_t num_packets; + /* pages sent through this channel */ + uint64_t num_pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; +} MultiFDRecvParams; + +#endif + diff --git a/migration/ram.c b/migration/ram.c index 3abd41ad33..ed23ed1c7c 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -36,7 +36,6 @@ #include "xbzrle.h" #include "ram.h" #include "migration.h" -#include "socket.h" #include "migration/register.h" #include "migration/misc.h" #include "qemu-file.h" @@ -53,9 +52,9 @@ #include "migration/colo.h" #include "block.h" #include "sysemu/sysemu.h" -#include "qemu/uuid.h" #include "savevm.h" #include "qemu/iov.h" +#include "multifd.h" /***********************************************************/ /* ram save/restore */ @@ -575,991 +574,6 @@ exit: return -1; } -/* Multiple fd's */ - -#define MULTIFD_MAGIC 0x11223344U -#define MULTIFD_VERSION 1 - -#define MULTIFD_FLAG_SYNC (1 << 0) - -/* This value needs to be a multiple of qemu_target_page_size() */ -#define MULTIFD_PACKET_SIZE (512 * 1024) - -typedef struct { - uint32_t magic; - uint32_t version; - unsigned char uuid[16]; /* QemuUUID */ - uint8_t id; - uint8_t unused1[7]; /* Reserved for future use */ - uint64_t unused2[4]; /* Reserved for future use */ -} __attribute__((packed)) MultiFDInit_t; - -typedef struct { - uint32_t magic; - uint32_t version; - uint32_t flags; - /* maximum number of allocated pages */ - uint32_t pages_alloc; - uint32_t pages_used; - /* size of the next packet that contains pages */ - uint32_t next_packet_size; - uint64_t packet_num; - uint64_t unused[4]; /* Reserved for future use */ - char ramblock[256]; - uint64_t offset[]; -} __attribute__((packed)) MultiFDPacket_t; - -typedef struct { - /* number of used pages */ - uint32_t used; - /* number of allocated pages */ - uint32_t allocated; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* offset of each page */ - ram_addr_t *offset; - /* pointer to each page */ - struct iovec *iov; - RAMBlock *block; -} MultiFDPages_t; - -typedef struct { - /* this fields are not changed once the thread is created */ - /* channel number */ - uint8_t id; - /* channel thread name */ - char *name; - /* channel thread id */ - QemuThread thread; - /* communication channel */ - QIOChannel *c; - /* sem where to wait for more work */ - QemuSemaphore sem; - /* this mutex protects the following parameters */ - QemuMutex mutex; - /* is this channel thread running */ - bool running; - /* should this thread finish */ - bool quit; - /* thread has work to do */ - int pending_job; - /* array of pages to sent */ - MultiFDPages_t *pages; - /* packet allocated len */ - uint32_t packet_len; - /* pointer to the packet */ - MultiFDPacket_t *packet; - /* multifd flags for each packet */ - uint32_t flags; - /* size of the next packet that contains pages */ - uint32_t next_packet_size; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* thread local variables */ - /* packets sent through this channel */ - uint64_t num_packets; - /* pages sent through this channel */ - uint64_t num_pages; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; -} MultiFDSendParams; - -typedef struct { - /* this fields are not changed once the thread is created */ - /* channel number */ - uint8_t id; - /* channel thread name */ - char *name; - /* channel thread id */ - QemuThread thread; - /* communication channel */ - QIOChannel *c; - /* this mutex protects the following parameters */ - QemuMutex mutex; - /* is this channel thread running */ - bool running; - /* should this thread finish */ - bool quit; - /* array of pages to receive */ - MultiFDPages_t *pages; - /* packet allocated len */ - uint32_t packet_len; - /* pointer to the packet */ - MultiFDPacket_t *packet; - /* multifd flags for each packet */ - uint32_t flags; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* thread local variables */ - /* size of the next packet that contains pages */ - uint32_t next_packet_size; - /* packets sent through this channel */ - uint64_t num_packets; - /* pages sent through this channel */ - uint64_t num_pages; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; -} MultiFDRecvParams; - -static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) -{ - MultiFDInit_t msg = {}; - int ret; - - msg.magic = cpu_to_be32(MULTIFD_MAGIC); - msg.version = cpu_to_be32(MULTIFD_VERSION); - msg.id = p->id; - memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); - - ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); - if (ret != 0) { - return -1; - } - return 0; -} - -static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) -{ - MultiFDInit_t msg; - int ret; - - ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); - if (ret != 0) { - return -1; - } - - msg.magic = be32_to_cpu(msg.magic); - msg.version = be32_to_cpu(msg.version); - - if (msg.magic != MULTIFD_MAGIC) { - error_setg(errp, "multifd: received packet magic %x " - "expected %x", msg.magic, MULTIFD_MAGIC); - return -1; - } - - if (msg.version != MULTIFD_VERSION) { - error_setg(errp, "multifd: received packet version %d " - "expected %d", msg.version, MULTIFD_VERSION); - return -1; - } - - if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { - char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); - char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); - - error_setg(errp, "multifd: received uuid '%s' and expected " - "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); - g_free(uuid); - g_free(msg_uuid); - return -1; - } - - if (msg.id > migrate_multifd_channels()) { - error_setg(errp, "multifd: received channel version %d " - "expected %d", msg.version, MULTIFD_VERSION); - return -1; - } - - return msg.id; -} - -static MultiFDPages_t *multifd_pages_init(size_t size) -{ - MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); - - pages->allocated = size; - pages->iov = g_new0(struct iovec, size); - pages->offset = g_new0(ram_addr_t, size); - - return pages; -} - -static void multifd_pages_clear(MultiFDPages_t *pages) -{ - pages->used = 0; - pages->allocated = 0; - pages->packet_num = 0; - pages->block = NULL; - g_free(pages->iov); - pages->iov = NULL; - g_free(pages->offset); - pages->offset = NULL; - g_free(pages); -} - -static void multifd_send_fill_packet(MultiFDSendParams *p) -{ - MultiFDPacket_t *packet = p->packet; - int i; - - packet->flags = cpu_to_be32(p->flags); - packet->pages_alloc = cpu_to_be32(p->pages->allocated); - packet->pages_used = cpu_to_be32(p->pages->used); - packet->next_packet_size = cpu_to_be32(p->next_packet_size); - packet->packet_num = cpu_to_be64(p->packet_num); - - if (p->pages->block) { - strncpy(packet->ramblock, p->pages->block->idstr, 256); - } - - for (i = 0; i < p->pages->used; i++) { - /* there are architectures where ram_addr_t is 32 bit */ - uint64_t temp = p->pages->offset[i]; - - packet->offset[i] = cpu_to_be64(temp); - } -} - -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) -{ - MultiFDPacket_t *packet = p->packet; - uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - RAMBlock *block; - int i; - - packet->magic = be32_to_cpu(packet->magic); - if (packet->magic != MULTIFD_MAGIC) { - error_setg(errp, "multifd: received packet " - "magic %x and expected magic %x", - packet->magic, MULTIFD_MAGIC); - return -1; - } - - packet->version = be32_to_cpu(packet->version); - if (packet->version != MULTIFD_VERSION) { - error_setg(errp, "multifd: received packet " - "version %d and expected version %d", - packet->version, MULTIFD_VERSION); - return -1; - } - - p->flags = be32_to_cpu(packet->flags); - - packet->pages_alloc = be32_to_cpu(packet->pages_alloc); - /* - * If we received a packet that is 100 times bigger than expected - * just stop migration. It is a magic number. - */ - if (packet->pages_alloc > pages_max * 100) { - error_setg(errp, "multifd: received packet " - "with size %d and expected a maximum size of %d", - packet->pages_alloc, pages_max * 100) ; - return -1; - } - /* - * We received a packet that is bigger than expected but inside - * reasonable limits (see previous comment). Just reallocate. - */ - if (packet->pages_alloc > p->pages->allocated) { - multifd_pages_clear(p->pages); - p->pages = multifd_pages_init(packet->pages_alloc); - } - - p->pages->used = be32_to_cpu(packet->pages_used); - if (p->pages->used > packet->pages_alloc) { - error_setg(errp, "multifd: received packet " - "with %d pages and expected maximum pages are %d", - p->pages->used, packet->pages_alloc) ; - return -1; - } - - p->next_packet_size = be32_to_cpu(packet->next_packet_size); - p->packet_num = be64_to_cpu(packet->packet_num); - - if (p->pages->used == 0) { - return 0; - } - - /* make sure that ramblock is 0 terminated */ - packet->ramblock[255] = 0; - block = qemu_ram_block_by_name(packet->ramblock); - if (!block) { - error_setg(errp, "multifd: unknown ram block %s", - packet->ramblock); - return -1; - } - - for (i = 0; i < p->pages->used; i++) { - uint64_t offset = be64_to_cpu(packet->offset[i]); - - if (offset > (block->used_length - qemu_target_page_size())) { - error_setg(errp, "multifd: offset too long %" PRIu64 - " (max " RAM_ADDR_FMT ")", - offset, block->max_length); - return -1; - } - p->pages->iov[i].iov_base = block->host + offset; - p->pages->iov[i].iov_len = qemu_target_page_size(); - } - - return 0; -} - -struct { - MultiFDSendParams *params; - /* array of pages to sent */ - MultiFDPages_t *pages; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* send channels ready */ - QemuSemaphore channels_ready; - /* - * Have we already run terminate threads. There is a race when it - * happens that we got one error while we are exiting. - * We will use atomic operations. Only valid values are 0 and 1. - */ - int exiting; -} *multifd_send_state; - -/* - * How we use multifd_send_state->pages and channel->pages? - * - * We create a pages for each channel, and a main one. Each time that - * we need to send a batch of pages we interchange the ones between - * multifd_send_state and the channel that is sending it. There are - * two reasons for that: - * - to not have to do so many mallocs during migration - * - to make easier to know what to free at the end of migration - * - * This way we always know who is the owner of each "pages" struct, - * and we don't need any locking. It belongs to the migration thread - * or to the channel thread. Switching is safe because the migration - * thread is using the channel mutex when changing it, and the channel - * have to had finish with its own, otherwise pending_job can't be - * false. - */ - -static int multifd_send_pages(QEMUFile *f) -{ - int i; - static int next_channel; - MultiFDSendParams *p = NULL; /* make happy gcc */ - MultiFDPages_t *pages = multifd_send_state->pages; - uint64_t transferred; - - if (atomic_read(&multifd_send_state->exiting)) { - return -1; - } - - qemu_sem_wait(&multifd_send_state->channels_ready); - for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { - p = &multifd_send_state->params[i]; - - qemu_mutex_lock(&p->mutex); - if (p->quit) { - error_report("%s: channel %d has already quit!", __func__, i); - qemu_mutex_unlock(&p->mutex); - return -1; - } - if (!p->pending_job) { - p->pending_job++; - next_channel = (i + 1) % migrate_multifd_channels(); - break; - } - qemu_mutex_unlock(&p->mutex); - } - assert(!p->pages->used); - assert(!p->pages->block); - - p->packet_num = multifd_send_state->packet_num++; - multifd_send_state->pages = p->pages; - p->pages = pages; - transferred = ((uint64_t) pages->used) * qemu_target_page_size() - + p->packet_len; - qemu_file_update_transfer(f, transferred); - ram_counters.multifd_bytes += transferred; - ram_counters.transferred += transferred;; - qemu_mutex_unlock(&p->mutex); - qemu_sem_post(&p->sem); - - return 1; -} - -static int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) -{ - MultiFDPages_t *pages = multifd_send_state->pages; - - if (!pages->block) { - pages->block = block; - } - - if (pages->block == block) { - pages->offset[pages->used] = offset; - pages->iov[pages->used].iov_base = block->host + offset; - pages->iov[pages->used].iov_len = qemu_target_page_size(); - pages->used++; - - if (pages->used < pages->allocated) { - return 1; - } - } - - if (multifd_send_pages(f) < 0) { - return -1; - } - - if (pages->block != block) { - return multifd_queue_page(f, block, offset); - } - - return 1; -} - -static void multifd_send_terminate_threads(Error *err) -{ - int i; - - trace_multifd_send_terminate_threads(err != NULL); - - if (err) { - MigrationState *s = migrate_get_current(); - migrate_set_error(s, err); - if (s->state == MIGRATION_STATUS_SETUP || - s->state == MIGRATION_STATUS_PRE_SWITCHOVER || - s->state == MIGRATION_STATUS_DEVICE || - s->state == MIGRATION_STATUS_ACTIVE) { - migrate_set_state(&s->state, s->state, - MIGRATION_STATUS_FAILED); - } - } - - /* - * We don't want to exit each threads twice. Depending on where - * we get the error, or if there are two independent errors in two - * threads at the same time, we can end calling this function - * twice. - */ - if (atomic_xchg(&multifd_send_state->exiting, 1)) { - return; - } - - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - qemu_mutex_lock(&p->mutex); - p->quit = true; - qemu_sem_post(&p->sem); - qemu_mutex_unlock(&p->mutex); - } -} - -void multifd_save_cleanup(void) -{ - int i; - - if (!migrate_use_multifd()) { - return; - } - multifd_send_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - if (p->running) { - qemu_thread_join(&p->thread); - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - socket_send_channel_destroy(p->c); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - multifd_pages_clear(p->pages); - p->pages = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - } - qemu_sem_destroy(&multifd_send_state->channels_ready); - g_free(multifd_send_state->params); - multifd_send_state->params = NULL; - multifd_pages_clear(multifd_send_state->pages); - multifd_send_state->pages = NULL; - g_free(multifd_send_state); - multifd_send_state = NULL; -} - -static void multifd_send_sync_main(QEMUFile *f) -{ - int i; - - if (!migrate_use_multifd()) { - return; - } - if (multifd_send_state->pages->used) { - if (multifd_send_pages(f) < 0) { - error_report("%s: multifd_send_pages fail", __func__); - return; - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - trace_multifd_send_sync_main_signal(p->id); - - qemu_mutex_lock(&p->mutex); - - if (p->quit) { - error_report("%s: channel %d has already quit", __func__, i); - qemu_mutex_unlock(&p->mutex); - return; - } - - p->packet_num = multifd_send_state->packet_num++; - p->flags |= MULTIFD_FLAG_SYNC; - p->pending_job++; - qemu_file_update_transfer(f, p->packet_len); - ram_counters.multifd_bytes += p->packet_len; - ram_counters.transferred += p->packet_len; - qemu_mutex_unlock(&p->mutex); - qemu_sem_post(&p->sem); - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - trace_multifd_send_sync_main_wait(p->id); - qemu_sem_wait(&p->sem_sync); - } - trace_multifd_send_sync_main(multifd_send_state->packet_num); -} - -static void *multifd_send_thread(void *opaque) -{ - MultiFDSendParams *p = opaque; - Error *local_err = NULL; - int ret = 0; - uint32_t flags = 0; - - trace_multifd_send_thread_start(p->id); - rcu_register_thread(); - - if (multifd_send_initial_packet(p, &local_err) < 0) { - ret = -1; - goto out; - } - /* initial packet */ - p->num_packets = 1; - - while (true) { - qemu_sem_wait(&p->sem); - - if (atomic_read(&multifd_send_state->exiting)) { - break; - } - qemu_mutex_lock(&p->mutex); - - if (p->pending_job) { - uint32_t used = p->pages->used; - uint64_t packet_num = p->packet_num; - flags = p->flags; - - p->next_packet_size = used * qemu_target_page_size(); - multifd_send_fill_packet(p); - p->flags = 0; - p->num_packets++; - p->num_pages += used; - p->pages->used = 0; - p->pages->block = NULL; - qemu_mutex_unlock(&p->mutex); - - trace_multifd_send(p->id, packet_num, used, flags, - p->next_packet_size); - - ret = qio_channel_write_all(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret != 0) { - break; - } - - if (used) { - ret = qio_channel_writev_all(p->c, p->pages->iov, - used, &local_err); - if (ret != 0) { - break; - } - } - - qemu_mutex_lock(&p->mutex); - p->pending_job--; - qemu_mutex_unlock(&p->mutex); - - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&p->sem_sync); - } - qemu_sem_post(&multifd_send_state->channels_ready); - } else if (p->quit) { - qemu_mutex_unlock(&p->mutex); - break; - } else { - qemu_mutex_unlock(&p->mutex); - /* sometimes there are spurious wakeups */ - } - } - -out: - if (local_err) { - trace_multifd_send_error(p->id); - multifd_send_terminate_threads(local_err); - } - - /* - * Error happen, I will exit, but I can't just leave, tell - * who pay attention to me. - */ - if (ret != 0) { - qemu_sem_post(&p->sem_sync); - qemu_sem_post(&multifd_send_state->channels_ready); - } - - qemu_mutex_lock(&p->mutex); - p->running = false; - qemu_mutex_unlock(&p->mutex); - - rcu_unregister_thread(); - trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); - - return NULL; -} - -static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) -{ - MultiFDSendParams *p = opaque; - QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); - Error *local_err = NULL; - - trace_multifd_new_send_channel_async(p->id); - if (qio_task_propagate_error(task, &local_err)) { - migrate_set_error(migrate_get_current(), local_err); - /* Error happen, we need to tell who pay attention to me */ - qemu_sem_post(&multifd_send_state->channels_ready); - qemu_sem_post(&p->sem_sync); - /* - * Although multifd_send_thread is not created, but main migration - * thread neet to judge whether it is running, so we need to mark - * its status. - */ - p->quit = true; - } else { - p->c = QIO_CHANNEL(sioc); - qio_channel_set_delay(p->c, false); - p->running = true; - qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, - QEMU_THREAD_JOINABLE); - } -} - -int multifd_save_setup(Error **errp) -{ - int thread_count; - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - uint8_t i; - - if (!migrate_use_multifd()) { - return 0; - } - thread_count = migrate_multifd_channels(); - multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); - multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); - multifd_send_state->pages = multifd_pages_init(page_count); - qemu_sem_init(&multifd_send_state->channels_ready, 0); - atomic_set(&multifd_send_state->exiting, 0); - - for (i = 0; i < thread_count; i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - qemu_mutex_init(&p->mutex); - qemu_sem_init(&p->sem, 0); - qemu_sem_init(&p->sem_sync, 0); - p->quit = false; - p->pending_job = 0; - p->id = i; - p->pages = multifd_pages_init(page_count); - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); - p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); - p->packet->version = cpu_to_be32(MULTIFD_VERSION); - p->name = g_strdup_printf("multifdsend_%d", i); - socket_send_channel_create(multifd_new_send_channel_async, p); - } - return 0; -} - -struct { - MultiFDRecvParams *params; - /* number of created threads */ - int count; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; - /* global number of generated multifd packets */ - uint64_t packet_num; -} *multifd_recv_state; - -static void multifd_recv_terminate_threads(Error *err) -{ - int i; - - trace_multifd_recv_terminate_threads(err != NULL); - - if (err) { - MigrationState *s = migrate_get_current(); - migrate_set_error(s, err); - if (s->state == MIGRATION_STATUS_SETUP || - s->state == MIGRATION_STATUS_ACTIVE) { - migrate_set_state(&s->state, s->state, - MIGRATION_STATUS_FAILED); - } - } - - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_lock(&p->mutex); - p->quit = true; - /* - * We could arrive here for two reasons: - * - normal quit, i.e. everything went fine, just finished - * - error quit: We close the channels so the channel threads - * finish the qio_channel_read_all_eof() - */ - if (p->c) { - qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); - } - qemu_mutex_unlock(&p->mutex); - } -} - -int multifd_load_cleanup(Error **errp) -{ - int i; - int ret = 0; - - if (!migrate_use_multifd()) { - return 0; - } - multifd_recv_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - if (p->running) { - p->quit = true; - /* - * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, - * however try to wakeup it without harm in cleanup phase. - */ - qemu_sem_post(&p->sem_sync); - qemu_thread_join(&p->thread); - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - object_unref(OBJECT(p->c)); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - multifd_pages_clear(p->pages); - p->pages = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - } - qemu_sem_destroy(&multifd_recv_state->sem_sync); - g_free(multifd_recv_state->params); - multifd_recv_state->params = NULL; - g_free(multifd_recv_state); - multifd_recv_state = NULL; - - return ret; -} - -static void multifd_recv_sync_main(void) -{ - int i; - - if (!migrate_use_multifd()) { - return; - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - trace_multifd_recv_sync_main_wait(p->id); - qemu_sem_wait(&multifd_recv_state->sem_sync); - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_lock(&p->mutex); - if (multifd_recv_state->packet_num < p->packet_num) { - multifd_recv_state->packet_num = p->packet_num; - } - qemu_mutex_unlock(&p->mutex); - trace_multifd_recv_sync_main_signal(p->id); - qemu_sem_post(&p->sem_sync); - } - trace_multifd_recv_sync_main(multifd_recv_state->packet_num); -} - -static void *multifd_recv_thread(void *opaque) -{ - MultiFDRecvParams *p = opaque; - Error *local_err = NULL; - int ret; - - trace_multifd_recv_thread_start(p->id); - rcu_register_thread(); - - while (true) { - uint32_t used; - uint32_t flags; - - if (p->quit) { - break; - } - - ret = qio_channel_read_all_eof(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret == 0) { /* EOF */ - break; - } - if (ret == -1) { /* Error */ - break; - } - - qemu_mutex_lock(&p->mutex); - ret = multifd_recv_unfill_packet(p, &local_err); - if (ret) { - qemu_mutex_unlock(&p->mutex); - break; - } - - used = p->pages->used; - flags = p->flags; - trace_multifd_recv(p->id, p->packet_num, used, flags, - p->next_packet_size); - p->num_packets++; - p->num_pages += used; - qemu_mutex_unlock(&p->mutex); - - if (used) { - ret = qio_channel_readv_all(p->c, p->pages->iov, - used, &local_err); - if (ret != 0) { - break; - } - } - - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&multifd_recv_state->sem_sync); - qemu_sem_wait(&p->sem_sync); - } - } - - if (local_err) { - multifd_recv_terminate_threads(local_err); - } - qemu_mutex_lock(&p->mutex); - p->running = false; - qemu_mutex_unlock(&p->mutex); - - rcu_unregister_thread(); - trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); - - return NULL; -} - -int multifd_load_setup(Error **errp) -{ - int thread_count; - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - uint8_t i; - - if (!migrate_use_multifd()) { - return 0; - } - thread_count = migrate_multifd_channels(); - multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); - multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); - atomic_set(&multifd_recv_state->count, 0); - qemu_sem_init(&multifd_recv_state->sem_sync, 0); - - for (i = 0; i < thread_count; i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_init(&p->mutex); - qemu_sem_init(&p->sem_sync, 0); - p->quit = false; - p->id = i; - p->pages = multifd_pages_init(page_count); - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); - p->name = g_strdup_printf("multifdrecv_%d", i); - } - return 0; -} - -bool multifd_recv_all_channels_created(void) -{ - int thread_count = migrate_multifd_channels(); - - if (!migrate_use_multifd()) { - return true; - } - - return thread_count == atomic_read(&multifd_recv_state->count); -} - -/* - * Try to receive all multifd channels to get ready for the migration. - * - Return true and do not set @errp when correctly receving all channels; - * - Return false and do not set @errp when correctly receiving the current one; - * - Return false and set @errp when failing to receive the current channel. - */ -bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) -{ - MultiFDRecvParams *p; - Error *local_err = NULL; - int id; - - id = multifd_recv_initial_packet(ioc, &local_err); - if (id < 0) { - multifd_recv_terminate_threads(local_err); - error_propagate_prepend(errp, local_err, - "failed to receive packet" - " via multifd channel %d: ", - atomic_read(&multifd_recv_state->count)); - return false; - } - trace_multifd_recv_new_channel(id); - - p = &multifd_recv_state->params[id]; - if (p->c != NULL) { - error_setg(&local_err, "multifd: received id '%d' already setup'", - id); - multifd_recv_terminate_threads(local_err); - error_propagate(errp, local_err); - return false; - } - p->c = ioc; - object_ref(OBJECT(ioc)); - /* initial packet */ - p->num_packets = 1; - - p->running = true; - qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, - QEMU_THREAD_JOINABLE); - atomic_inc(&multifd_recv_state->count); - return atomic_read(&multifd_recv_state->count) == - migrate_multifd_channels(); -} - /** * save_page_header: write page header to wire * diff --git a/migration/ram.h b/migration/ram.h index 42be471d52..a553d40751 100644 --- a/migration/ram.h +++ b/migration/ram.h @@ -41,13 +41,6 @@ int xbzrle_cache_resize(int64_t new_size, Error **errp); uint64_t ram_bytes_remaining(void); uint64_t ram_bytes_total(void); -int multifd_save_setup(Error **errp); -void multifd_save_cleanup(void); -int multifd_load_setup(Error **errp); -int multifd_load_cleanup(Error **errp); -bool multifd_recv_all_channels_created(void); -bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp); - uint64_t ram_pagesize_summary(void); int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len); void acct_update_position(QEMUFile *f, size_t size, bool zero); From a085664f21d80b3bc4d052a8e9c372abba2d6c38 Mon Sep 17 00:00:00 2001 From: Eric Auger Date: Sat, 25 Jan 2020 18:24:49 +0100 Subject: [PATCH 17/18] migration: Simplify get_qlist Instead of inserting read elements at the head and then reversing the list, it is simpler to add each element after the previous one. Introduce QLIST_RAW_INSERT_AFTER helper and use it in get_qlist(). Signed-off-by: Eric Auger Suggested-by: Juan Quintela Reviewed-by: Juan Quintela Signed-off-by: Juan Quintela --- include/qemu/queue.h | 19 ++++++------------- migration/vmstate-types.c | 10 +++++++--- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/include/qemu/queue.h b/include/qemu/queue.h index 4d4554a7ce..19425f973f 100644 --- a/include/qemu/queue.h +++ b/include/qemu/queue.h @@ -515,6 +515,12 @@ union { \ (elm); \ (elm) = *QLIST_RAW_NEXT(elm, entry)) +#define QLIST_RAW_INSERT_AFTER(head, prev, elem, entry) do { \ + *QLIST_RAW_NEXT(prev, entry) = elem; \ + *QLIST_RAW_PREVIOUS(elem, entry) = QLIST_RAW_NEXT(prev, entry); \ + *QLIST_RAW_NEXT(elem, entry) = NULL; \ +} while (0) + #define QLIST_RAW_INSERT_HEAD(head, elm, entry) do { \ void *first = *QLIST_RAW_FIRST(head); \ *QLIST_RAW_FIRST(head) = elm; \ @@ -527,17 +533,4 @@ union { \ } \ } while (0) -#define QLIST_RAW_REVERSE(head, elm, entry) do { \ - void *iter = *QLIST_RAW_FIRST(head), *prev = NULL, *next; \ - while (iter) { \ - next = *QLIST_RAW_NEXT(iter, entry); \ - *QLIST_RAW_PREVIOUS(iter, entry) = QLIST_RAW_NEXT(next, entry); \ - *QLIST_RAW_NEXT(iter, entry) = prev; \ - prev = iter; \ - iter = next; \ - } \ - *QLIST_RAW_FIRST(head) = prev; \ - *QLIST_RAW_PREVIOUS(prev, entry) = QLIST_RAW_FIRST(head); \ -} while (0) - #endif /* QEMU_SYS_QUEUE_H */ diff --git a/migration/vmstate-types.c b/migration/vmstate-types.c index 1eee36773a..35e784c9d9 100644 --- a/migration/vmstate-types.c +++ b/migration/vmstate-types.c @@ -879,7 +879,7 @@ static int get_qlist(QEMUFile *f, void *pv, size_t unused_size, /* offset of the QLIST entry in a QLIST element */ size_t entry_offset = field->start; int version_id = field->version_id; - void *elm; + void *elm, *prev = NULL; trace_get_qlist(field->name, vmsd->name, vmsd->version_id); if (version_id > vmsd->version_id) { @@ -900,9 +900,13 @@ static int get_qlist(QEMUFile *f, void *pv, size_t unused_size, g_free(elm); return ret; } - QLIST_RAW_INSERT_HEAD(pv, elm, entry_offset); + if (!prev) { + QLIST_RAW_INSERT_HEAD(pv, elm, entry_offset); + } else { + QLIST_RAW_INSERT_AFTER(pv, prev, elm, entry_offset); + } + prev = elm; } - QLIST_RAW_REVERSE(pv, elm, entry_offset); trace_get_qlist_end(field->name, vmsd->name); return ret; From 42d24611afc7610808ecb8770cf40e84714dd28e Mon Sep 17 00:00:00 2001 From: Wei Yang Date: Sat, 12 Oct 2019 10:39:31 +0800 Subject: [PATCH 18/18] migration/compress: compress QEMUFile is not writable We open a file with empty_ops for compress QEMUFile, which means this is not writable. Signed-off-by: Wei Yang Reviewed-by: Juan Quintela Signed-off-by: Juan Quintela --- migration/qemu-file.c | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/migration/qemu-file.c b/migration/qemu-file.c index bbb2b63927..1c3a358a14 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -764,11 +764,8 @@ static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len, /* Compress size bytes of data start at p and store the compressed * data to the buffer of f. * - * When f is not writable, return -1 if f has no space to save the - * compressed data. - * When f is wirtable and it has no space to save the compressed data, - * do fflush first, if f still has no space to save the compressed - * data, return -1. + * Since the file is dummy file with empty_ops, return -1 if f has no space to + * save the compressed data. */ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, const uint8_t *p, size_t size) @@ -776,14 +773,7 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); if (blen < compressBound(size)) { - if (!qemu_file_is_writable(f)) { - return -1; - } - qemu_fflush(f); - blen = IO_BUF_SIZE - sizeof(int32_t); - if (blen < compressBound(size)) { - return -1; - } + return -1; } blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),