Migration pull request

- Steve's cleanup of unused variable
 - Peter Maydell's fixes for several leaks in migration-test
 - Fabiano's flexibilization of multifd data structures for device
   state migration
 - Arman Nabiev's fix for ppc e500 migration
 - Thomas' fix for migration-test vs. --without-default-devices
 -----BEGIN PGP SIGNATURE-----
 
 iQJEBAABCAAuFiEEqhtIsKIjJqWkw2TPx5jcdBvsMZ0FAmbYVXwQHGZhcm9zYXNA
 c3VzZS5kZQAKCRDHmNx0G+wxnRucEAC1vo046UGdUmbb4PaF5vKAg97io6RB2nrH
 HMz56Yc0AcAKRUGwe2Z80e2jY8B6zi8Ha8b9l7cVsej095eGCF+tINIL4wRX4lHm
 alDY/LkhuqjE5g5c/DaeTztyBOFLvdWHPU5eJyDOC9r7kSlnUcL1gAslH23b8uL0
 xvhPVKaTWjGIzNL1q/XfBr1WgRGqfD6dYb32HJDTq85yOnUT5sEr55aoEEu0euKh
 MYbXPmi5AMbrp8nP21kzUopX8iYERRdoKwhF0ZssciGi/qJVevH70tNdbDEQSxyp
 +vtP54TnL3LrzD4uY5Snng9zT9h0QrZujY79OEcxu20U0s29OQaudWkIjp7yLLUv
 UnPZHS+bIyaS53DdpV94GKGGBX1wrjGC/sn8eGYzmb2yMlMjLTBoE8L5r9cadshX
 XTeF4MtKGqaS3xDM2fIgACHHFl6qr/l0nENspv0raFzpf9Jx/WbpekghvTuWN6/B
 pZHnoOTNiAqXS/Rnyy829vsQ0Pw4hi6wx79Z73RP+35ubZTgTmOsQx9f2FjuEh6k
 JS+q9k4VJ+nntUWsYn4GS1Jlt+FXJ2hfzNj1NNFN4xLT1oioc6pCHsQyV7SBArB1
 ml2zYyfKCTC3riIRhcv/ew6OcKbhHcPFOpd/v0y40LO3mx8S0LZnUWXkcrl3XIZS
 Mj5CBdlFgA==
 =SRN4
 -----END PGP SIGNATURE-----

Merge tag 'migration-20240904-pull-request' of https://gitlab.com/farosas/qemu into staging

Migration pull request

- Steve's cleanup of unused variable
- Peter Maydell's fixes for several leaks in migration-test
- Fabiano's flexibilization of multifd data structures for device
  state migration
- Arman Nabiev's fix for ppc e500 migration
- Thomas' fix for migration-test vs. --without-default-devices

# -----BEGIN PGP SIGNATURE-----
#
# iQJEBAABCAAuFiEEqhtIsKIjJqWkw2TPx5jcdBvsMZ0FAmbYVXwQHGZhcm9zYXNA
# c3VzZS5kZQAKCRDHmNx0G+wxnRucEAC1vo046UGdUmbb4PaF5vKAg97io6RB2nrH
# HMz56Yc0AcAKRUGwe2Z80e2jY8B6zi8Ha8b9l7cVsej095eGCF+tINIL4wRX4lHm
# alDY/LkhuqjE5g5c/DaeTztyBOFLvdWHPU5eJyDOC9r7kSlnUcL1gAslH23b8uL0
# xvhPVKaTWjGIzNL1q/XfBr1WgRGqfD6dYb32HJDTq85yOnUT5sEr55aoEEu0euKh
# MYbXPmi5AMbrp8nP21kzUopX8iYERRdoKwhF0ZssciGi/qJVevH70tNdbDEQSxyp
# +vtP54TnL3LrzD4uY5Snng9zT9h0QrZujY79OEcxu20U0s29OQaudWkIjp7yLLUv
# UnPZHS+bIyaS53DdpV94GKGGBX1wrjGC/sn8eGYzmb2yMlMjLTBoE8L5r9cadshX
# XTeF4MtKGqaS3xDM2fIgACHHFl6qr/l0nENspv0raFzpf9Jx/WbpekghvTuWN6/B
# pZHnoOTNiAqXS/Rnyy829vsQ0Pw4hi6wx79Z73RP+35ubZTgTmOsQx9f2FjuEh6k
# JS+q9k4VJ+nntUWsYn4GS1Jlt+FXJ2hfzNj1NNFN4xLT1oioc6pCHsQyV7SBArB1
# ml2zYyfKCTC3riIRhcv/ew6OcKbhHcPFOpd/v0y40LO3mx8S0LZnUWXkcrl3XIZS
# Mj5CBdlFgA==
# =SRN4
# -----END PGP SIGNATURE-----
# gpg: Signature made Wed 04 Sep 2024 13:41:32 BST
# gpg:                using RSA key AA1B48B0A22326A5A4C364CFC798DC741BEC319D
# gpg:                issuer "farosas@suse.de"
# gpg: Good signature from "Fabiano Rosas <farosas@suse.de>" [unknown]
# gpg:                 aka "Fabiano Almeida Rosas <fabiano.rosas@suse.com>" [unknown]
# gpg: WARNING: This key is not certified with a trusted signature!
# gpg:          There is no indication that the signature belongs to the owner.
# Primary key fingerprint: AA1B 48B0 A223 26A5 A4C3  64CF C798 DC74 1BEC 319D

* tag 'migration-20240904-pull-request' of https://gitlab.com/farosas/qemu: (34 commits)
  tests/qtest/migration: Add a check for the availability of the "pc" machine
  target/ppc: Fix migration of CPUs with TLB_EMB TLB type
  migration/multifd: Add documentation for multifd methods
  migration/multifd: Add a couple of asserts for p->iov
  migration/multifd: Fix p->iov leak in multifd-uadk.c
  migration/multifd: Stop changing the packet on recv side
  migration/multifd: Make MultiFDMethods const
  migration/multifd: Move nocomp code into multifd-nocomp.c
  migration/multifd: Register nocomp ops dynamically
  migration/multifd: Standardize on multifd ops names
  migration/multifd: Allow multifd sync without flush
  migration/multifd: Replace multifd_send_state->pages with client data
  migration/multifd: Don't send ram data during SYNC
  migration/multifd: Isolate ram pages packet data
  migration/multifd: Remove total pages tracing
  migration/multifd: Move pages accounting into multifd_send_zero_page_detect()
  migration/multifd: Replace p->pages with an union pointer
  migration/multifd: Make MultiFDPages_t:offset a flexible array member
  migration/multifd: Introduce MultiFDSendData
  migration/multifd: Pass in MultiFDPages_t to file_write_ramblock_iov
  ...

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2024-09-06 12:33:07 +01:00
commit becd694497
21 changed files with 773 additions and 931 deletions

View File

@ -196,12 +196,13 @@ void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp)
}
int file_write_ramblock_iov(QIOChannel *ioc, const struct iovec *iov,
int niov, RAMBlock *block, Error **errp)
int niov, MultiFDPages_t *pages, Error **errp)
{
ssize_t ret = 0;
int i, slice_idx, slice_num;
uintptr_t base, next, offset;
size_t len;
RAMBlock *block = pages->block;
slice_idx = 0;
slice_num = 1;

View File

@ -21,6 +21,6 @@ int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp);
void file_cleanup_outgoing_migration(void);
bool file_send_channel_create(gpointer opaque, Error **errp);
int file_write_ramblock_iov(QIOChannel *ioc, const struct iovec *iov,
int niov, RAMBlock *block, Error **errp);
int niov, MultiFDPages_t *pages, Error **errp);
int multifd_file_recv_data(MultiFDRecvParams *p, Error **errp);
#endif

View File

@ -21,6 +21,7 @@ system_ss.add(files(
'migration-hmp-cmds.c',
'migration.c',
'multifd.c',
'multifd-nocomp.c',
'multifd-zlib.c',
'multifd-zero-page.c',
'options.c',

389
migration/multifd-nocomp.c Normal file
View File

@ -0,0 +1,389 @@
/*
* Multifd RAM migration without compression
*
* Copyright (c) 2019-2020 Red Hat Inc
*
* Authors:
* Juan Quintela <quintela@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*/
#include "qemu/osdep.h"
#include "exec/ramblock.h"
#include "exec/target_page.h"
#include "file.h"
#include "multifd.h"
#include "options.h"
#include "qapi/error.h"
#include "qemu/error-report.h"
#include "trace.h"
/*
 * RAM payload that multifd_queue_page() fills with page offsets.
 * Allocated by multifd_ram_save_setup() and released by
 * multifd_ram_save_cleanup().
 */
static MultiFDSendData *multifd_ram_send;
/*
 * Return the number of bytes needed for a RAM payload: the MultiFDPages_t
 * structure itself plus one ram_addr_t slot for each of the
 * multifd_ram_page_count() pages.
 */
size_t multifd_ram_payload_size(void)
{
    uint32_t slots = multifd_ram_page_count();

    /* The page offsets array sits at the tail of MultiFDPages_t. */
    return sizeof(MultiFDPages_t) + sizeof(ram_addr_t) * slots;
}
/* Allocate the file-local RAM payload used to queue pages for sending. */
void multifd_ram_save_setup(void)
{
    multifd_ram_send = multifd_send_data_alloc();
}
/*
 * Release the file-local RAM payload and clear the pointer so that it
 * does not dangle afterwards.
 */
void multifd_ram_save_cleanup(void)
{
    g_free(multifd_ram_send);
    multifd_ram_send = NULL;
}
/*
 * Update the ramblock file bitmap for every page queued on this channel:
 * the first normal_num offsets are set to true, the remaining
 * (num - normal_num) offsets are set to false.
 *
 * @p: Params for the channel being used
 */
static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = &p->data->u.ram;
    int idx = 0;

    assert(pages->block);

    /* offset[0 .. normal_num): normal pages */
    for (; idx < pages->normal_num; idx++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[idx], true);
    }

    /* offset[normal_num .. num): the rest of the queued pages */
    for (; idx < pages->num; idx++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[idx], false);
    }
}
/*
 * Set up the send side of a channel that uses no compression.
 *
 * Enables the zero-copy write flag when zero-copy send is requested and
 * allocates the channel's iovec array.
 *
 * Returns 0 (this implementation has no failure path).
 *
 * @p: Params for the channel being used
 * @errp: pointer to an error (unused here)
 */
static int multifd_nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    uint32_t iov_count = multifd_ram_page_count();

    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    if (!migrate_mapped_ram()) {
        /* We need one extra place for the packet header */
        iov_count++;
    }

    p->iov = g_new0(struct iovec, iov_count);

    return 0;
}
/*
 * Clean up the send side: free the iovec array allocated at setup.
 *
 * @p: Params for the channel being used
 * @errp: pointer to an error (unused here)
 */
static void multifd_nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    g_free(p->iov);
    p->iov = NULL;
}
/*
 * Append one iovec entry per normal page to p->iov, starting at the
 * current p->iovs_num, and record the total payload size in
 * p->next_packet_size.
 *
 * @p: Params for the channel being used
 */
static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = &p->data->u.ram;
    uint32_t page_size = multifd_ram_page_size();
    int idx = 0;

    while (idx < pages->normal_num) {
        struct iovec *slot = &p->iov[p->iovs_num];

        /* Point straight at the page inside the ramblock's host mapping. */
        slot->iov_base = pages->block->host + pages->offset[idx];
        slot->iov_len = page_size;

        p->iovs_num++;
        idx++;
    }

    p->next_packet_size = pages->normal_num * page_size;
}
/*
 * Prepare the pages queued on this channel for sending without
 * compression.
 *
 * With mapped-ram only the iovecs and the file bitmap are prepared and no
 * packet is filled. Otherwise the packet is filled and, for zero-copy
 * send, the header is written to the channel right away.
 *
 * Returns 0 on success or -1 if writing the header fails.
 *
 * @p: Params for the channel being used
 * @errp: pointer to an error
 */
static int multifd_nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool zero_copy = migrate_zero_copy_send();

    multifd_send_zero_page_detect(p);

    if (migrate_mapped_ram()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);
        return 0;
    }

    /*
     * Only !zerocopy needs the header in IOV; zerocopy will
     * send it separately.
     */
    if (!zero_copy) {
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (!zero_copy) {
        return 0;
    }

    /* Send header first, without zerocopy */
    if (qio_channel_write_all(p->c, (void *)p->packet,
                              p->packet_len, errp) != 0) {
        return -1;
    }

    return 0;
}
/*
 * Set up the receive side: allocate one iovec entry per page returned by
 * multifd_ram_page_count().
 *
 * Returns 0 (this implementation has no failure path).
 *
 * @p: Params for the channel being used
 * @errp: pointer to an error (unused here)
 */
static int multifd_nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    uint32_t iov_count = multifd_ram_page_count();

    p->iov = g_new0(struct iovec, iov_count);

    return 0;
}
/*
 * Clean up the receive side: free the iovec array allocated at setup.
 *
 * @p: Params for the channel being used
 */
static void multifd_nocomp_recv_cleanup(MultiFDRecvParams *p)
{
    g_free(p->iov);
    p->iov = NULL;
}
/*
 * Read the pages of one packet from the channel straight into guest
 * memory.
 *
 * With mapped-ram the work is delegated to multifd_file_recv_data().
 * Otherwise the packet's compression flags must be exactly
 * MULTIFD_FLAG_NOCOMP; zero pages are processed first and then the normal
 * pages are read with a single vectored read.
 *
 * Returns 0 on success or -1 on a flags mismatch; for the read itself the
 * return value of qio_channel_readv_all() is passed through.
 *
 * @p: Params for the channel being used
 * @errp: pointer to an error
 */
static int multifd_nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t method;

    if (migrate_mapped_ram()) {
        return multifd_file_recv_data(p, errp);
    }

    method = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
    if (method != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, method, MULTIFD_FLAG_NOCOMP);
        return -1;
    }

    multifd_recv_zero_page_process(p);

    /* Nothing to read when the packet carried no normal pages. */
    if (p->normal_num == 0) {
        return 0;
    }

    for (int idx = 0; idx < p->normal_num; idx++) {
        struct iovec *slot = &p->iov[idx];

        slot->iov_base = p->host + p->normal[idx];
        slot->iov_len = multifd_ram_page_size();
        ramblock_recv_bitmap_set_offset(p->block, p->normal[idx]);
    }

    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}
/*
 * Put a MultiFDPages_t back into its empty state: no ramblock and no
 * queued pages.
 */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    pages->block = NULL;
    pages->normal_num = 0;
    pages->num = 0;
    /*
     * The offset[] array is deliberately left alone: its entries get
     * overwritten when the structure is reused.
     */
}
/*
 * Fill the RAM-specific fields of the channel's packet from the queued
 * pages: the page counts, the ramblock name and the page offsets, all
 * converted to big endian.
 *
 * @p: Params for the channel being used
 */
void multifd_ram_fill_packet(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = &p->data->u.ram;
    MultiFDPacket_t *hdr = p->packet;
    uint32_t zero_num = pages->num - pages->normal_num;

    hdr->pages_alloc = cpu_to_be32(multifd_ram_page_count());
    hdr->normal_pages = cpu_to_be32(pages->normal_num);
    hdr->zero_pages = cpu_to_be32(zero_num);

    if (pages->block) {
        /*
         * strncpy() zero-pads the rest of the field; the receiver forces
         * termination itself in multifd_ram_unfill_packet().
         */
        strncpy(hdr->ramblock, pages->block->idstr, 256);
    }

    for (int idx = 0; idx < pages->num; idx++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t wide_offset = pages->offset[idx];

        hdr->offset[idx] = cpu_to_be64(wide_offset);
    }

    trace_multifd_send_ram_fill(p->id, pages->normal_num, zero_num);
}
/*
 * Validate the RAM-specific fields of a received packet and unpack them
 * into the channel params (normal_num, zero_num, block, host, normal[]
 * and zero[]).
 *
 * The offsets array in the packet holds the normal page offsets first,
 * immediately followed by the zero page offsets.
 *
 * Returns 0 on success or -1 (with @errp set) if a count exceeds its
 * limit, the ramblock is unknown or an offset is out of range.
 *
 * @p: Params for the channel being used
 * @errp: pointer to an error
 */
int multifd_ram_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    uint32_t page_count = multifd_ram_page_count();
    uint32_t page_size = multifd_ram_page_size();
    uint32_t pages_per_packet = be32_to_cpu(packet->pages_alloc);
    uint32_t total;
    uint32_t idx;

    if (pages_per_packet > page_count) {
        error_setg(errp, "multifd: received packet with %u pages, expected %u",
                   pages_per_packet, page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > pages_per_packet) {
        error_setg(errp, "multifd: received packet with %u non-zero pages, "
                   "which exceeds maximum expected pages %u",
                   p->normal_num, pages_per_packet);
        return -1;
    }

    p->zero_num = be32_to_cpu(packet->zero_pages);
    if (p->zero_num > pages_per_packet - p->normal_num) {
        error_setg(errp,
                   "multifd: received packet with %u zero pages, expected maximum %u",
                   p->zero_num, pages_per_packet - p->normal_num);
        return -1;
    }

    /* A packet without any page carries no ramblock to look up. */
    if (p->normal_num == 0 && p->zero_num == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;

    /*
     * Walk the offsets once: the first normal_num entries go to normal[],
     * the following zero_num entries go to zero[]. The sum cannot
     * overflow, both counts were bounded by pages_per_packet above.
     */
    total = p->normal_num + p->zero_num;
    for (idx = 0; idx < total; idx++) {
        uint64_t offset = be64_to_cpu(packet->offset[idx]);

        if (offset > (p->block->used_length - page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }

        if (idx < p->normal_num) {
            p->normal[idx] = offset;
        } else {
            p->zero[idx - p->normal_num] = offset;
        }
    }

    return 0;
}
/* Returns true when no page has been queued in @pages yet. */
static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return !pages->num;
}
/*
 * Returns true when @pages already holds multifd_ram_page_count()
 * entries.
 */
static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return multifd_ram_page_count() == pages->num;
}
/*
 * Store @offset in the next free slot of @pages and bump the queued page
 * count. No bounds check is done here.
 */
static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num] = offset;
    pages->num++;
}
/*
 * Queue the page at @offset of @block in the pending RAM payload,
 * flushing the payload with multifd_send() first when required.
 *
 * Returns true if enqueue successful, false otherwise
 */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

    for (;;) {
        /*
         * Re-derived on every pass: multifd_send() is handed the address
         * of multifd_ram_send, so presumably it may repoint it.
         */
        pages = &multifd_ram_send->u.ram;

        if (multifd_payload_empty(multifd_ram_send)) {
            multifd_pages_reset(pages);
            multifd_set_payload_type(multifd_ram_send, MULTIFD_PAYLOAD_RAM);
        }

        /* If the queue is empty, we can already enqueue now */
        if (multifd_queue_empty(pages)) {
            pages->block = block;
            break;
        }

        /* Not empty, same ramblock and still space left: just enqueue */
        if (pages->block == block && !multifd_queue_full(pages)) {
            break;
        }

        /*
         * Not empty, and we need a flush. It can be because of either:
         *
         * (1) The page is not on the same ramblock as the previous ones, or,
         * (2) The queue is full.
         *
         * After the flush, always retry.
         */
        if (!multifd_send(&multifd_ram_send)) {
            return false;
        }
    }

    multifd_enqueue(pages, offset);
    return true;
}
/*
 * Send whatever is still pending in the RAM payload and then run
 * multifd_send_sync_main().
 *
 * Returns 0 when multifd is not in use, -1 if sending the pending payload
 * fails, otherwise the return value of multifd_send_sync_main().
 */
int multifd_ram_flush_and_sync(void)
{
    bool pending;

    if (!migrate_multifd()) {
        return 0;
    }

    pending = !multifd_payload_empty(multifd_ram_send);
    if (pending && !multifd_send(&multifd_ram_send)) {
        error_report("%s: multifd_send fail", __func__);
        return -1;
    }

    return multifd_send_sync_main();
}
/*
 * Common first step of a send_prepare implementation: run zero page
 * detection and, if normal pages remain, prepare the packet header.
 *
 * Returns true when there are normal pages left to process, false
 * otherwise (p->next_packet_size is then set to 0).
 *
 * @p: Params for the channel being used
 */
bool multifd_send_prepare_common(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = &p->data->u.ram;

    multifd_send_zero_page_detect(p);

    if (pages->normal_num) {
        multifd_send_prepare_header(p);
        return true;
    }

    p->next_packet_size = 0;
    return false;
}
/* Method table for multifd RAM migration without compression. */
static const MultiFDMethods multifd_nocomp_ops = {
    .send_setup   = multifd_nocomp_send_setup,
    .send_cleanup = multifd_nocomp_send_cleanup,
    .send_prepare = multifd_nocomp_send_prepare,
    .recv_setup   = multifd_nocomp_recv_setup,
    .recv_cleanup = multifd_nocomp_recv_cleanup,
    .recv         = multifd_nocomp_recv,
};
/* Register the no-compression methods for MULTIFD_COMPRESSION_NONE. */
static void multifd_nocomp_register(void)
{
    multifd_register_ops(MULTIFD_COMPRESSION_NONE, &multifd_nocomp_ops);
}
migration_init(multifd_nocomp_register);

View File

@ -220,21 +220,13 @@ static void multifd_qpl_deinit(QplData *qpl)
}
}
/**
* multifd_qpl_send_setup: set up send side
*
* Set up the channel with QPL compression.
*
* Returns 0 on success or -1 on error
*
* @p: Params for the channel being used
* @errp: pointer to an error
*/
static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
{
QplData *qpl;
uint32_t page_size = multifd_ram_page_size();
uint32_t page_count = multifd_ram_page_count();
qpl = multifd_qpl_init(p->page_count, p->page_size, errp);
qpl = multifd_qpl_init(page_count, page_size, errp);
if (!qpl) {
return -1;
}
@ -245,18 +237,10 @@ static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
* additional two IOVs are used to store packet header and compressed data
* length
*/
p->iov = g_new0(struct iovec, p->page_count + 2);
p->iov = g_new0(struct iovec, page_count + 2);
return 0;
}
/**
* multifd_qpl_send_cleanup: clean up send side
*
* Close the channel and free memory.
*
* @p: Params for the channel being used
* @errp: pointer to an error
*/
static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
{
multifd_qpl_deinit(p->compress_data);
@ -404,13 +388,14 @@ retry:
static void multifd_qpl_compress_pages_slow_path(MultiFDSendParams *p)
{
QplData *qpl = p->compress_data;
MultiFDPages_t *pages = &p->data->u.ram;
uint32_t size = p->page_size;
qpl_job *job = qpl->sw_job;
uint8_t *zbuf = qpl->zbuf;
uint8_t *buf;
for (int i = 0; i < p->pages->normal_num; i++) {
buf = p->pages->block->host + p->pages->offset[i];
for (int i = 0; i < pages->normal_num; i++) {
buf = pages->block->host + pages->offset[i];
multifd_qpl_prepare_comp_job(job, buf, zbuf, size);
if (qpl_execute_job(job) == QPL_STS_OK) {
multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
@ -434,7 +419,7 @@ static void multifd_qpl_compress_pages_slow_path(MultiFDSendParams *p)
static void multifd_qpl_compress_pages(MultiFDSendParams *p)
{
QplData *qpl = p->compress_data;
MultiFDPages_t *pages = p->pages;
MultiFDPages_t *pages = &p->data->u.ram;
uint32_t size = p->page_size;
QplHwJob *hw_job;
uint8_t *buf;
@ -484,20 +469,10 @@ static void multifd_qpl_compress_pages(MultiFDSendParams *p)
}
}
/**
* multifd_qpl_send_prepare: prepare data to be able to send
*
* Create a compressed buffer with all the pages that we are going to
* send.
*
* Returns 0 on success or -1 on error
*
* @p: Params for the channel being used
* @errp: pointer to an error
*/
static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
{
QplData *qpl = p->compress_data;
MultiFDPages_t *pages = &p->data->u.ram;
uint32_t len = 0;
if (!multifd_send_prepare_common(p)) {
@ -505,7 +480,7 @@ static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
}
/* The first IOV is used to store the compressed page lengths */
len = p->pages->normal_num * sizeof(uint32_t);
len = pages->normal_num * sizeof(uint32_t);
multifd_qpl_fill_iov(p, (uint8_t *) qpl->zlen, len);
if (qpl->hw_avail) {
multifd_qpl_compress_pages(p);
@ -519,21 +494,13 @@ out:
return 0;
}
/**
* multifd_qpl_recv_setup: set up receive side
*
* Create the compressed channel and buffer.
*
* Returns 0 on success or -1 on error
*
* @p: Params for the channel being used
* @errp: pointer to an error
*/
static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
{
QplData *qpl;
uint32_t page_size = multifd_ram_page_size();
uint32_t page_count = multifd_ram_page_count();
qpl = multifd_qpl_init(p->page_count, p->page_size, errp);
qpl = multifd_qpl_init(page_count, page_size, errp);
if (!qpl) {
return -1;
}
@ -541,13 +508,6 @@ static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
return 0;
}
/**
* multifd_qpl_recv_cleanup: set up receive side
*
* Close the channel and free memory.
*
* @p: Params for the channel being used
*/
static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
{
multifd_qpl_deinit(p->compress_data);
@ -688,17 +648,6 @@ static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error **errp)
}
return 0;
}
/**
* multifd_qpl_recv: read the data from the channel into actual pages
*
* Read the compressed buffer, and uncompress it into the actual
* pages.
*
* Returns 0 on success or -1 on error
*
* @p: Params for the channel being used
* @errp: pointer to an error
*/
static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
{
QplData *qpl = p->compress_data;
@ -745,7 +694,7 @@ static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
return multifd_qpl_decompress_pages_slow_path(p, errp);
}
static MultiFDMethods multifd_qpl_ops = {
static const MultiFDMethods multifd_qpl_ops = {
.send_setup = multifd_qpl_send_setup,
.send_cleanup = multifd_qpl_send_cleanup,
.send_prepare = multifd_qpl_send_prepare,

View File

@ -103,19 +103,13 @@ static void multifd_uadk_uninit_sess(struct wd_data *wd)
g_free(wd);
}
/**
* multifd_uadk_send_setup: setup send side
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int multifd_uadk_send_setup(MultiFDSendParams *p, Error **errp)
{
struct wd_data *wd;
uint32_t page_size = multifd_ram_page_size();
uint32_t page_count = multifd_ram_page_count();
wd = multifd_uadk_init_sess(p->page_count, p->page_size, true, errp);
wd = multifd_uadk_init_sess(page_count, page_size, true, errp);
if (!wd) {
return -1;
}
@ -128,24 +122,18 @@ static int multifd_uadk_send_setup(MultiFDSendParams *p, Error **errp)
* length
*/
p->iov = g_new0(struct iovec, p->page_count + 2);
p->iov = g_new0(struct iovec, page_count + 2);
return 0;
}
/**
* multifd_uadk_send_cleanup: cleanup send side
*
* Close the channel and return memory.
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static void multifd_uadk_send_cleanup(MultiFDSendParams *p, Error **errp)
{
struct wd_data *wd = p->compress_data;
multifd_uadk_uninit_sess(wd);
p->compress_data = NULL;
g_free(p->iov);
p->iov = NULL;
}
static inline void prepare_next_iov(MultiFDSendParams *p, void *base,
@ -157,37 +145,28 @@ static inline void prepare_next_iov(MultiFDSendParams *p, void *base,
p->iovs_num++;
}
/**
* multifd_uadk_send_prepare: prepare data to be able to send
*
* Create a compressed buffer with all the pages that we are going to
* send.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int multifd_uadk_send_prepare(MultiFDSendParams *p, Error **errp)
{
struct wd_data *uadk_data = p->compress_data;
uint32_t hdr_size;
uint32_t page_size = multifd_ram_page_size();
uint8_t *buf = uadk_data->buf;
int ret = 0;
MultiFDPages_t *pages = &p->data->u.ram;
if (!multifd_send_prepare_common(p)) {
goto out;
}
hdr_size = p->pages->normal_num * sizeof(uint32_t);
hdr_size = pages->normal_num * sizeof(uint32_t);
/* prepare the header that stores the lengths of all compressed data */
prepare_next_iov(p, uadk_data->buf_hdr, hdr_size);
for (int i = 0; i < p->pages->normal_num; i++) {
for (int i = 0; i < pages->normal_num; i++) {
struct wd_comp_req creq = {
.op_type = WD_DIR_COMPRESS,
.src = p->pages->block->host + p->pages->offset[i],
.src_len = p->page_size,
.src = pages->block->host + pages->offset[i],
.src_len = page_size,
.dst = buf,
/* Set dst_len to double the src in case compressed out >= page_size */
.dst_len = p->page_size * 2,
@ -200,7 +179,7 @@ static int multifd_uadk_send_prepare(MultiFDSendParams *p, Error **errp)
p->id, ret, creq.status);
return -1;
}
if (creq.dst_len < p->page_size) {
if (creq.dst_len < page_size) {
uadk_data->buf_hdr[i] = cpu_to_be32(creq.dst_len);
prepare_next_iov(p, buf, creq.dst_len);
buf += creq.dst_len;
@ -212,11 +191,11 @@ static int multifd_uadk_send_prepare(MultiFDSendParams *p, Error **errp)
* than page_size as well because at the receive end we can skip the
* decompression. But it is tricky to find the right number here.
*/
if (!uadk_data->handle || creq.dst_len >= p->page_size) {
uadk_data->buf_hdr[i] = cpu_to_be32(p->page_size);
prepare_next_iov(p, p->pages->block->host + p->pages->offset[i],
p->page_size);
buf += p->page_size;
if (!uadk_data->handle || creq.dst_len >= page_size) {
uadk_data->buf_hdr[i] = cpu_to_be32(page_size);
prepare_next_iov(p, pages->block->host + pages->offset[i],
page_size);
buf += page_size;
}
}
out:
@ -225,21 +204,13 @@ out:
return 0;
}
/**
* multifd_uadk_recv_setup: setup receive side
*
* Create the compressed channel and buffer.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int multifd_uadk_recv_setup(MultiFDRecvParams *p, Error **errp)
{
struct wd_data *wd;
uint32_t page_size = multifd_ram_page_size();
uint32_t page_count = multifd_ram_page_count();
wd = multifd_uadk_init_sess(p->page_count, p->page_size, false, errp);
wd = multifd_uadk_init_sess(page_count, page_size, false, errp);
if (!wd) {
return -1;
}
@ -247,13 +218,6 @@ static int multifd_uadk_recv_setup(MultiFDRecvParams *p, Error **errp)
return 0;
}
/**
* multifd_uadk_recv_cleanup: cleanup receive side
*
* Close the channel and return memory.
*
* @p: Params for the channel that we are using
*/
static void multifd_uadk_recv_cleanup(MultiFDRecvParams *p)
{
struct wd_data *wd = p->compress_data;
@ -262,17 +226,6 @@ static void multifd_uadk_recv_cleanup(MultiFDRecvParams *p)
p->compress_data = NULL;
}
/**
* multifd_uadk_recv: read the data from the channel into actual pages
*
* Read the compressed buffer, and uncompress it into the actual
* pages.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int multifd_uadk_recv(MultiFDRecvParams *p, Error **errp)
{
struct wd_data *uadk_data = p->compress_data;
@ -280,6 +233,7 @@ static int multifd_uadk_recv(MultiFDRecvParams *p, Error **errp)
uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
uint32_t data_len = 0;
uint32_t page_size = multifd_ram_page_size();
uint8_t *buf = uadk_data->buf;
int ret = 0;
@ -306,7 +260,7 @@ static int multifd_uadk_recv(MultiFDRecvParams *p, Error **errp)
for (int i = 0; i < p->normal_num; i++) {
uadk_data->buf_hdr[i] = be32_to_cpu(uadk_data->buf_hdr[i]);
data_len += uadk_data->buf_hdr[i];
assert(uadk_data->buf_hdr[i] <= p->page_size);
assert(uadk_data->buf_hdr[i] <= page_size);
}
/* read compressed data */
@ -322,12 +276,12 @@ static int multifd_uadk_recv(MultiFDRecvParams *p, Error **errp)
.src = buf,
.src_len = uadk_data->buf_hdr[i],
.dst = p->host + p->normal[i],
.dst_len = p->page_size,
.dst_len = page_size,
};
if (uadk_data->buf_hdr[i] == p->page_size) {
memcpy(p->host + p->normal[i], buf, p->page_size);
buf += p->page_size;
if (uadk_data->buf_hdr[i] == page_size) {
memcpy(p->host + p->normal[i], buf, page_size);
buf += page_size;
continue;
}
@ -343,7 +297,7 @@ static int multifd_uadk_recv(MultiFDRecvParams *p, Error **errp)
p->id, ret, creq.status);
return -1;
}
if (creq.dst_len != p->page_size) {
if (creq.dst_len != page_size) {
error_setg(errp, "multifd %u: decompressed length error", p->id);
return -1;
}
@ -353,7 +307,7 @@ static int multifd_uadk_recv(MultiFDRecvParams *p, Error **errp)
return 0;
}
static MultiFDMethods multifd_uadk_ops = {
static const MultiFDMethods multifd_uadk_ops = {
.send_setup = multifd_uadk_send_setup,
.send_cleanup = multifd_uadk_send_cleanup,
.send_prepare = multifd_uadk_send_prepare,

View File

@ -14,6 +14,7 @@
#include "qemu/cutils.h"
#include "exec/ramblock.h"
#include "migration.h"
#include "migration-stats.h"
#include "multifd.h"
#include "options.h"
#include "ram.h"
@ -46,14 +47,14 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
*/
void multifd_send_zero_page_detect(MultiFDSendParams *p)
{
MultiFDPages_t *pages = p->pages;
MultiFDPages_t *pages = &p->data->u.ram;
RAMBlock *rb = pages->block;
int i = 0;
int j = pages->num - 1;
if (!multifd_zero_page_enabled()) {
pages->normal_num = pages->num;
return;
goto out;
}
/*
@ -63,7 +64,7 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
while (i <= j) {
uint64_t offset = pages->offset[i];
if (!buffer_is_zero(rb->host + offset, p->page_size)) {
if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
i++;
continue;
}
@ -74,6 +75,10 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
}
pages->normal_num = i;
out:
stat64_add(&mig_stats.normal_pages, pages->normal_num);
stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
}
void multifd_recv_zero_page_process(MultiFDRecvParams *p)
@ -81,7 +86,7 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
for (int i = 0; i < p->zero_num; i++) {
void *page = p->host + p->zero[i];
if (ramblock_recv_bitmap_test_byte_offset(p->block, p->zero[i])) {
memset(page, 0, p->page_size);
memset(page, 0, multifd_ram_page_size());
} else {
ramblock_recv_bitmap_set_offset(p->block, p->zero[i]);
}

View File

@ -34,17 +34,7 @@ struct zlib_data {
/* Multifd zlib compression */
/**
* zlib_send_setup: setup send side
*
* Setup each channel with zlib compression.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
static int multifd_zlib_send_setup(MultiFDSendParams *p, Error **errp)
{
struct zlib_data *z = g_new0(struct zlib_data, 1);
z_stream *zs = &z->zs;
@ -86,15 +76,7 @@ err_free_z:
return -1;
}
/**
* zlib_send_cleanup: cleanup send side
*
* Close the channel and return memory.
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
static void multifd_zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
{
struct zlib_data *z = p->compress_data;
@ -110,23 +92,13 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
p->iov = NULL;
}
/**
* zlib_send_prepare: prepare date to be able to send
*
* Create a compressed buffer with all the pages that we are going to
* send.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
static int multifd_zlib_send_prepare(MultiFDSendParams *p, Error **errp)
{
MultiFDPages_t *pages = p->pages;
MultiFDPages_t *pages = &p->data->u.ram;
struct zlib_data *z = p->compress_data;
z_stream *zs = &z->zs;
uint32_t out_size = 0;
uint32_t page_size = multifd_ram_page_size();
int ret;
uint32_t i;
@ -147,8 +119,8 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
* with compression. zlib does not guarantee that this is safe,
* therefore copy the page before calling deflate().
*/
memcpy(z->buf, p->pages->block->host + pages->offset[i], p->page_size);
zs->avail_in = p->page_size;
memcpy(z->buf, pages->block->host + pages->offset[i], page_size);
zs->avail_in = page_size;
zs->next_in = z->buf;
zs->avail_out = available;
@ -188,17 +160,7 @@ out:
return 0;
}
/**
* zlib_recv_setup: setup receive side
*
* Create the compressed channel and buffer.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
static int multifd_zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
{
struct zlib_data *z = g_new0(struct zlib_data, 1);
z_stream *zs = &z->zs;
@ -224,14 +186,7 @@ static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
return 0;
}
/**
* zlib_recv_cleanup: setup receive side
*
* For no compression this function does nothing.
*
* @p: Params for the channel that we are using
*/
static void zlib_recv_cleanup(MultiFDRecvParams *p)
static void multifd_zlib_recv_cleanup(MultiFDRecvParams *p)
{
struct zlib_data *z = p->compress_data;
@ -242,25 +197,15 @@ static void zlib_recv_cleanup(MultiFDRecvParams *p)
p->compress_data = NULL;
}
/**
* zlib_recv: read the data from the channel into actual pages
*
* Read the compressed buffer, and uncompress it into the actual
* pages.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int zlib_recv(MultiFDRecvParams *p, Error **errp)
static int multifd_zlib_recv(MultiFDRecvParams *p, Error **errp)
{
struct zlib_data *z = p->compress_data;
z_stream *zs = &z->zs;
uint32_t in_size = p->next_packet_size;
/* we measure the change of total_out */
uint32_t out_size = zs->total_out;
uint32_t expected_size = p->normal_num * p->page_size;
uint32_t page_size = multifd_ram_page_size();
uint32_t expected_size = p->normal_num * page_size;
uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
int ret;
int i;
@ -296,7 +241,7 @@ static int zlib_recv(MultiFDRecvParams *p, Error **errp)
flush = Z_SYNC_FLUSH;
}
zs->avail_out = p->page_size;
zs->avail_out = page_size;
zs->next_out = p->host + p->normal[i];
/*
@ -310,8 +255,8 @@ static int zlib_recv(MultiFDRecvParams *p, Error **errp)
do {
ret = inflate(zs, flush);
} while (ret == Z_OK && zs->avail_in
&& (zs->total_out - start) < p->page_size);
if (ret == Z_OK && (zs->total_out - start) < p->page_size) {
&& (zs->total_out - start) < page_size);
if (ret == Z_OK && (zs->total_out - start) < page_size) {
error_setg(errp, "multifd %u: inflate generated too few output",
p->id);
return -1;
@ -332,13 +277,13 @@ static int zlib_recv(MultiFDRecvParams *p, Error **errp)
return 0;
}
static MultiFDMethods multifd_zlib_ops = {
.send_setup = zlib_send_setup,
.send_cleanup = zlib_send_cleanup,
.send_prepare = zlib_send_prepare,
.recv_setup = zlib_recv_setup,
.recv_cleanup = zlib_recv_cleanup,
.recv = zlib_recv
static const MultiFDMethods multifd_zlib_ops = {
.send_setup = multifd_zlib_send_setup,
.send_cleanup = multifd_zlib_send_cleanup,
.send_prepare = multifd_zlib_send_prepare,
.recv_setup = multifd_zlib_recv_setup,
.recv_cleanup = multifd_zlib_recv_cleanup,
.recv = multifd_zlib_recv
};
static void multifd_zlib_register(void)

View File

@ -37,17 +37,7 @@ struct zstd_data {
/* Multifd zstd compression */
/**
* zstd_send_setup: setup send side
*
* Setup each channel with zstd compression.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
static int multifd_zstd_send_setup(MultiFDSendParams *p, Error **errp)
{
struct zstd_data *z = g_new0(struct zstd_data, 1);
int res;
@ -83,15 +73,7 @@ static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
return 0;
}
/**
* zstd_send_cleanup: cleanup send side
*
* Close the channel and return memory.
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
static void multifd_zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
{
struct zstd_data *z = p->compress_data;
@ -106,20 +88,9 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
p->iov = NULL;
}
/**
* zstd_send_prepare: prepare data to be able to send
*
* Create a compressed buffer with all the pages that we are going to
* send.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
static int multifd_zstd_send_prepare(MultiFDSendParams *p, Error **errp)
{
MultiFDPages_t *pages = p->pages;
MultiFDPages_t *pages = &p->data->u.ram;
struct zstd_data *z = p->compress_data;
int ret;
uint32_t i;
@ -138,8 +109,8 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
if (i == pages->normal_num - 1) {
flush = ZSTD_e_flush;
}
z->in.src = p->pages->block->host + pages->offset[i];
z->in.size = p->page_size;
z->in.src = pages->block->host + pages->offset[i];
z->in.size = multifd_ram_page_size();
z->in.pos = 0;
/*
@ -176,17 +147,7 @@ out:
return 0;
}
/**
* zstd_recv_setup: setup receive side
*
* Create the compressed channel and buffer.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
static int multifd_zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
{
struct zstd_data *z = g_new0(struct zstd_data, 1);
int ret;
@ -220,14 +181,7 @@ static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
return 0;
}
/**
* zstd_recv_cleanup: setup receive side
*
* For no compression this function does nothing.
*
* @p: Params for the channel that we are using
*/
static void zstd_recv_cleanup(MultiFDRecvParams *p)
static void multifd_zstd_recv_cleanup(MultiFDRecvParams *p)
{
struct zstd_data *z = p->compress_data;
@ -239,22 +193,12 @@ static void zstd_recv_cleanup(MultiFDRecvParams *p)
p->compress_data = NULL;
}
/**
* zstd_recv: read the data from the channel into actual pages
*
* Read the compressed buffer, and uncompress it into the actual
* pages.
*
* Returns 0 for success or -1 for error
*
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int zstd_recv(MultiFDRecvParams *p, Error **errp)
static int multifd_zstd_recv(MultiFDRecvParams *p, Error **errp)
{
uint32_t in_size = p->next_packet_size;
uint32_t out_size = 0;
uint32_t expected_size = p->normal_num * p->page_size;
uint32_t page_size = multifd_ram_page_size();
uint32_t expected_size = p->normal_num * page_size;
uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
struct zstd_data *z = p->compress_data;
int ret;
@ -286,7 +230,7 @@ static int zstd_recv(MultiFDRecvParams *p, Error **errp)
for (i = 0; i < p->normal_num; i++) {
ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
z->out.dst = p->host + p->normal[i];
z->out.size = p->page_size;
z->out.size = page_size;
z->out.pos = 0;
/*
@ -300,8 +244,8 @@ static int zstd_recv(MultiFDRecvParams *p, Error **errp)
do {
ret = ZSTD_decompressStream(z->zds, &z->out, &z->in);
} while (ret > 0 && (z->in.size - z->in.pos > 0)
&& (z->out.pos < p->page_size));
if (ret > 0 && (z->out.pos < p->page_size)) {
&& (z->out.pos < page_size));
if (ret > 0 && (z->out.pos < page_size)) {
error_setg(errp, "multifd %u: decompressStream buffer too small",
p->id);
return -1;
@ -321,13 +265,13 @@ static int zstd_recv(MultiFDRecvParams *p, Error **errp)
return 0;
}
static MultiFDMethods multifd_zstd_ops = {
.send_setup = zstd_send_setup,
.send_cleanup = zstd_send_cleanup,
.send_prepare = zstd_send_prepare,
.recv_setup = zstd_recv_setup,
.recv_cleanup = zstd_recv_cleanup,
.recv = zstd_recv
static const MultiFDMethods multifd_zstd_ops = {
.send_setup = multifd_zstd_send_setup,
.send_cleanup = multifd_zstd_send_cleanup,
.send_prepare = multifd_zstd_send_prepare,
.recv_setup = multifd_zstd_recv_setup,
.recv_cleanup = multifd_zstd_recv_cleanup,
.recv = multifd_zstd_recv
};
static void multifd_zstd_register(void)

View File

@ -49,8 +49,6 @@ typedef struct {
struct {
MultiFDSendParams *params;
/* array of pages to sent */
MultiFDPages_t *pages;
/*
* Global number of generated multifd packets.
*
@ -78,7 +76,7 @@ struct {
*/
int exiting;
/* multifd ops */
MultiFDMethods *ops;
const MultiFDMethods *ops;
} *multifd_send_state;
struct {
@ -95,9 +93,31 @@ struct {
uint64_t packet_num;
int exiting;
/* multifd ops */
MultiFDMethods *ops;
const MultiFDMethods *ops;
} *multifd_recv_state;
/*
 * Allocate a MultiFDSendData large enough for the biggest payload it
 * may carry. The memory comes from g_malloc0(), so it is returned
 * zero-initialised.
 */
MultiFDSendData *multifd_send_data_alloc(void)
{
    /*
     * Size of everything in MultiFDSendData that is not the payload
     * union. Subtracting the two sizeofs also accounts for any holes
     * the compiler inserted; packing the structure is not an option
     * because that misaligns the members and triggers
     * Waddress-of-packed-member.
     */
    const size_t fixed_part = sizeof(MultiFDSendData) - sizeof(MultiFDPayload);

    /*
     * MultiFDPages_t ends in a flexible array, which sizeof() does not
     * count. Take the larger of the RAM payload size and the union
     * size in case a type added to the union in the future is larger
     * than (MultiFDPages_t + flex array).
     */
    const size_t payload_part = MAX(multifd_ram_payload_size(),
                                    sizeof(MultiFDPayload));

    return g_malloc0(fixed_part + payload_part);
}
static bool multifd_use_packets(void)
{
return !migrate_mapped_ram();
@ -108,223 +128,15 @@ void multifd_send_channel_created(void)
qemu_sem_post(&multifd_send_state->channels_created);
}
static void multifd_set_file_bitmap(MultiFDSendParams *p)
static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};
void multifd_register_ops(int method, const MultiFDMethods *ops)
{
MultiFDPages_t *pages = p->pages;
assert(pages->block);
for (int i = 0; i < p->pages->normal_num; i++) {
ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
}
for (int i = p->pages->normal_num; i < p->pages->num; i++) {
ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
}
}
/* Multifd without compression */
/**
 * nocomp_send_setup: setup send side
 *
 * Turns on zero-copy writes for the channel when zero-copy send is
 * enabled, and allocates the iovec array used for sending.
 *
 * Returns 0 (there is no failure path in this function)
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error (not written by this function)
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    uint32_t iov_count;

    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    iov_count = p->page_count;
    if (multifd_use_packets()) {
        /* We need one extra place for the packet header */
        iov_count++;
    }

    p->iov = g_new0(struct iovec, iov_count);
    return 0;
}
/**
 * nocomp_send_cleanup: cleanup send side
 *
 * Releases the channel's iovec array and clears the pointer.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error (not written by this function)
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    g_free(p->iov);
    p->iov = NULL;
}
/*
 * Append one iovec per normal page of p->pages to p->iov, starting at
 * index p->iovs_num, and set p->next_packet_size to the total number of
 * bytes those pages occupy.
 */
static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *batch = p->pages;

    for (int i = 0; i < batch->normal_num; i++) {
        struct iovec *slot = &p->iov[p->iovs_num];

        slot->iov_base = batch->block->host + batch->offset[i];
        slot->iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = batch->normal_num * p->page_size;
}
/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * Runs zero page detection, then fills p->iov with the normal pages.
 * Without packets the file bitmap is updated and nothing else is done.
 * With packets the NOCOMP flag is set and the packet is filled; under
 * zero-copy send the packet header is written to the channel here
 * instead of being placed in the iovec array.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    const bool zero_copy = migrate_zero_copy_send();

    multifd_send_zero_page_detect(p);

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);
        return 0;
    }

    /*
     * Only !zerocopy needs the header in IOV; zerocopy will
     * send it separately.
     */
    if (!zero_copy) {
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;
    multifd_send_fill_packet(p);

    /* Send header first, without zerocopy */
    if (zero_copy &&
        qio_channel_write_all(p->c, (void *)p->packet,
                              p->packet_len, errp) != 0) {
        return -1;
    }

    return 0;
}
/**
 * nocomp_recv_setup: setup receive side
 *
 * Allocates a zeroed iovec array with one entry per page of a full
 * packet (p->page_count).
 *
 * Returns 0 (there is no failure path in this function)
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error (not written by this function)
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    p->iov = g_new0(struct iovec, p->page_count);

    return 0;
}
/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * Releases the channel's iovec array and clears the pointer.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
    g_free(p->iov);
    p->iov = NULL;
}
/**
 * nocomp_recv: read the data from the channel
 *
 * Without packets the work is handed to multifd_file_recv_data().
 * Otherwise the compression flags are checked, zero pages are
 * processed, and the normal pages are read from the channel straight
 * into their place in the ramblock.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t method;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    method = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
    if (method != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, method, MULTIFD_FLAG_NOCOMP);
        return -1;
    }

    multifd_recv_zero_page_process(p);

    /* Nothing to read from the channel when there are no normal pages */
    if (p->normal_num == 0) {
        return 0;
    }

    for (int i = 0; i < p->normal_num; i++) {
        struct iovec *slot = &p->iov[i];

        slot->iov_base = p->host + p->normal[i];
        slot->iov_len = p->page_size;
        ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
    }

    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}
/* Method table for multifd without compression */
static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv,
};

/*
 * Method tables indexed by compression method. Only the "none" entry
 * is filled in statically.
 */
static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};
void multifd_register_ops(int method, MultiFDMethods *ops)
{
assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
assert(!multifd_ops[method]);
multifd_ops[method] = ops;
}
/*
 * Reset a MultiFDPages_t* object for the next use: no pages queued and
 * no ramblock attached.
 */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    pages->block = NULL;
    pages->normal_num = 0;
    pages->num = 0;
    /*
     * The offset[] array is left alone on purpose: it will be
     * overwritten later when the object is reused.
     */
}
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
MultiFDInit_t msg = {};
@ -389,160 +201,65 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
return msg.id;
}
/*
 * Allocate a zeroed MultiFDPages_t together with a zeroed offset array
 * of @n entries, and record @n as its capacity.
 */
static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *fresh = g_new0(MultiFDPages_t, 1);

    fresh->offset = g_new0(ram_addr_t, n);
    fresh->allocated = n;

    return fresh;
}
/*
 * Release a MultiFDPages_t: free its offset array, clear its fields,
 * then free the object itself. @pages must not be used afterwards.
 */
static void multifd_pages_clear(MultiFDPages_t *pages)
{
    g_free(pages->offset);
    pages->offset = NULL;
    pages->allocated = 0;

    multifd_pages_reset(pages);

    g_free(pages);
}
void multifd_send_fill_packet(MultiFDSendParams *p)
{
MultiFDPacket_t *packet = p->packet;
MultiFDPages_t *pages = p->pages;
uint64_t packet_num;
uint32_t zero_num = pages->num - pages->normal_num;
int i;
bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;
memset(packet, 0, p->packet_len);
packet->magic = cpu_to_be32(MULTIFD_MAGIC);
packet->version = cpu_to_be32(MULTIFD_VERSION);
packet->flags = cpu_to_be32(p->flags);
packet->pages_alloc = cpu_to_be32(p->pages->allocated);
packet->normal_pages = cpu_to_be32(pages->normal_num);
packet->zero_pages = cpu_to_be32(zero_num);
packet->next_packet_size = cpu_to_be32(p->next_packet_size);
packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
packet->packet_num = cpu_to_be64(packet_num);
if (pages->block) {
strncpy(packet->ramblock, pages->block->idstr, 256);
}
for (i = 0; i < pages->num; i++) {
/* there are architectures where ram_addr_t is 32 bit */
uint64_t temp = pages->offset[i];
packet->offset[i] = cpu_to_be64(temp);
}
p->packets_sent++;
p->total_normal_pages += pages->normal_num;
p->total_zero_pages += zero_num;
trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
p->flags, p->next_packet_size);
if (!sync_packet) {
multifd_ram_fill_packet(p);
}
trace_multifd_send_fill(p->id, packet_num,
p->flags, p->next_packet_size);
}
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
MultiFDPacket_t *packet = p->packet;
int i;
const MultiFDPacket_t *packet = p->packet;
uint32_t magic = be32_to_cpu(packet->magic);
uint32_t version = be32_to_cpu(packet->version);
int ret = 0;
packet->magic = be32_to_cpu(packet->magic);
if (packet->magic != MULTIFD_MAGIC) {
error_setg(errp, "multifd: received packet "
"magic %x and expected magic %x",
packet->magic, MULTIFD_MAGIC);
if (magic != MULTIFD_MAGIC) {
error_setg(errp, "multifd: received packet magic %x, expected %x",
magic, MULTIFD_MAGIC);
return -1;
}
packet->version = be32_to_cpu(packet->version);
if (packet->version != MULTIFD_VERSION) {
error_setg(errp, "multifd: received packet "
"version %u and expected version %u",
packet->version, MULTIFD_VERSION);
if (version != MULTIFD_VERSION) {
error_setg(errp, "multifd: received packet version %u, expected %u",
version, MULTIFD_VERSION);
return -1;
}
p->flags = be32_to_cpu(packet->flags);
packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
/*
* If we received a packet that is 100 times bigger than expected
* just stop migration. It is a magic number.
*/
if (packet->pages_alloc > p->page_count) {
error_setg(errp, "multifd: received packet "
"with size %u and expected a size of %u",
packet->pages_alloc, p->page_count) ;
return -1;
}
p->normal_num = be32_to_cpu(packet->normal_pages);
if (p->normal_num > packet->pages_alloc) {
error_setg(errp, "multifd: received packet "
"with %u normal pages and expected maximum pages are %u",
p->normal_num, packet->pages_alloc) ;
return -1;
}
p->zero_num = be32_to_cpu(packet->zero_pages);
if (p->zero_num > packet->pages_alloc - p->normal_num) {
error_setg(errp, "multifd: received packet "
"with %u zero pages and expected maximum zero pages are %u",
p->zero_num, packet->pages_alloc - p->normal_num) ;
return -1;
}
p->next_packet_size = be32_to_cpu(packet->next_packet_size);
p->packet_num = be64_to_cpu(packet->packet_num);
p->packets_recved++;
p->total_normal_pages += p->normal_num;
p->total_zero_pages += p->zero_num;
trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
p->flags, p->next_packet_size);
if (p->normal_num == 0 && p->zero_num == 0) {
return 0;
if (!(p->flags & MULTIFD_FLAG_SYNC)) {
ret = multifd_ram_unfill_packet(p, errp);
}
/* make sure that ramblock is 0 terminated */
packet->ramblock[255] = 0;
p->block = qemu_ram_block_by_name(packet->ramblock);
if (!p->block) {
error_setg(errp, "multifd: unknown ram block %s",
packet->ramblock);
return -1;
}
trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
p->next_packet_size);
p->host = p->block->host;
for (i = 0; i < p->normal_num; i++) {
uint64_t offset = be64_to_cpu(packet->offset[i]);
if (offset > (p->block->used_length - p->page_size)) {
error_setg(errp, "multifd: offset too long %" PRIu64
" (max " RAM_ADDR_FMT ")",
offset, p->block->used_length);
return -1;
}
p->normal[i] = offset;
}
for (i = 0; i < p->zero_num; i++) {
uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);
if (offset > (p->block->used_length - p->page_size)) {
error_setg(errp, "multifd: offset too long %" PRIu64
" (max " RAM_ADDR_FMT ")",
offset, p->block->used_length);
return -1;
}
p->zero[i] = offset;
}
return 0;
return ret;
}
static bool multifd_send_should_exit(void)
@ -568,30 +285,25 @@ static void multifd_send_kick_main(MultiFDSendParams *p)
}
/*
* How we use multifd_send_state->pages and channel->pages?
* multifd_send() works by exchanging the MultiFDSendData object
* provided by the caller with an unused MultiFDSendData object from
* the next channel that is found to be idle.
*
* We create a pages for each channel, and a main one. Each time that
* we need to send a batch of pages we interchange the ones between
* multifd_send_state and the channel that is sending it. There are
* two reasons for that:
* - to not have to do so many mallocs during migration
* - to make easier to know what to free at the end of migration
* The channel owns the data until it finishes transmitting and the
* caller owns the empty object until it fills it with data and calls
* this function again. No locking necessary.
*
* This way we always know who is the owner of each "pages" struct,
* and we don't need any locking. It belongs to the migration thread
* or to the channel thread. Switching is safe because the migration
* thread is using the channel mutex when changing it, and the channel
* have to had finish with its own, otherwise pending_job can't be
* false.
* Switching is safe because both the migration thread and the channel
* thread have barriers in place to serialize access.
*
* Returns true if succeed, false otherwise.
*/
static bool multifd_send_pages(void)
bool multifd_send(MultiFDSendData **send_data)
{
int i;
static int next_channel;
MultiFDSendParams *p = NULL; /* make happy gcc */
MultiFDPages_t *pages = multifd_send_state->pages;
MultiFDSendData *tmp;
if (multifd_send_should_exit()) {
return false;
@ -626,11 +338,19 @@ static bool multifd_send_pages(void)
* qatomic_store_release() in multifd_send_thread().
*/
smp_mb_acquire();
assert(!p->pages->num);
multifd_send_state->pages = p->pages;
p->pages = pages;
assert(multifd_payload_empty(p->data));
/*
* Making sure p->pages is setup before marking pending_job=true. Pairs
* Swap the pointers. The channel gets the client data for
* transferring and the client gets back an unused data slot.
*/
tmp = *send_data;
*send_data = p->data;
p->data = tmp;
/*
* Making sure p->data is set up before marking pending_job=true. Pairs
* with the qatomic_load_acquire() in multifd_send_thread().
*/
qatomic_store_release(&p->pending_job, true);
@ -639,56 +359,6 @@ static bool multifd_send_pages(void)
return true;
}
/* True when no page has been queued in @pages */
static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return !pages->num;
}
/* True when @pages holds as many pages as were allocated for it */
static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->allocated == pages->num;
}
/*
 * Store @offset in the next free slot of @pages and bump the page
 * count. No bounds check is done here.
 */
static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    uint32_t slot = pages->num;

    pages->offset[slot] = offset;
    pages->num = slot + 1;
}
/*
 * Queue the page at @offset of @block on multifd_send_state->pages.
 *
 * Returns true if enqueue successful, false otherwise.
 */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    for (;;) {
        MultiFDPages_t *queue = multifd_send_state->pages;

        /* An empty queue takes the page right away and adopts its block */
        if (multifd_queue_empty(queue)) {
            queue->block = block;
            multifd_enqueue(queue, offset);
            return true;
        }

        /* Same ramblock and still room: just append */
        if (queue->block == block && !multifd_queue_full(queue)) {
            multifd_enqueue(queue, offset);
            return true;
        }

        /*
         * Either the page belongs to a different ramblock than the
         * queued ones, or the queue is full. Flush, then look at the
         * current queue again.
         */
        if (!multifd_send_pages()) {
            return false;
        }
    }
}
/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
@ -790,12 +460,13 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
p->name = NULL;
multifd_pages_clear(p->pages);
p->pages = NULL;
g_free(p->data);
p->data = NULL;
p->packet_len = 0;
g_free(p->packet);
p->packet = NULL;
multifd_send_state->ops->send_cleanup(p, errp);
assert(!p->iov);
return *errp == NULL;
}
@ -808,8 +479,6 @@ static void multifd_send_cleanup_state(void)
qemu_sem_destroy(&multifd_send_state->channels_ready);
g_free(multifd_send_state->params);
multifd_send_state->params = NULL;
multifd_pages_clear(multifd_send_state->pages);
multifd_send_state->pages = NULL;
g_free(multifd_send_state);
multifd_send_state = NULL;
}
@ -859,16 +528,6 @@ int multifd_send_sync_main(void)
int i;
bool flush_zero_copy;
if (!migrate_multifd()) {
return 0;
}
if (multifd_send_state->pages->num) {
if (!multifd_send_pages()) {
error_report("%s: multifd_send_pages fail", __func__);
return -1;
}
}
flush_zero_copy = migrate_zero_copy_send();
for (i = 0; i < migrate_multifd_channels(); i++) {
@ -937,14 +596,12 @@ static void *multifd_send_thread(void *opaque)
}
/*
* Read pending_job flag before p->pages. Pairs with the
* qatomic_store_release() in multifd_send_pages().
* Read pending_job flag before p->data. Pairs with the
* qatomic_store_release() in multifd_send().
*/
if (qatomic_load_acquire(&p->pending_job)) {
MultiFDPages_t *pages = p->pages;
p->iovs_num = 0;
assert(pages->num);
assert(!multifd_payload_empty(p->data));
ret = multifd_send_state->ops->send_prepare(p, &local_err);
if (ret != 0) {
@ -953,7 +610,7 @@ static void *multifd_send_thread(void *opaque)
if (migrate_mapped_ram()) {
ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
p->pages->block, &local_err);
&p->data->u.ram, &local_err);
} else {
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
NULL, 0, p->write_flags,
@ -966,16 +623,14 @@ static void *multifd_send_thread(void *opaque)
stat64_add(&mig_stats.multifd_bytes,
p->next_packet_size + p->packet_len);
stat64_add(&mig_stats.normal_pages, pages->normal_num);
stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
multifd_pages_reset(p->pages);
p->next_packet_size = 0;
multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE);
/*
* Making sure p->pages is published before saying "we're
* Making sure p->data is published before saying "we're
* free". Pairs with the smp_mb_acquire() in
* multifd_send_pages().
* multifd_send().
*/
qatomic_store_release(&p->pending_job, false);
} else {
@ -1015,8 +670,7 @@ out:
rcu_unregister_thread();
migration_threads_remove(thread);
trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
p->total_zero_pages);
trace_multifd_send_thread_end(p->id, p->packets_sent);
return NULL;
}
@ -1157,7 +811,7 @@ bool multifd_send_setup(void)
{
MigrationState *s = migrate_get_current();
int thread_count, ret = 0;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
uint32_t page_count = multifd_ram_page_count();
bool use_packets = multifd_use_packets();
uint8_t i;
@ -1168,7 +822,6 @@ bool multifd_send_setup(void)
thread_count = migrate_multifd_channels();
multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
multifd_send_state->pages = multifd_pages_init(page_count);
qemu_sem_init(&multifd_send_state->channels_created, 0);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
qatomic_set(&multifd_send_state->exiting, 0);
@ -1181,18 +834,14 @@ bool multifd_send_setup(void)
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->sem_sync, 0);
p->id = i;
p->pages = multifd_pages_init(page_count);
p->data = multifd_send_data_alloc();
if (use_packets) {
p->packet_len = sizeof(MultiFDPacket_t)
+ sizeof(uint64_t) * page_count;
p->packet = g_malloc0(p->packet_len);
p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
p->packet->version = cpu_to_be32(MULTIFD_VERSION);
}
p->name = g_strdup_printf("mig/src/send_%d", i);
p->page_size = qemu_target_page_size();
p->page_count = page_count;
p->write_flags = 0;
if (!multifd_new_send_channel_create(p, &local_err)) {
@ -1223,6 +872,7 @@ bool multifd_send_setup(void)
migrate_set_error(s, local_err);
goto err;
}
assert(p->iov);
}
return true;
@ -1501,7 +1151,9 @@ static void *multifd_recv_thread(void *opaque)
flags = p->flags;
/* recv methods don't know how to handle the SYNC flag */
p->flags &= ~MULTIFD_FLAG_SYNC;
has_data = p->normal_num || p->zero_num;
if (!(flags & MULTIFD_FLAG_SYNC)) {
has_data = p->normal_num || p->zero_num;
}
qemu_mutex_unlock(&p->mutex);
} else {
/*
@ -1542,7 +1194,6 @@ static void *multifd_recv_thread(void *opaque)
qemu_sem_wait(&p->sem_sync);
}
} else {
p->total_normal_pages += p->data->size / qemu_target_page_size();
p->data->size = 0;
/*
* Order data->size update before clearing
@ -1559,9 +1210,7 @@ static void *multifd_recv_thread(void *opaque)
}
rcu_unregister_thread();
trace_multifd_recv_thread_end(p->id, p->packets_recved,
p->total_normal_pages,
p->total_zero_pages);
trace_multifd_recv_thread_end(p->id, p->packets_recved);
return NULL;
}
@ -1569,7 +1218,7 @@ static void *multifd_recv_thread(void *opaque)
int multifd_recv_setup(Error **errp)
{
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
uint32_t page_count = multifd_ram_page_count();
bool use_packets = multifd_use_packets();
uint8_t i;
@ -1613,8 +1262,6 @@ int multifd_recv_setup(Error **errp)
p->name = g_strdup_printf("mig/dst/recv_%d", i);
p->normal = g_new0(ram_addr_t, page_count);
p->zero = g_new0(ram_addr_t, page_count);
p->page_count = page_count;
p->page_size = qemu_target_page_size();
}
for (i = 0; i < thread_count; i++) {
@ -1687,17 +1334,3 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
QEMU_THREAD_JOINABLE);
qatomic_inc(&multifd_recv_state->count);
}
/*
 * Common first step of send_prepare: run zero page detection and, if
 * normal pages remain, prepare the packet header.
 *
 * Returns true when there are normal pages left to send. Returns false
 * otherwise, with p->next_packet_size set to 0.
 */
bool multifd_send_prepare_common(MultiFDSendParams *p)
{
    multifd_send_zero_page_detect(p);

    if (p->pages->normal_num) {
        multifd_send_prepare_header(p);
        return true;
    }

    p->next_packet_size = 0;
    return false;
}

View File

@ -13,9 +13,11 @@
#ifndef QEMU_MIGRATION_MULTIFD_H
#define QEMU_MIGRATION_MULTIFD_H
#include "exec/target_page.h"
#include "ram.h"
typedef struct MultiFDRecvData MultiFDRecvData;
typedef struct MultiFDSendData MultiFDSendData;
bool multifd_send_setup(void);
void multifd_send_shutdown(void);
@ -75,11 +77,9 @@ typedef struct {
uint32_t num;
/* number of normal pages */
uint32_t normal_num;
/* number of allocated pages */
uint32_t allocated;
/* offset of each page */
ram_addr_t *offset;
RAMBlock *block;
/* offset of each page */
ram_addr_t offset[];
} MultiFDPages_t;
struct MultiFDRecvData {
@ -89,6 +89,31 @@ struct MultiFDRecvData {
off_t file_offset;
};
/* Kind of payload currently held by a MultiFDSendData */
typedef enum {
/* no payload; this is what multifd_payload_empty() tests for */
MULTIFD_PAYLOAD_NONE,
/* NOTE(review): presumably means the 'ram' member of the union is in use -- confirm */
MULTIFD_PAYLOAD_RAM,
} MultiFDPayloadType;
/* Storage for the payload; RAM pages are the only member at present */
typedef union MultiFDPayload {
/* ends in a flexible array (offset[]), which sizeof() does not include */
MultiFDPages_t ram;
} MultiFDPayload;
/*
 * A payload together with its type tag. multifd_send_data_alloc()
 * allocates extra space beyond sizeof() for the flexible array at the
 * end of the payload.
 */
struct MultiFDSendData {
MultiFDPayloadType type;
MultiFDPayload u;
};
/* True when @data carries no payload (its type is MULTIFD_PAYLOAD_NONE) */
static inline bool multifd_payload_empty(MultiFDSendData *data)
{
    const MultiFDPayloadType current = data->type;

    return current == MULTIFD_PAYLOAD_NONE;
}
/* Record @type as the payload type of @data; the payload is not touched */
static inline void multifd_set_payload_type(MultiFDSendData *data,
                                            MultiFDPayloadType type)
{
    data->type = type;
}
typedef struct {
/* Fields are only written at creating/deletion time */
/* No lock required for them, they are read only */
@ -106,10 +131,6 @@ typedef struct {
QIOChannel *c;
/* packet allocated len */
uint32_t packet_len;
/* guest page size */
uint32_t page_size;
/* number of pages in a full packet */
uint32_t page_count;
/* multifd flags for sending ram */
int write_flags;
@ -131,12 +152,7 @@ typedef struct {
*/
bool pending_job;
bool pending_sync;
/* array of pages to sent.
* The owner of 'pages' depends of 'pending_job' value:
* pending_job == 0 -> migration_thread can use it.
* pending_job != 0 -> multifd_channel can use it.
*/
MultiFDPages_t *pages;
MultiFDSendData *data;
/* thread local variables. No locking required */
@ -146,10 +162,6 @@ typedef struct {
uint32_t next_packet_size;
/* packets sent through this channel */
uint64_t packets_sent;
/* non zero pages sent through this channel */
uint64_t total_normal_pages;
/* zero pages sent through this channel */
uint64_t total_zero_pages;
/* buffers to send */
struct iovec *iov;
/* number of iovs used */
@ -173,10 +185,6 @@ typedef struct {
QIOChannel *c;
/* packet allocated len */
uint32_t packet_len;
/* guest page size */
uint32_t page_size;
/* number of pages in a full packet */
uint32_t page_count;
/* syncs main thread and channels */
QemuSemaphore sem_sync;
@ -206,10 +214,6 @@ typedef struct {
RAMBlock *block;
/* ramblock host address */
uint8_t *host;
/* non zero pages recv through this channel */
uint64_t total_normal_pages;
/* zero pages recv through this channel */
uint64_t total_zero_pages;
/* buffers to recv */
struct iovec *iov;
/* Pages that are not zero */
@ -225,21 +229,85 @@ typedef struct {
} MultiFDRecvParams;
typedef struct {
/* Setup for sending side */
/*
* The send_setup, send_cleanup, send_prepare are only called on
* the QEMU instance at the migration source.
*/
/*
* Setup for sending side. Called once per channel during channel
* setup phase.
*
* Must allocate p->iov. If packets are in use (default), one
* extra iovec must be allocated for the packet header. Any memory
* allocated in this hook must be released at send_cleanup.
*
* p->write_flags may be used for passing flags to the QIOChannel.
*
* p->compress_data may be used by compression methods to store
* compression data.
*/
int (*send_setup)(MultiFDSendParams *p, Error **errp);
/* Cleanup for sending side */
/*
* Cleanup for sending side. Called once per channel during
* channel cleanup phase.
*/
void (*send_cleanup)(MultiFDSendParams *p, Error **errp);
/* Prepare the send packet */
/*
* Prepare the send packet. Called as a result of multifd_send()
* on the client side, with p pointing to the MultiFDSendParams of
* a channel that is currently idle.
*
* Must populate p->iov with the data to be sent, increment
* p->iovs_num to match the amount of iovecs used and set
* p->next_packet_size with the amount of data currently present
* in p->iov.
*
* Must indicate whether this is a compression packet by setting
* p->flags.
*
* As a last step, if packets are in use (default), must prepare
* the packet by calling multifd_send_fill_packet().
*/
int (*send_prepare)(MultiFDSendParams *p, Error **errp);
/* Setup for receiving side */
/*
* The recv_setup, recv_cleanup, recv are only called on the QEMU
* instance at the migration destination.
*/
/*
* Setup for receiving side. Called once per channel during
* channel setup phase. May be empty.
*
* May allocate data structures for the receiving of data. May use
* p->iov. Compression methods may use p->compress_data.
*/
int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
/* Cleanup for receiving side */
/*
* Cleanup for receiving side. Called once per channel during
* channel cleanup phase. May be empty.
*/
void (*recv_cleanup)(MultiFDRecvParams *p);
/* Read all data */
/*
* Data receive method. Called as a result of multifd_recv() on
* the client side, with p pointing to the MultiFDRecvParams of a
* channel that is currently idle. Only called if there is data
* available to receive.
*
* Must validate p->flags according to what was set at
* send_prepare.
*
* Must read the data from the QIOChannel p->c.
*/
int (*recv)(MultiFDRecvParams *p, Error **errp);
} MultiFDMethods;
void multifd_register_ops(int method, MultiFDMethods *ops);
void multifd_register_ops(int method, const MultiFDMethods *ops);
void multifd_send_fill_packet(MultiFDSendParams *p);
bool multifd_send_prepare_common(MultiFDSendParams *p);
void multifd_send_zero_page_detect(MultiFDSendParams *p);
@ -253,5 +321,23 @@ static inline void multifd_send_prepare_header(MultiFDSendParams *p)
}
void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc);
bool multifd_send(MultiFDSendData **send_data);
MultiFDSendData *multifd_send_data_alloc(void);
/*
 * Returns the value reported by qemu_target_page_size(), converted to
 * uint32_t by the return statement.
 */
static inline uint32_t multifd_ram_page_size(void)
{
return qemu_target_page_size();
}
/*
 * Returns MULTIFD_PACKET_SIZE divided by qemu_target_page_size(), i.e.
 * the number of whole target pages that fit in MULTIFD_PACKET_SIZE.
 * NOTE(review): presumably MULTIFD_PACKET_SIZE is a byte count; confirm
 * against its definition, which is not visible here.
 */
static inline uint32_t multifd_ram_page_count(void)
{
return MULTIFD_PACKET_SIZE / qemu_target_page_size();
}
void multifd_ram_save_setup(void);
void multifd_ram_save_cleanup(void);
int multifd_ram_flush_and_sync(void);
size_t multifd_ram_payload_size(void);
void multifd_ram_fill_packet(MultiFDSendParams *p);
int multifd_ram_unfill_packet(MultiFDRecvParams *p, Error **errp);
#endif

View File

@ -1326,7 +1326,7 @@ static int find_dirty_block(RAMState *rs, PageSearchStatus *pss)
(!migrate_multifd_flush_after_each_section() ||
migrate_mapped_ram())) {
QEMUFile *f = rs->pss[RAM_CHANNEL_PRECOPY].pss_channel;
int ret = multifd_send_sync_main();
int ret = multifd_ram_flush_and_sync();
if (ret < 0) {
return ret;
}
@ -2387,6 +2387,7 @@ static void ram_save_cleanup(void *opaque)
ram_bitmaps_destroy();
xbzrle_cleanup();
multifd_ram_save_cleanup();
ram_state_cleanup(rsp);
g_free(migration_ops);
migration_ops = NULL;
@ -3058,13 +3059,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque, Error **errp)
migration_ops = g_malloc0(sizeof(MigrationOps));
if (migrate_multifd()) {
multifd_ram_save_setup();
migration_ops->ram_save_target_page = ram_save_target_page_multifd;
} else {
migration_ops->ram_save_target_page = ram_save_target_page_legacy;
}
bql_unlock();
ret = multifd_send_sync_main();
ret = multifd_ram_flush_and_sync();
bql_lock();
if (ret < 0) {
error_setg(errp, "%s: multifd synchronization failed", __func__);
@ -3211,7 +3213,7 @@ out:
&& migration_is_setup_or_active()) {
if (migrate_multifd() && migrate_multifd_flush_after_each_section() &&
!migrate_mapped_ram()) {
ret = multifd_send_sync_main();
ret = multifd_ram_flush_and_sync();
if (ret < 0) {
return ret;
}
@ -3283,7 +3285,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
}
}
ret = multifd_send_sync_main();
ret = multifd_ram_flush_and_sync();
if (ret < 0) {
return ret;
}

View File

@ -2578,8 +2578,7 @@ static bool check_section_footer(QEMUFile *f, SaveStateEntry *se)
}
static int
qemu_loadvm_section_start_full(QEMUFile *f, MigrationIncomingState *mis,
uint8_t type)
qemu_loadvm_section_start_full(QEMUFile *f, uint8_t type)
{
bool trace_downtime = (type == QEMU_VM_SECTION_FULL);
uint32_t instance_id, version_id, section_id;
@ -2657,8 +2656,7 @@ qemu_loadvm_section_start_full(QEMUFile *f, MigrationIncomingState *mis,
}
static int
qemu_loadvm_section_part_end(QEMUFile *f, MigrationIncomingState *mis,
uint8_t type)
qemu_loadvm_section_part_end(QEMUFile *f, uint8_t type)
{
bool trace_downtime = (type == QEMU_VM_SECTION_END);
int64_t start_ts, end_ts;
@ -2893,14 +2891,14 @@ retry:
switch (section_type) {
case QEMU_VM_SECTION_START:
case QEMU_VM_SECTION_FULL:
ret = qemu_loadvm_section_start_full(f, mis, section_type);
ret = qemu_loadvm_section_start_full(f, section_type);
if (ret < 0) {
goto out;
}
break;
case QEMU_VM_SECTION_PART:
case QEMU_VM_SECTION_END:
ret = qemu_loadvm_section_part_end(f, mis, section_type);
ret = qemu_loadvm_section_part_end(f, section_type);
if (ret < 0) {
goto out;
}

View File

@ -128,21 +128,22 @@ postcopy_preempt_reset_channel(void) ""
# multifd.c
multifd_new_send_channel_async(uint8_t id) "channel %u"
multifd_new_send_channel_async_error(uint8_t id, void *err) "channel=%u err=%p"
multifd_recv(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t zero, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u zero pages %u flags 0x%x next packet size %u"
multifd_recv_unfill(uint8_t id, uint64_t packet_num, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " flags 0x%x next packet size %u"
multifd_recv_new_channel(uint8_t id) "channel %u"
multifd_recv_sync_main(long packet_num) "packet num %ld"
multifd_recv_sync_main_signal(uint8_t id) "channel %u"
multifd_recv_sync_main_wait(uint8_t id) "iter %u"
multifd_recv_terminate_threads(bool error) "error %d"
multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages, uint64_t zero_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 " zero pages %" PRIu64
multifd_recv_thread_end(uint8_t id, uint64_t packets) "channel %u packets %" PRIu64
multifd_recv_thread_start(uint8_t id) "%u"
multifd_send(uint8_t id, uint64_t packet_num, uint32_t normal_pages, uint32_t zero_pages, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u zero pages %u flags 0x%x next packet size %u"
multifd_send_fill(uint8_t id, uint64_t packet_num, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " flags 0x%x next packet size %u"
multifd_send_ram_fill(uint8_t id, uint32_t normal, uint32_t zero) "channel %u normal pages %u zero pages %u"
multifd_send_error(uint8_t id) "channel %u"
multifd_send_sync_main(long packet_num) "packet num %ld"
multifd_send_sync_main_signal(uint8_t id) "channel %u"
multifd_send_sync_main_wait(uint8_t id) "channel %u"
multifd_send_terminate_threads(void) ""
multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages, uint64_t zero_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64 " zero pages %" PRIu64
multifd_send_thread_end(uint8_t id, uint64_t packets) "channel %u packets %" PRIu64
multifd_send_thread_start(uint8_t id) "%u"
multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s"
multifd_tls_outgoing_handshake_error(void *ioc, const char *err) "ioc=%p err=%s"

View File

@ -621,7 +621,7 @@ static bool tlbemb_needed(void *opaque)
}
static const VMStateDescription vmstate_tlbemb = {
.name = "cpu/tlb6xx",
.name = "cpu/tlbemb",
.version_id = 1,
.minimum_version_id = 1,
.needed = tlbemb_needed,

View File

@ -514,7 +514,12 @@ static QTestState *qtest_init_internal(const char *qemu_bin,
kill(s->qemu_pid, SIGSTOP);
}
#endif
return s;
/* ask endianness of the target */
s->big_endian = qtest_query_target_endianness(s);
return s;
}
QTestState *qtest_init_without_qmp_handshake(const char *extra_args)
@ -522,21 +527,11 @@ QTestState *qtest_init_without_qmp_handshake(const char *extra_args)
return qtest_init_internal(qtest_qemu_binary(NULL), extra_args);
}
QTestState *qtest_init_with_env_no_handshake(const char *var,
const char *extra_args)
{
return qtest_init_internal(qtest_qemu_binary(var), extra_args);
}
QTestState *qtest_init_with_env(const char *var, const char *extra_args)
{
QTestState *s = qtest_init_internal(qtest_qemu_binary(var), extra_args);
QDict *greeting;
/* ask endianness of the target */
s->big_endian = qtest_query_target_endianness(s);
/* Read the QMP greeting and then do the handshake */
greeting = qtest_qmp_receive(s);
qobject_unref(greeting);

View File

@ -68,8 +68,6 @@ QTestState *qtest_init(const char *extra_args);
*/
QTestState *qtest_init_with_env(const char *var, const char *extra_args);
QTestState *qtest_init_with_env_no_handshake(const char *var,
const char *extra_args);
/**
* qtest_init_without_qmp_handshake:
* @extra_args: other arguments to pass to QEMU. CAUTION: these

View File

@ -82,11 +82,10 @@ static QDict *SocketAddress_to_qdict(SocketAddress *addr)
return dict;
}
static SocketAddress *migrate_get_socket_address(QTestState *who)
static SocketAddressList *migrate_get_socket_address(QTestState *who)
{
QDict *rsp;
SocketAddressList *addrs;
SocketAddress *addr;
Visitor *iv = NULL;
QObject *object;
@ -95,36 +94,35 @@ static SocketAddress *migrate_get_socket_address(QTestState *who)
iv = qobject_input_visitor_new(object);
visit_type_SocketAddressList(iv, NULL, &addrs, &error_abort);
addr = addrs->value;
visit_free(iv);
qobject_unref(rsp);
return addr;
return addrs;
}
static char *
migrate_get_connect_uri(QTestState *who)
{
SocketAddress *addrs;
SocketAddressList *addrs;
char *connect_uri;
addrs = migrate_get_socket_address(who);
connect_uri = SocketAddress_to_str(addrs);
connect_uri = SocketAddress_to_str(addrs->value);
qapi_free_SocketAddress(addrs);
qapi_free_SocketAddressList(addrs);
return connect_uri;
}
static QDict *
migrate_get_connect_qdict(QTestState *who)
{
SocketAddress *addrs;
SocketAddressList *addrs;
QDict *connect_qdict;
addrs = migrate_get_socket_address(who);
connect_qdict = SocketAddress_to_qdict(addrs);
connect_qdict = SocketAddress_to_qdict(addrs->value);
qapi_free_SocketAddress(addrs);
qapi_free_SocketAddressList(addrs);
return connect_qdict;
}
@ -144,7 +142,7 @@ static void migrate_set_ports(QTestState *to, QList *channel_list)
qdict_haskey(addr, "port") &&
(strcmp(qdict_get_str(addrdict, "port"), "0") == 0)) {
addr_port = qdict_get_str(addr, "port");
qdict_put_str(addrdict, "port", g_strdup(addr_port));
qdict_put_str(addrdict, "port", addr_port);
}
}

View File

@ -64,7 +64,6 @@ static QTestMigrationState dst_state;
#define DIRTYLIMIT_TOLERANCE_RANGE 25 /* MB/s */
#define ANALYZE_SCRIPT "scripts/analyze-migration.py"
#define VMSTATE_CHECKER_SCRIPT "scripts/vmstate-static-checker.py"
#define QEMU_VM_FILE_MAGIC 0x5145564d
#define FILE_TEST_FILENAME "migfile"
@ -146,6 +145,9 @@ static char *bootpath;
static void bootfile_delete(void)
{
if (!bootpath) {
return;
}
unlink(bootpath);
g_free(bootpath);
bootpath = NULL;
@ -157,10 +159,7 @@ static void bootfile_create(char *dir, bool suspend_me)
unsigned char *content;
size_t len;
if (bootpath) {
bootfile_delete();
}
bootfile_delete();
bootpath = g_strdup_printf("%s/bootsect", dir);
if (strcmp(arch, "i386") == 0 || strcmp(arch, "x86_64") == 0) {
/* the assembled x86 boot sector should be exactly one sector large */
@ -1062,12 +1061,15 @@ test_migrate_tls_x509_start_common(QTestState *from,
QCRYPTO_TLS_TEST_CLIENT_HOSTILE_NAME :
QCRYPTO_TLS_TEST_CLIENT_NAME,
data->clientcert);
test_tls_deinit_cert(&servercertreq);
}
TLS_CERT_REQ_SIMPLE_SERVER(clientcertreq, cacertreq,
data->servercert,
args->certhostname,
args->certipaddr);
test_tls_deinit_cert(&clientcertreq);
test_tls_deinit_cert(&cacertreq);
qtest_qmp_assert_success(from,
"{ 'execute': 'object-add',"
@ -1692,85 +1694,6 @@ static void test_analyze_script(void)
test_migrate_end(from, to, false);
cleanup("migfile");
}
static void test_vmstate_checker_script(void)
{
g_autofree gchar *cmd_src = NULL;
g_autofree gchar *cmd_dst = NULL;
g_autofree gchar *vmstate_src = NULL;
g_autofree gchar *vmstate_dst = NULL;
const char *machine_alias, *machine_opts = "";
g_autofree char *machine = NULL;
const char *arch = qtest_get_arch();
int pid, wstatus;
const char *python = g_getenv("PYTHON");
if (!getenv(QEMU_ENV_SRC) && !getenv(QEMU_ENV_DST)) {
g_test_skip("Test needs two different QEMU versions");
return;
}
if (!python) {
g_test_skip("PYTHON variable not set");
return;
}
if (strcmp(arch, "i386") == 0 || strcmp(arch, "x86_64") == 0) {
if (g_str_equal(arch, "i386")) {
machine_alias = "pc";
} else {
machine_alias = "q35";
}
} else if (g_str_equal(arch, "s390x")) {
machine_alias = "s390-ccw-virtio";
} else if (strcmp(arch, "ppc64") == 0) {
machine_alias = "pseries";
} else if (strcmp(arch, "aarch64") == 0) {
machine_alias = "virt";
} else {
g_assert_not_reached();
}
if (!qtest_has_machine(machine_alias)) {
g_autofree char *msg = g_strdup_printf("machine %s not supported", machine_alias);
g_test_skip(msg);
return;
}
machine = resolve_machine_version(machine_alias, QEMU_ENV_SRC,
QEMU_ENV_DST);
vmstate_src = g_strdup_printf("%s/vmstate-src", tmpfs);
vmstate_dst = g_strdup_printf("%s/vmstate-dst", tmpfs);
cmd_dst = g_strdup_printf("-machine %s,%s -dump-vmstate %s",
machine, machine_opts, vmstate_dst);
cmd_src = g_strdup_printf("-machine %s,%s -dump-vmstate %s",
machine, machine_opts, vmstate_src);
qtest_init_with_env_no_handshake(QEMU_ENV_SRC, cmd_src);
qtest_init_with_env_no_handshake(QEMU_ENV_DST, cmd_dst);
pid = fork();
if (!pid) {
close(1);
open("/dev/null", O_WRONLY);
execl(python, python, VMSTATE_CHECKER_SCRIPT,
"-s", vmstate_src,
"-d", vmstate_dst,
NULL);
g_assert_not_reached();
}
g_assert(waitpid(pid, &wstatus, 0) == pid);
if (!WIFEXITED(wstatus) || WEXITSTATUS(wstatus) != 0) {
g_test_message("Failed to run vmstate-static-checker.py");
g_test_fail();
}
cleanup("vmstate-src");
cleanup("vmstate-dst");
}
#endif
static void test_precopy_common(MigrateCommon *args)
@ -2395,6 +2318,7 @@ static void multifd_mapped_ram_fdset_end(QTestState *from, QTestState *to,
g_assert(qdict_haskey(resp, "return"));
fdsets = qdict_get_qlist(resp, "return");
g_assert(fdsets && qlist_empty(fdsets));
qobject_unref(resp);
}
static void *multifd_mapped_ram_fdset_dio(QTestState *from, QTestState *to)
@ -3318,6 +3242,7 @@ static void test_multifd_tcp_cancel(void)
/* Make sure QEMU process "to" exited */
qtest_set_expected_status(to, EXIT_FAILURE);
qtest_wait_qemu(to);
qtest_quit(to);
args = (MigrateStart){
.only_target = true,
@ -3397,15 +3322,18 @@ static QDict *query_vcpu_dirty_limit(QTestState *who)
static bool calc_dirtyrate_ready(QTestState *who)
{
QDict *rsp_return;
gchar *status;
const char *status;
bool ready;
rsp_return = query_dirty_rate(who);
g_assert(rsp_return);
status = g_strdup(qdict_get_str(rsp_return, "status"));
status = qdict_get_str(rsp_return, "status");
g_assert(status);
ready = g_strcmp0(status, "measuring");
qobject_unref(rsp_return);
return g_strcmp0(status, "measuring");
return ready;
}
static void wait_for_calc_dirtyrate_complete(QTestState *who,
@ -3428,7 +3356,7 @@ static void wait_for_calc_dirtyrate_complete(QTestState *who,
static int64_t get_dirty_rate(QTestState *who)
{
QDict *rsp_return;
gchar *status;
const char *status;
QList *rates;
const QListEntry *entry;
QDict *rate;
@ -3437,7 +3365,7 @@ static int64_t get_dirty_rate(QTestState *who)
rsp_return = query_dirty_rate(who);
g_assert(rsp_return);
status = g_strdup(qdict_get_str(rsp_return, "status"));
status = qdict_get_str(rsp_return, "status");
g_assert(status);
g_assert_cmpstr(status, ==, "measured");
@ -3823,8 +3751,6 @@ int main(int argc, char **argv)
migration_test_add("/migration/bad_dest", test_baddest);
#ifndef _WIN32
migration_test_add("/migration/analyze-script", test_analyze_script);
migration_test_add("/migration/vmstate-checker-script",
test_vmstate_checker_script);
#endif
if (is_x86) {
@ -4026,8 +3952,10 @@ int main(int argc, char **argv)
if (g_str_equal(arch, "x86_64") && has_kvm && kvm_dirty_ring_supported()) {
migration_test_add("/migration/dirty_ring",
test_precopy_unix_dirty_ring);
migration_test_add("/migration/vcpu_dirty_limit",
test_vcpu_dirty_limit);
if (qtest_has_machine("pc")) {
migration_test_add("/migration/vcpu_dirty_limit",
test_vcpu_dirty_limit);
}
}
ret = g_test_run();

View File

@ -135,6 +135,7 @@ void test_tls_init(const char *keyfile)
void test_tls_cleanup(const char *keyfile)
{
asn1_delete_structure(&pkix_asn1);
gnutls_x509_privkey_deinit(privkey);
unlink(keyfile);
}
@ -502,8 +503,7 @@ void test_tls_write_cert_chain(const char *filename,
g_free(buffer);
}
void test_tls_discard_cert(QCryptoTLSTestCertReq *req)
void test_tls_deinit_cert(QCryptoTLSTestCertReq *req)
{
if (!req->crt) {
return;
@ -511,6 +511,15 @@ void test_tls_discard_cert(QCryptoTLSTestCertReq *req)
gnutls_x509_crt_deinit(req->crt);
req->crt = NULL;
}
void test_tls_discard_cert(QCryptoTLSTestCertReq *req)
{
if (!req->crt) {
return;
}
test_tls_deinit_cert(req);
if (getenv("QEMU_TEST_DEBUG_CERTS") == NULL) {
unlink(req->filename);

View File

@ -73,6 +73,12 @@ void test_tls_generate_cert(QCryptoTLSTestCertReq *req,
void test_tls_write_cert_chain(const char *filename,
gnutls_x509_crt_t *certs,
size_t ncerts);
/*
* Deinitialize the QCryptoTLSTestCertReq, but don't delete the certificate
* file on disk. (The caller is then responsible for doing that themselves.)
*/
void test_tls_deinit_cert(QCryptoTLSTestCertReq *req);
/* Deinit the QCryptoTLSTestCertReq, and delete the certificate file */
void test_tls_discard_cert(QCryptoTLSTestCertReq *req);
void test_tls_init(const char *keyfile);