Migration PULL request (20230508 edition, take 2)

Hi
 
 This is just the compression bits of the Migration PULL request for
 20230428.  Only change is that we don't run the compression tests by
 default.
 
 The problem already exist with compression code.  The test just show
 that it don't work.
 
 - Add migration tests for (old) compress migration code (lukas)
 - Make compression code independent of ram.c (lukas)
 - Move compression code into ram-compress.c (lukas)
 
 Please apply, Juan.
 -----BEGIN PGP SIGNATURE-----
 
 iQIzBAABCAAdFiEEGJn/jt6/WMzuA0uC9IfvGFhy1yMFAmRZRMwACgkQ9IfvGFhy
 1yOdixAA1fOLanaYMUJZGLZ9sVTt7rDc4AEPRGkQOYYZNGK3LHaG2Dx9ob2/CEkS
 /YPp9Oth9QAYHZgiI2Xx8GSg98PRVr9b/GlQPseoCOFXnUL89rTpQtxQq4CV41E6
 AA5Dr8Z07hsr47ERQERFfDGD4zsvpn+NWM1ZBy+CCilf/o8UU4eIyfRF34YgSScv
 FVdWM4czUKei9fe2Go1KnMCz1GnT/6epl47Hs8zn9WAEeUfLILp7dbkbNq26F65G
 8YC8YnrikxU+2j+NIyIbRxbIdjR+JUbR14AyezwWZ2zGbirwWN1DP2WQx0QIZOqM
 ZuCqIDj5HpNSlHmShI0gNDfPvs+iM+sFSwQ7JE8Q03hlES9HF5c+MOr3Pl3J91hH
 EEmkk5gBJ2v2tvBuHgwVAQ2UH1+XT+a7RXeoMU1iizc2sXRGDK12ZsyaAg4D0oaF
 eohzJk2j1QXcx/DNK2G5uhzwgKvKv1/+rHyYQFtg+XuWVVipSNwqRjDJkDANAYZP
 VwKOOqDd5lHLOIzE1j61Yu06DJhkSoMvz74RQlqnk+r1EKJcTUZL52uhQor//DaL
 ULpBsgYzoMUMrtw7myHxq4t0t6mmOtOkb0CvO8dTzkIV0YgIFTtPFB0ySXOFUFf5
 UoFoMFKlfbPpDsvTNEVErxpaG4FBwZNVt67V2KXQ53xRPShyBiQ=
 =SG8L
 -----END PGP SIGNATURE-----

Merge tag 'compression-code-pull-request' of https://gitlab.com/juan.quintela/qemu into staging

Migration PULL request (20230508 edition, take 2)

Hi

This is just the compression bits of the Migration PULL request for
20230428.  Only change is that we don't run the compression tests by
default.

The problem already exists in the compression code.  The tests just show
that it doesn't work.

- Add migration tests for (old) compress migration code (lukas)
- Make compression code independent of ram.c (lukas)
- Move compression code into ram-compress.c (lukas)

Please apply, Juan.

# -----BEGIN PGP SIGNATURE-----
#
# iQIzBAABCAAdFiEEGJn/jt6/WMzuA0uC9IfvGFhy1yMFAmRZRMwACgkQ9IfvGFhy
# 1yOdixAA1fOLanaYMUJZGLZ9sVTt7rDc4AEPRGkQOYYZNGK3LHaG2Dx9ob2/CEkS
# /YPp9Oth9QAYHZgiI2Xx8GSg98PRVr9b/GlQPseoCOFXnUL89rTpQtxQq4CV41E6
# AA5Dr8Z07hsr47ERQERFfDGD4zsvpn+NWM1ZBy+CCilf/o8UU4eIyfRF34YgSScv
# FVdWM4czUKei9fe2Go1KnMCz1GnT/6epl47Hs8zn9WAEeUfLILp7dbkbNq26F65G
# 8YC8YnrikxU+2j+NIyIbRxbIdjR+JUbR14AyezwWZ2zGbirwWN1DP2WQx0QIZOqM
# ZuCqIDj5HpNSlHmShI0gNDfPvs+iM+sFSwQ7JE8Q03hlES9HF5c+MOr3Pl3J91hH
# EEmkk5gBJ2v2tvBuHgwVAQ2UH1+XT+a7RXeoMU1iizc2sXRGDK12ZsyaAg4D0oaF
# eohzJk2j1QXcx/DNK2G5uhzwgKvKv1/+rHyYQFtg+XuWVVipSNwqRjDJkDANAYZP
# VwKOOqDd5lHLOIzE1j61Yu06DJhkSoMvz74RQlqnk+r1EKJcTUZL52uhQor//DaL
# ULpBsgYzoMUMrtw7myHxq4t0t6mmOtOkb0CvO8dTzkIV0YgIFTtPFB0ySXOFUFf5
# UoFoMFKlfbPpDsvTNEVErxpaG4FBwZNVt67V2KXQ53xRPShyBiQ=
# =SG8L
# -----END PGP SIGNATURE-----
# gpg: Signature made Mon 08 May 2023 07:51:56 PM BST
# gpg:                using RSA key 1899FF8EDEBF58CCEE034B82F487EF185872D723
# gpg: Good signature from "Juan Quintela <quintela@redhat.com>" [undefined]
# gpg:                 aka "Juan Quintela <quintela@trasno.org>" [undefined]
# gpg: WARNING: This key is not certified with a trusted signature!
# gpg:          There is no indication that the signature belongs to the owner.
# Primary key fingerprint: 1899 FF8E DEBF 58CC EE03  4B82 F487 EF18 5872 D723

* tag 'compression-code-pull-request' of https://gitlab.com/juan.quintela/qemu:
  migration: Initialize and cleanup decompression in migration.c
  ram-compress.c: Make target independent
  ram compress: Assert that the file buffer matches the result
  ram.c: Move core decompression code into its own file
  ram.c: Move core compression code into its own file
  ram.c: Remove last ram.c dependency from the core compress code
  ram.c: Call update_compress_thread_counts from compress_send_queued_data
  ram.c: Do not call save_page_header() from compress threads
  ram.c: Reset result after sending queued data
  ram.c: Dont change param->block in the compress thread
  ram.c: Let the compress threads return a CompressResult enum
  qtest/migration-test.c: Add postcopy tests with compress enabled
  qtest/migration-test.c: Add tests with compress enabled

Signed-off-by: Richard Henderson <richard.henderson@linaro.org>
This commit is contained in:
Richard Henderson 2023-05-08 20:38:05 +01:00
commit 271477b59e
8 changed files with 758 additions and 460 deletions

View File

@ -23,6 +23,8 @@ softmmu_ss.add(files(
'migration.c',
'multifd.c',
'multifd-zlib.c',
'multifd-zlib.c',
'ram-compress.c',
'options.c',
'postcopy-ram.c',
'savevm.c',
@ -38,4 +40,6 @@ endif
softmmu_ss.add(when: zstd, if_true: files('multifd-zstd.c'))
specific_ss.add(when: 'CONFIG_SOFTMMU',
if_true: files('dirtyrate.c', 'ram.c', 'target.c'))
if_true: files('dirtyrate.c',
'ram.c',
'target.c'))

View File

@ -26,6 +26,7 @@
#include "sysemu/cpu-throttle.h"
#include "rdma.h"
#include "ram.h"
#include "ram-compress.h"
#include "migration/global_state.h"
#include "migration/misc.h"
#include "migration.h"
@ -228,6 +229,7 @@ void migration_incoming_state_destroy(void)
struct MigrationIncomingState *mis = migration_incoming_get_current();
multifd_load_cleanup();
compress_threads_load_cleanup();
if (mis->to_src_file) {
/* Tell source that we are done */
@ -500,6 +502,12 @@ process_incoming_migration_co(void *opaque)
Error *local_err = NULL;
assert(mis->from_src_file);
if (compress_threads_load_setup(mis->from_src_file)) {
error_report("Failed to setup decompress threads");
goto fail;
}
mis->migration_incoming_co = qemu_coroutine_self();
mis->largest_page_size = qemu_ram_pagesize_largest();
postcopy_state_set(POSTCOPY_INCOMING_NONE);
@ -565,6 +573,7 @@ fail:
qemu_fclose(mis->from_src_file);
multifd_load_cleanup();
compress_threads_load_cleanup();
exit(EXIT_FAILURE);
}

View File

@ -870,6 +870,17 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
return len;
}
/*
* Check if the writable buffer is empty
*/
bool qemu_file_buffer_empty(QEMUFile *file)
{
assert(qemu_file_is_writable(file));
return !file->iovcnt;
}
/*
* Get a string whose length is determined by a single preceding byte
* A preallocated 256 byte buffer must be passed in.

View File

@ -113,6 +113,7 @@ size_t coroutine_mixed_fn qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, s
ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
const uint8_t *p, size_t size);
int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
bool qemu_file_buffer_empty(QEMUFile *file);
/*
* Note that you can only peek continuous bytes from where the current pointer

485
migration/ram-compress.c Normal file
View File

@ -0,0 +1,485 @@
/*
* QEMU System Emulator
*
* Copyright (c) 2003-2008 Fabrice Bellard
* Copyright (c) 2011-2015 Red Hat Inc
*
* Authors:
* Juan Quintela <quintela@redhat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "ram-compress.h"
#include "qemu/error-report.h"
#include "migration.h"
#include "options.h"
#include "io/channel-null.h"
#include "exec/target_page.h"
#include "exec/ramblock.h"
CompressionStats compression_counters;
static CompressParam *comp_param;
static QemuThread *compress_threads;
/* comp_done_cond is used to wake up the migration thread when
* one of the compression threads has finished the compression.
* comp_done_lock is used to co-work with comp_done_cond.
*/
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
struct DecompressParam {
bool done;
bool quit;
QemuMutex mutex;
QemuCond cond;
void *des;
uint8_t *compbuf;
int len;
z_stream stream;
};
typedef struct DecompressParam DecompressParam;
static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
RAMBlock *block, ram_addr_t offset,
uint8_t *source_buf);
/*
 * Compression worker thread body.
 *
 * Sleeps on param->cond until set_compress_params() raises param->trigger,
 * compresses the requested page outside of param->mutex, then publishes
 * the result under comp_done_lock and wakes the migration thread via
 * comp_done_cond.  Exits when param->quit is set.
 */
static void *do_data_compress(void *opaque)
{
    CompressParam *param = opaque;
    RAMBlock *block;
    ram_addr_t offset;
    CompressResult result;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->trigger) {
            /* snapshot the request so the mutex can be dropped while the
             * (potentially slow) compression runs */
            block = param->block;
            offset = param->offset;
            param->trigger = false;
            qemu_mutex_unlock(&param->mutex);

            result = do_compress_ram_page(param->file, &param->stream,
                                          block, offset, param->originbuf);

            /* done/result are read by the migration thread under
             * comp_done_lock; signal it that this slot is free again */
            qemu_mutex_lock(&comp_done_lock);
            param->done = true;
            param->result = result;
            qemu_cond_signal(&comp_done_cond);
            qemu_mutex_unlock(&comp_done_lock);

            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}
/*
 * Stop and reap the compression threads and release all per-thread
 * state (mutex, cond, zlib stream, bounce buffer, dummy output file).
 * Safe to call on a partially initialised pool; a no-op when
 * compression is disabled or setup never ran.
 */
void compress_threads_save_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress() || !comp_param) {
        return;
    }
    thread_count = migrate_compress_threads();
    for (i = 0; i < thread_count; i++) {
        /*
         * comp_param[i].file is used as the indicator of whether this
         * slot (and its thread) was properly initialised or not
         */
        if (!comp_param[i].file) {
            break;
        }

        qemu_mutex_lock(&comp_param[i].mutex);
        comp_param[i].quit = true;
        qemu_cond_signal(&comp_param[i].cond);
        qemu_mutex_unlock(&comp_param[i].mutex);

        qemu_thread_join(compress_threads + i);
        qemu_mutex_destroy(&comp_param[i].mutex);
        qemu_cond_destroy(&comp_param[i].cond);
        deflateEnd(&comp_param[i].stream);
        g_free(comp_param[i].originbuf);
        qemu_fclose(comp_param[i].file);
        comp_param[i].file = NULL;
    }
    qemu_mutex_destroy(&comp_done_lock);
    qemu_cond_destroy(&comp_done_cond);
    g_free(compress_threads);
    g_free(comp_param);
    compress_threads = NULL;
    comp_param = NULL;
}
/*
 * Start the pool of compression threads for an outgoing migration.
 *
 * Returns 0 on success (or when compression is disabled), -1 on
 * failure, in which case any partially initialised state is torn
 * down via compress_threads_save_cleanup().
 */
int compress_threads_save_setup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }
    thread_count = migrate_compress_threads();
    compress_threads = g_new0(QemuThread, thread_count);
    comp_param = g_new0(CompressParam, thread_count);
    qemu_cond_init(&comp_done_cond);
    qemu_mutex_init(&comp_done_lock);
    for (i = 0; i < thread_count; i++) {
        comp_param[i].originbuf = g_try_malloc(qemu_target_page_size());
        if (!comp_param[i].originbuf) {
            goto exit;
        }

        if (deflateInit(&comp_param[i].stream,
                        migrate_compress_level()) != Z_OK) {
            g_free(comp_param[i].originbuf);
            goto exit;
        }

        /* comp_param[i].file is just used as a dummy buffer to save data,
         * set its ops to empty.
         */
        comp_param[i].file = qemu_file_new_output(
            QIO_CHANNEL(qio_channel_null_new()));
        /* done == true marks the slot as idle/available from the start */
        comp_param[i].done = true;
        comp_param[i].quit = false;
        qemu_mutex_init(&comp_param[i].mutex);
        qemu_cond_init(&comp_param[i].cond);
        qemu_thread_create(compress_threads + i, "compress",
                           do_data_compress, comp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;

exit:
    compress_threads_save_cleanup();
    return -1;
}
/*
 * Compress one target page into the dummy buffer file @f.
 *
 * Returns RES_ZEROPAGE for an all-zero page (nothing is written),
 * RES_COMPRESS when the compressed payload was buffered into @f, or
 * RES_NONE on compression failure (the error is also propagated to
 * the outgoing migration stream).
 */
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
                                           RAMBlock *block, ram_addr_t offset,
                                           uint8_t *source_buf)
{
    uint8_t *p = block->host + offset;
    size_t page_size = qemu_target_page_size();
    int ret;

    /* callers must have drained @f before queuing a new page */
    assert(qemu_file_buffer_empty(f));

    if (buffer_is_zero(p, page_size)) {
        return RES_ZEROPAGE;
    }

    /*
     * Copy the page to an internal buffer first so the VM cannot
     * modify it mid-compression, which would make errors during
     * compression/decompression impossible to catch.
     */
    memcpy(source_buf, p, page_size);
    ret = qemu_put_compression_data(f, stream, source_buf, page_size);
    if (ret < 0) {
        qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
        error_report("compressed data failed!");
        /* discard any partial output so the buffer file stays empty */
        qemu_fflush(f);
        return RES_NONE;
    }
    return RES_COMPRESS;
}
/*
 * Return a compression slot to its idle state after its buffered
 * output has been consumed by the migration thread.
 */
static inline void compress_reset_result(CompressParam *param)
{
    param->offset = 0;
    param->block = NULL;
    param->result = RES_NONE;
}
/*
 * Wait for every compression thread to finish its in-flight page,
 * then drain each slot's buffered output through @send_queued_data
 * and reset the slot.  Called from the migration thread.
 */
void flush_compressed_data(int (send_queued_data(CompressParam *)))
{
    int idx, thread_count;

    thread_count = migrate_compress_threads();

    /* phase 1: wait (under comp_done_lock) until all workers are done */
    qemu_mutex_lock(&comp_done_lock);
    for (idx = 0; idx < thread_count; idx++) {
        while (!comp_param[idx].done) {
            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
        }
    }
    qemu_mutex_unlock(&comp_done_lock);

    /* phase 2: flush each slot's pending output under its own mutex */
    for (idx = 0; idx < thread_count; idx++) {
        qemu_mutex_lock(&comp_param[idx].mutex);
        if (!comp_param[idx].quit) {
            CompressParam *param = &comp_param[idx];
            send_queued_data(param);
            assert(qemu_file_buffer_empty(param->file));
            compress_reset_result(param);
        }
        qemu_mutex_unlock(&comp_param[idx].mutex);
    }
}
/*
 * Queue a (block, offset) request on a compression slot; the worker
 * picks it up once it observes @trigger under param->mutex, which the
 * caller holds across this call.
 */
static inline void set_compress_params(CompressParam *param, RAMBlock *block,
                                       ram_addr_t offset)
{
    param->offset = offset;
    param->block = block;
    param->trigger = true;
}
/*
 * Hand one page to an idle compression thread.
 *
 * Any output still buffered in the chosen slot from the previous
 * request is first flushed via @send_queued_data.  Returns 1 when the
 * page was queued, or -1 when no thread was free and the
 * compress-wait-thread capability is off (the caller then sends the
 * page uncompressed).
 */
int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
                                    int (send_queued_data(CompressParam *)))
{
    int idx, thread_count, pages = -1;
    bool wait = migrate_compress_wait_thread();

    thread_count = migrate_compress_threads();
    qemu_mutex_lock(&comp_done_lock);

retry:
    for (idx = 0; idx < thread_count; idx++) {
        if (comp_param[idx].done) {
            CompressParam *param = &comp_param[idx];
            qemu_mutex_lock(&param->mutex);
            param->done = false;
            /* drain the previous request before reusing the slot */
            send_queued_data(param);
            assert(qemu_file_buffer_empty(param->file));
            compress_reset_result(param);
            set_compress_params(param, block, offset);

            qemu_cond_signal(&param->cond);
            qemu_mutex_unlock(&param->mutex);
            pages = 1;
            break;
        }
    }

    /*
     * wait for the free thread if the user specifies 'compress-wait-thread',
     * otherwise we will post the page out in the main thread as normal page.
     */
    if (pages < 0 && wait) {
        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
        goto retry;
    }
    qemu_mutex_unlock(&comp_done_lock);

    return pages;
}
/*
 * Inflate @source_len compressed bytes from @source into @dest
 * (capacity @dest_len), reusing the caller's @stream.
 *
 * Returns the decompressed size, or a negative value when the reset
 * or the inflation itself fails.
 */
static int
qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
                     const uint8_t *source, size_t source_len)
{
    if (inflateReset(stream) != Z_OK) {
        return -1;
    }

    stream->next_in = (uint8_t *)source;
    stream->avail_in = source_len;
    stream->next_out = dest;
    stream->avail_out = dest_len;

    /* a full page must decode in one shot, ending the zlib stream */
    if (inflate(stream, Z_NO_FLUSH) != Z_STREAM_END) {
        return -1;
    }

    return stream->total_out;
}
/*
 * Decompression worker thread body.
 *
 * Waits on param->cond for a request (param->des != NULL), inflates
 * the compressed buffer into the guest page outside of param->mutex,
 * then marks the slot done under decomp_done_lock and wakes any
 * waiter on decomp_done_cond.  Exits when param->quit is set.
 */
static void *do_data_decompress(void *opaque)
{
    DecompressParam *param = opaque;
    unsigned long pagesize;
    uint8_t *des;
    int len, ret;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->des) {
            /* snapshot and clear the request, then drop the mutex for
             * the duration of the decompression */
            des = param->des;
            len = param->len;
            param->des = 0;
            qemu_mutex_unlock(&param->mutex);

            pagesize = qemu_target_page_size();

            ret = qemu_uncompress_data(&param->stream, des, pagesize,
                                       param->compbuf, len);
            if (ret < 0 && migrate_get_current()->decompress_error_check) {
                error_report("decompress data failed");
                qemu_file_set_error(decomp_file, ret);
            }

            qemu_mutex_lock(&decomp_done_lock);
            param->done = true;
            qemu_cond_signal(&decomp_done_cond);
            qemu_mutex_unlock(&decomp_done_lock);

            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}
/*
 * Block until every decompression thread has finished its in-flight
 * page.  Returns the error status of the incoming stream (0 when
 * healthy), so decompression failures surface to the caller.
 */
int wait_for_decompress_done(void)
{
    int idx, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    thread_count = migrate_decompress_threads();
    qemu_mutex_lock(&decomp_done_lock);
    for (idx = 0; idx < thread_count; idx++) {
        while (!decomp_param[idx].done) {
            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
        }
    }
    qemu_mutex_unlock(&decomp_done_lock);
    return qemu_file_get_error(decomp_file);
}
/*
 * Stop and reap the decompression threads and free their resources.
 *
 * Quit flags are raised for all threads in a first pass, then the
 * threads are joined in a second pass, so no join waits behind a
 * thread that has not been told to exit yet.
 */
void compress_threads_load_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return;
    }
    thread_count = migrate_decompress_threads();
    /* pass 1: ask every initialised thread to quit */
    for (i = 0; i < thread_count; i++) {
        /*
         * compbuf is used as the indicator of whether this slot (and
         * its thread) was properly initialised or not
         */
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_mutex_lock(&decomp_param[i].mutex);
        decomp_param[i].quit = true;
        qemu_cond_signal(&decomp_param[i].cond);
        qemu_mutex_unlock(&decomp_param[i].mutex);
    }
    /* pass 2: join the threads and release per-slot state */
    for (i = 0; i < thread_count; i++) {
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_thread_join(decompress_threads + i);
        qemu_mutex_destroy(&decomp_param[i].mutex);
        qemu_cond_destroy(&decomp_param[i].cond);
        inflateEnd(&decomp_param[i].stream);
        g_free(decomp_param[i].compbuf);
        decomp_param[i].compbuf = NULL;
    }
    g_free(decompress_threads);
    g_free(decomp_param);
    decompress_threads = NULL;
    decomp_param = NULL;
    decomp_file = NULL;
}
/*
 * Start the pool of decompression threads for an incoming migration.
 *
 * @f is the source stream; it is remembered in decomp_file so workers
 * can flag stream errors on it.  Returns 0 on success (or when
 * compression is disabled), -1 on failure after undoing any partial
 * setup via compress_threads_load_cleanup().
 */
int compress_threads_load_setup(QEMUFile *f)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    thread_count = migrate_decompress_threads();
    decompress_threads = g_new0(QemuThread, thread_count);
    decomp_param = g_new0(DecompressParam, thread_count);
    qemu_mutex_init(&decomp_done_lock);
    qemu_cond_init(&decomp_done_cond);
    decomp_file = f;
    for (i = 0; i < thread_count; i++) {
        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
            goto exit;
        }

        /* worst-case compressed size for one target page */
        size_t compbuf_size = compressBound(qemu_target_page_size());
        decomp_param[i].compbuf = g_malloc0(compbuf_size);
        qemu_mutex_init(&decomp_param[i].mutex);
        qemu_cond_init(&decomp_param[i].cond);
        decomp_param[i].done = true;
        decomp_param[i].quit = false;
        qemu_thread_create(decompress_threads + i, "decompress",
                           do_data_decompress, decomp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;
exit:
    compress_threads_load_cleanup();
    return -1;
}
/*
 * Dispatch one compressed page (@len bytes read from @f) to an idle
 * decompression thread that will inflate it into @host.
 *
 * Holds decomp_done_lock for the whole call and sleeps on
 * decomp_done_cond until a slot frees up, so this can block when all
 * threads are busy.
 */
void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
{
    int idx, thread_count;

    thread_count = migrate_decompress_threads();
    QEMU_LOCK_GUARD(&decomp_done_lock);
    while (true) {
        for (idx = 0; idx < thread_count; idx++) {
            if (decomp_param[idx].done) {
                decomp_param[idx].done = false;
                qemu_mutex_lock(&decomp_param[idx].mutex);
                /* read the compressed payload into the slot's buffer
                 * and publish the request to the worker */
                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
                decomp_param[idx].des = host;
                decomp_param[idx].len = len;
                qemu_cond_signal(&decomp_param[idx].cond);
                qemu_mutex_unlock(&decomp_param[idx].mutex);
                break;
            }
        }
        if (idx < thread_count) {
            /* a slot accepted the request */
            break;
        } else {
            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
        }
    }
}

70
migration/ram-compress.h Normal file
View File

@ -0,0 +1,70 @@
/*
* QEMU System Emulator
*
* Copyright (c) 2003-2008 Fabrice Bellard
* Copyright (c) 2011-2015 Red Hat Inc
*
* Authors:
* Juan Quintela <quintela@redhat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef QEMU_MIGRATION_COMPRESS_H
#define QEMU_MIGRATION_COMPRESS_H
#include "qemu-file.h"
/* Outcome of compressing one guest page. */
enum CompressResult {
    RES_NONE = 0,       /* compression failed, or slot holds no result */
    RES_ZEROPAGE = 1,   /* page was all zeroes; nothing was buffered */
    RES_COMPRESS = 2    /* compressed payload is buffered in ->file */
};
typedef enum CompressResult CompressResult;

/* Per-compression-thread state shared with the migration thread. */
struct CompressParam {
    bool done;              /* slot idle; set/read under comp_done_lock */
    bool quit;              /* ask the worker thread to exit */
    bool trigger;           /* request pending; protected by ->mutex */
    CompressResult result;
    QEMUFile *file;         /* dummy buffer file holding compressed output */
    QemuMutex mutex;
    QemuCond cond;
    RAMBlock *block;        /* requested page: block + offset */
    ram_addr_t offset;

    /* internally used fields */
    z_stream stream;
    uint8_t *originbuf;     /* private copy of the page being compressed */
};
typedef struct CompressParam CompressParam;
void compress_threads_save_cleanup(void);
int compress_threads_save_setup(void);
void flush_compressed_data(int (send_queued_data(CompressParam *)));
int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
int (send_queued_data(CompressParam *)));
int wait_for_decompress_done(void);
void compress_threads_load_cleanup(void);
int compress_threads_load_setup(QEMUFile *f);
void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len);
#endif

View File

@ -32,8 +32,8 @@
#include "qemu/bitmap.h"
#include "qemu/madvise.h"
#include "qemu/main-loop.h"
#include "io/channel-null.h"
#include "xbzrle.h"
#include "ram-compress.h"
#include "ram.h"
#include "migration.h"
#include "migration-stats.h"
@ -480,56 +480,8 @@ typedef struct MigrationOps MigrationOps;
MigrationOps *migration_ops;
CompressionStats compression_counters;
struct CompressParam {
bool done;
bool quit;
bool zero_page;
QEMUFile *file;
QemuMutex mutex;
QemuCond cond;
RAMBlock *block;
ram_addr_t offset;
/* internally used fields */
z_stream stream;
uint8_t *originbuf;
};
typedef struct CompressParam CompressParam;
struct DecompressParam {
bool done;
bool quit;
QemuMutex mutex;
QemuCond cond;
void *des;
uint8_t *compbuf;
int len;
z_stream stream;
};
typedef struct DecompressParam DecompressParam;
static CompressParam *comp_param;
static QemuThread *compress_threads;
/* comp_done_cond is used to wake up the migration thread when
* one of the compression threads has finished the compression.
* comp_done_lock is used to co-work with comp_done_cond.
*/
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
static int ram_save_host_page_urgent(PageSearchStatus *pss);
static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
ram_addr_t offset, uint8_t *source_buf);
/* NOTE: page is the PFN not real ram_addr_t. */
static void pss_init(PageSearchStatus *pss, RAMBlock *rb, ram_addr_t page)
{
@ -548,123 +500,6 @@ static bool pss_overlap(PageSearchStatus *pss1, PageSearchStatus *pss2)
(pss1->host_page_start == pss2->host_page_start);
}
static void *do_data_compress(void *opaque)
{
CompressParam *param = opaque;
RAMBlock *block;
ram_addr_t offset;
bool zero_page;
qemu_mutex_lock(&param->mutex);
while (!param->quit) {
if (param->block) {
block = param->block;
offset = param->offset;
param->block = NULL;
qemu_mutex_unlock(&param->mutex);
zero_page = do_compress_ram_page(param->file, &param->stream,
block, offset, param->originbuf);
qemu_mutex_lock(&comp_done_lock);
param->done = true;
param->zero_page = zero_page;
qemu_cond_signal(&comp_done_cond);
qemu_mutex_unlock(&comp_done_lock);
qemu_mutex_lock(&param->mutex);
} else {
qemu_cond_wait(&param->cond, &param->mutex);
}
}
qemu_mutex_unlock(&param->mutex);
return NULL;
}
static void compress_threads_save_cleanup(void)
{
int i, thread_count;
if (!migrate_compress() || !comp_param) {
return;
}
thread_count = migrate_compress_threads();
for (i = 0; i < thread_count; i++) {
/*
* we use it as a indicator which shows if the thread is
* properly init'd or not
*/
if (!comp_param[i].file) {
break;
}
qemu_mutex_lock(&comp_param[i].mutex);
comp_param[i].quit = true;
qemu_cond_signal(&comp_param[i].cond);
qemu_mutex_unlock(&comp_param[i].mutex);
qemu_thread_join(compress_threads + i);
qemu_mutex_destroy(&comp_param[i].mutex);
qemu_cond_destroy(&comp_param[i].cond);
deflateEnd(&comp_param[i].stream);
g_free(comp_param[i].originbuf);
qemu_fclose(comp_param[i].file);
comp_param[i].file = NULL;
}
qemu_mutex_destroy(&comp_done_lock);
qemu_cond_destroy(&comp_done_cond);
g_free(compress_threads);
g_free(comp_param);
compress_threads = NULL;
comp_param = NULL;
}
static int compress_threads_save_setup(void)
{
int i, thread_count;
if (!migrate_compress()) {
return 0;
}
thread_count = migrate_compress_threads();
compress_threads = g_new0(QemuThread, thread_count);
comp_param = g_new0(CompressParam, thread_count);
qemu_cond_init(&comp_done_cond);
qemu_mutex_init(&comp_done_lock);
for (i = 0; i < thread_count; i++) {
comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
if (!comp_param[i].originbuf) {
goto exit;
}
if (deflateInit(&comp_param[i].stream,
migrate_compress_level()) != Z_OK) {
g_free(comp_param[i].originbuf);
goto exit;
}
/* comp_param[i].file is just used as a dummy buffer to save data,
* set its ops to empty.
*/
comp_param[i].file = qemu_file_new_output(
QIO_CHANNEL(qio_channel_null_new()));
comp_param[i].done = true;
comp_param[i].quit = false;
qemu_mutex_init(&comp_param[i].mutex);
qemu_cond_init(&comp_param[i].cond);
qemu_thread_create(compress_threads + i, "compress",
do_data_compress, comp_param + i,
QEMU_THREAD_JOINABLE);
}
return 0;
exit:
compress_threads_save_cleanup();
return -1;
}
/**
* save_page_header: write page header to wire
*
@ -1452,40 +1287,12 @@ static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block,
return 1;
}
static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
ram_addr_t offset, uint8_t *source_buf)
{
RAMState *rs = ram_state;
PageSearchStatus *pss = &rs->pss[RAM_CHANNEL_PRECOPY];
uint8_t *p = block->host + offset;
int ret;
if (save_zero_page_to_file(pss, f, block, offset)) {
return true;
}
save_page_header(pss, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
/*
* copy it to a internal buffer to avoid it being modified by VM
* so that we can catch up the error during compression and
* decompression
*/
memcpy(source_buf, p, TARGET_PAGE_SIZE);
ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
if (ret < 0) {
qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
error_report("compressed data failed!");
}
return false;
}
static void
update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
{
ram_transferred_add(bytes_xmit);
if (param->zero_page) {
if (param->result == RES_ZEROPAGE) {
stat64_add(&mig_stats.zero_pages, 1);
return;
}
@ -1497,81 +1304,49 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
static bool save_page_use_compression(RAMState *rs);
static void flush_compressed_data(RAMState *rs)
static int send_queued_data(CompressParam *param)
{
PageSearchStatus *pss = &ram_state->pss[RAM_CHANNEL_PRECOPY];
MigrationState *ms = migrate_get_current();
int idx, len, thread_count;
QEMUFile *file = ms->to_dst_file;
int len = 0;
RAMBlock *block = param->block;
ram_addr_t offset = param->offset;
if (param->result == RES_NONE) {
return 0;
}
assert(block == pss->last_sent_block);
if (param->result == RES_ZEROPAGE) {
assert(qemu_file_buffer_empty(param->file));
len += save_page_header(pss, file, block, offset | RAM_SAVE_FLAG_ZERO);
qemu_put_byte(file, 0);
len += 1;
ram_release_page(block->idstr, offset);
} else if (param->result == RES_COMPRESS) {
assert(!qemu_file_buffer_empty(param->file));
len += save_page_header(pss, file, block,
offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
len += qemu_put_qemu_file(file, param->file);
} else {
abort();
}
update_compress_thread_counts(param, len);
return len;
}
static void ram_flush_compressed_data(RAMState *rs)
{
if (!save_page_use_compression(rs)) {
return;
}
thread_count = migrate_compress_threads();
qemu_mutex_lock(&comp_done_lock);
for (idx = 0; idx < thread_count; idx++) {
while (!comp_param[idx].done) {
qemu_cond_wait(&comp_done_cond, &comp_done_lock);
}
}
qemu_mutex_unlock(&comp_done_lock);
for (idx = 0; idx < thread_count; idx++) {
qemu_mutex_lock(&comp_param[idx].mutex);
if (!comp_param[idx].quit) {
len = qemu_put_qemu_file(ms->to_dst_file, comp_param[idx].file);
/*
* it's safe to fetch zero_page without holding comp_done_lock
* as there is no further request submitted to the thread,
* i.e, the thread should be waiting for a request at this point.
*/
update_compress_thread_counts(&comp_param[idx], len);
}
qemu_mutex_unlock(&comp_param[idx].mutex);
}
}
static inline void set_compress_params(CompressParam *param, RAMBlock *block,
ram_addr_t offset)
{
param->block = block;
param->offset = offset;
}
static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
{
int idx, thread_count, bytes_xmit = -1, pages = -1;
bool wait = migrate_compress_wait_thread();
MigrationState *ms = migrate_get_current();
thread_count = migrate_compress_threads();
qemu_mutex_lock(&comp_done_lock);
retry:
for (idx = 0; idx < thread_count; idx++) {
if (comp_param[idx].done) {
comp_param[idx].done = false;
bytes_xmit = qemu_put_qemu_file(ms->to_dst_file,
comp_param[idx].file);
qemu_mutex_lock(&comp_param[idx].mutex);
set_compress_params(&comp_param[idx], block, offset);
qemu_cond_signal(&comp_param[idx].cond);
qemu_mutex_unlock(&comp_param[idx].mutex);
pages = 1;
update_compress_thread_counts(&comp_param[idx], bytes_xmit);
break;
}
}
/*
* wait for the free thread if the user specifies 'compress-wait-thread',
* otherwise we will post the page out in the main thread as normal page.
*/
if (pages < 0 && wait) {
qemu_cond_wait(&comp_done_cond, &comp_done_lock);
goto retry;
}
qemu_mutex_unlock(&comp_done_lock);
return pages;
flush_compressed_data(send_queued_data);
}
#define PAGE_ALL_CLEAN 0
@ -1628,7 +1403,7 @@ static int find_dirty_block(RAMState *rs, PageSearchStatus *pss)
* Also If xbzrle is on, stop using the data compression at this
* point. In theory, xbzrle can do better than compression.
*/
flush_compressed_data(rs);
ram_flush_compressed_data(rs);
/* Hit the end of the list */
pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
@ -2318,11 +2093,11 @@ static bool save_compress_page(RAMState *rs, PageSearchStatus *pss,
* much CPU resource.
*/
if (block != pss->last_sent_block) {
flush_compressed_data(rs);
ram_flush_compressed_data(rs);
return false;
}
if (compress_page_with_multi_thread(block, offset) > 0) {
if (compress_page_with_multi_thread(block, offset, send_queued_data) > 0) {
return true;
}
@ -3368,7 +3143,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
* page is sent in one chunk.
*/
if (migrate_postcopy_ram()) {
flush_compressed_data(rs);
ram_flush_compressed_data(rs);
}
/*
@ -3463,7 +3238,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
}
qemu_mutex_unlock(&rs->bitmap_mutex);
flush_compressed_data(rs);
ram_flush_compressed_data(rs);
ram_control_after_iterate(f, RAM_CONTROL_FINISH);
}
@ -3674,192 +3449,6 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
}
}
/* return the size after decompression, or negative value on error */
static int
qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
const uint8_t *source, size_t source_len)
{
int err;
err = inflateReset(stream);
if (err != Z_OK) {
return -1;
}
stream->avail_in = source_len;
stream->next_in = (uint8_t *)source;
stream->avail_out = dest_len;
stream->next_out = dest;
err = inflate(stream, Z_NO_FLUSH);
if (err != Z_STREAM_END) {
return -1;
}
return stream->total_out;
}
static void *do_data_decompress(void *opaque)
{
DecompressParam *param = opaque;
unsigned long pagesize;
uint8_t *des;
int len, ret;
qemu_mutex_lock(&param->mutex);
while (!param->quit) {
if (param->des) {
des = param->des;
len = param->len;
param->des = 0;
qemu_mutex_unlock(&param->mutex);
pagesize = TARGET_PAGE_SIZE;
ret = qemu_uncompress_data(&param->stream, des, pagesize,
param->compbuf, len);
if (ret < 0 && migrate_get_current()->decompress_error_check) {
error_report("decompress data failed");
qemu_file_set_error(decomp_file, ret);
}
qemu_mutex_lock(&decomp_done_lock);
param->done = true;
qemu_cond_signal(&decomp_done_cond);
qemu_mutex_unlock(&decomp_done_lock);
qemu_mutex_lock(&param->mutex);
} else {
qemu_cond_wait(&param->cond, &param->mutex);
}
}
qemu_mutex_unlock(&param->mutex);
return NULL;
}
/*
 * Block until every decompression worker has finished its current
 * page, then report any error recorded against the migration stream.
 */
static int wait_for_decompress_done(void)
{
    int i, nthreads;

    if (!migrate_compress()) {
        return 0;
    }

    nthreads = migrate_decompress_threads();
    qemu_mutex_lock(&decomp_done_lock);
    for (i = 0; i < nthreads; i++) {
        /* Each worker sets done = true under decomp_done_lock. */
        while (!decomp_param[i].done) {
            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
        }
    }
    qemu_mutex_unlock(&decomp_done_lock);

    return qemu_file_get_error(decomp_file);
}
/*
 * Tear down the decompression workers created by
 * compress_threads_load_setup().
 *
 * Safe to call on partially initialised state: a NULL compbuf marks
 * the first slot whose initialisation never completed, so both passes
 * stop there.
 */
static void compress_threads_load_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return;
    }
    thread_count = migrate_decompress_threads();
    /* First pass: ask every initialised worker to quit. */
    for (i = 0; i < thread_count; i++) {
        /*
         * we use it as an indicator which shows if the thread is
         * properly init'd or not
         */
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_mutex_lock(&decomp_param[i].mutex);
        decomp_param[i].quit = true;
        qemu_cond_signal(&decomp_param[i].cond);
        qemu_mutex_unlock(&decomp_param[i].mutex);
    }
    /* Second pass: join the workers and release per-thread resources. */
    for (i = 0; i < thread_count; i++) {
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_thread_join(decompress_threads + i);
        qemu_mutex_destroy(&decomp_param[i].mutex);
        qemu_cond_destroy(&decomp_param[i].cond);
        inflateEnd(&decomp_param[i].stream);
        g_free(decomp_param[i].compbuf);
        decomp_param[i].compbuf = NULL;
    }
    g_free(decompress_threads);
    g_free(decomp_param);
    decompress_threads = NULL;
    decomp_param = NULL;
    decomp_file = NULL;
}
/*
 * Create the decompression worker threads for the incoming side.
 *
 * @f: migration stream; stored in decomp_file so workers can report
 *     decompression errors against it.
 *
 * Returns 0 on success (or when the compress capability is off),
 * -1 on failure; partially created state is released through
 * compress_threads_load_cleanup().
 */
static int compress_threads_load_setup(QEMUFile *f)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    thread_count = migrate_decompress_threads();
    decompress_threads = g_new0(QemuThread, thread_count);
    decomp_param = g_new0(DecompressParam, thread_count);
    qemu_mutex_init(&decomp_done_lock);
    qemu_cond_init(&decomp_done_cond);
    decomp_file = f;
    for (i = 0; i < thread_count; i++) {
        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
            goto exit;
        }

        /*
         * Staging buffer for compressed data; a non-NULL compbuf also
         * marks this slot as initialised for the cleanup path.
         */
        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
        qemu_mutex_init(&decomp_param[i].mutex);
        qemu_cond_init(&decomp_param[i].cond);
        /* Start out "done" so the dispatcher can hand work to this slot. */
        decomp_param[i].done = true;
        decomp_param[i].quit = false;
        qemu_thread_create(decompress_threads + i, "decompress",
                           do_data_decompress, decomp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;
exit:
    compress_threads_load_cleanup();
    return -1;
}
/*
 * Hand one compressed page to an idle decompression worker.
 *
 * @f:    stream positioned at @len bytes of compressed payload
 * @host: destination buffer for the decompressed page
 * @len:  size of the compressed payload in bytes
 *
 * Blocks on decomp_done_cond until a worker is free.
 */
static void decompress_data_with_multi_threads(QEMUFile *f,
                                               void *host, int len)
{
    int idx, thread_count;

    thread_count = migrate_decompress_threads();
    /* decomp_done_lock guards every worker's done flag. */
    QEMU_LOCK_GUARD(&decomp_done_lock);
    while (true) {
        for (idx = 0; idx < thread_count; idx++) {
            if (decomp_param[idx].done) {
                decomp_param[idx].done = false;
                /* Stage the work item under the worker's own lock. */
                qemu_mutex_lock(&decomp_param[idx].mutex);
                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
                decomp_param[idx].des = host;
                decomp_param[idx].len = len;
                qemu_cond_signal(&decomp_param[idx].cond);
                qemu_mutex_unlock(&decomp_param[idx].mutex);
                break;
            }
        }
        if (idx < thread_count) {
            /* Dispatched successfully. */
            break;
        } else {
            /* All workers busy: wait for one to signal completion. */
            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
        }
    }
}
static void colo_init_ram_state(void)
{
ram_state_init(&ram_state);
@ -3969,10 +3558,6 @@ void colo_release_ram_cache(void)
*/
static int ram_load_setup(QEMUFile *f, void *opaque)
{
if (compress_threads_load_setup(f)) {
return -1;
}
xbzrle_load_setup();
ramblock_recv_map_init();
@ -3988,7 +3573,6 @@ static int ram_load_cleanup(void *opaque)
}
xbzrle_load_cleanup();
compress_threads_load_cleanup();
RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
g_free(rb->receivedmap);

View File

@ -406,6 +406,41 @@ static void migrate_set_parameter_str(QTestState *who, const char *parameter,
migrate_check_parameter_str(who, parameter, value);
}
/*
 * Query a boolean migration parameter on @who via QMP and return it
 * normalised to 0 or 1.
 */
static long long migrate_get_parameter_bool(QTestState *who,
                                            const char *parameter)
{
    QDict *rsp = wait_command(who, "{ 'execute': 'query-migrate-parameters' }");
    int val = qdict_get_bool(rsp, parameter);

    qobject_unref(rsp);
    return !!val;
}
/* Assert that a boolean migration parameter on @who equals @value. */
static void migrate_check_parameter_bool(QTestState *who, const char *parameter,
                                         int value)
{
    g_assert_cmpint(migrate_get_parameter_bool(who, parameter), ==, value);
}
/*
 * Set a boolean migration parameter on @who via QMP, then read it
 * back to confirm the value took effect.
 */
static void migrate_set_parameter_bool(QTestState *who, const char *parameter,
                                       int value)
{
    QDict *rsp = qtest_qmp(who,
                           "{ 'execute': 'migrate-set-parameters',"
                           "'arguments': { %s: %i } }",
                           parameter, value);

    /* A successful set returns an empty 'return' object. */
    g_assert(qdict_haskey(rsp, "return"));
    qobject_unref(rsp);

    migrate_check_parameter_bool(who, parameter, value);
}
static void migrate_ensure_non_converge(QTestState *who)
{
/* Can't converge with 1ms downtime + 3 mbs bandwidth limit */
@ -1092,6 +1127,36 @@ test_migrate_tls_x509_finish(QTestState *from,
#endif /* CONFIG_TASN1 */
#endif /* CONFIG_GNUTLS */
/*
 * Start hook: enable the old compress migration code with cheap
 * compression and compress-wait-thread on.
 */
static void *
test_migrate_compress_start(QTestState *from,
                            QTestState *to)
{
    /* Source side: fast compression, block for a free thread per page. */
    migrate_set_parameter_int(from, "compress-level", 1);
    migrate_set_parameter_int(from, "compress-threads", 4);
    migrate_set_parameter_bool(from, "compress-wait-thread", true);
    migrate_set_capability(from, "compress", true);

    /* Destination side: matching thread count and capability. */
    migrate_set_parameter_int(to, "decompress-threads", 4);
    migrate_set_capability(to, "compress", true);

    return NULL;
}
/*
 * Start hook: enable the old compress migration code with maximum
 * compression and compress-wait-thread off (pages whose compression
 * thread is busy are sent uncompressed).
 */
static void *
test_migrate_compress_nowait_start(QTestState *from,
                                   QTestState *to)
{
    /* Source side: slow compression, single thread, do not wait. */
    migrate_set_parameter_int(from, "compress-level", 9);
    migrate_set_parameter_int(from, "compress-threads", 1);
    migrate_set_parameter_bool(from, "compress-wait-thread", false);
    migrate_set_capability(from, "compress", true);

    /* Destination side: matching thread count and capability. */
    migrate_set_parameter_int(to, "decompress-threads", 1);
    migrate_set_capability(to, "compress", true);

    return NULL;
}
static int migrate_postcopy_prepare(QTestState **from_ptr,
QTestState **to_ptr,
MigrateCommon *args)
@ -1169,6 +1234,15 @@ static void test_postcopy(void)
test_postcopy_common(&args);
}
static void test_postcopy_compress(void)
{
MigrateCommon args = {
.start_hook = test_migrate_compress_start
};
test_postcopy_common(&args);
}
static void test_postcopy_preempt(void)
{
MigrateCommon args = {
@ -1270,6 +1344,15 @@ static void test_postcopy_recovery(void)
test_postcopy_recovery_common(&args);
}
static void test_postcopy_recovery_compress(void)
{
MigrateCommon args = {
.start_hook = test_migrate_compress_start
};
test_postcopy_recovery_common(&args);
}
#ifdef CONFIG_GNUTLS
static void test_postcopy_recovery_tls_psk(void)
{
@ -1303,6 +1386,7 @@ static void test_postcopy_preempt_all(void)
test_postcopy_recovery_common(&args);
}
#endif
static void test_baddest(void)
@ -1524,6 +1608,40 @@ static void test_precopy_unix_xbzrle(void)
test_precopy_common(&args);
}
static void test_precopy_unix_compress(void)
{
g_autofree char *uri = g_strdup_printf("unix:%s/migsocket", tmpfs);
MigrateCommon args = {
.connect_uri = uri,
.listen_uri = uri,
.start_hook = test_migrate_compress_start,
/*
* Test that no invalid thread state is left over from
* the previous iteration.
*/
.iterations = 2,
};
test_precopy_common(&args);
}
static void test_precopy_unix_compress_nowait(void)
{
g_autofree char *uri = g_strdup_printf("unix:%s/migsocket", tmpfs);
MigrateCommon args = {
.connect_uri = uri,
.listen_uri = uri,
.start_hook = test_migrate_compress_nowait_start,
/*
* Test that no invalid thread state is left over from
* the previous iteration.
*/
.iterations = 2,
};
test_precopy_common(&args);
}
static void test_precopy_tcp_plain(void)
{
MigrateCommon args = {
@ -2532,11 +2650,27 @@ int main(int argc, char **argv)
qtest_add_func("/migration/postcopy/preempt/plain", test_postcopy_preempt);
qtest_add_func("/migration/postcopy/preempt/recovery/plain",
test_postcopy_preempt_recovery);
if (getenv("QEMU_TEST_FLAKY_TESTS")) {
qtest_add_func("/migration/postcopy/compress/plain",
test_postcopy_compress);
qtest_add_func("/migration/postcopy/recovery/compress/plain",
test_postcopy_recovery_compress);
}
}
qtest_add_func("/migration/bad_dest", test_baddest);
qtest_add_func("/migration/precopy/unix/plain", test_precopy_unix_plain);
qtest_add_func("/migration/precopy/unix/xbzrle", test_precopy_unix_xbzrle);
/*
* Compression fails from time to time.
* Put test here but don't enable it until everything is fixed.
*/
if (getenv("QEMU_TEST_FLAKY_TESTS")) {
qtest_add_func("/migration/precopy/unix/compress/wait",
test_precopy_unix_compress);
qtest_add_func("/migration/precopy/unix/compress/nowait",
test_precopy_unix_compress_nowait);
}
#ifdef CONFIG_GNUTLS
qtest_add_func("/migration/precopy/unix/tls/psk",
test_precopy_unix_tls_psk);