ram.c: Reset result after sending queued data

Also take the param->mutex lock for the whole section to ensure
thread safety.
It is now explicitly clear when there is no queued data to send.
Before, this was only implicit: the param->file stream was empty, so
qemu_put_qemu_file() sent nothing.
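
For illustration only (a sketch, not code from this patch;
compress_drain_one() is a made-up helper name), a caller draining one
thread can now test param->result instead of relying on the stream:

    /*
     * Sketch under the assumptions above: drain one thread's queued
     * output.  param->result != RES_NONE now explicitly means "data
     * queued"; before this patch the only hint was a non-empty
     * param->file stream.
     */
    static int compress_drain_one(MigrationState *ms, CompressParam *param)
    {
        int len = 0;

        qemu_mutex_lock(&param->mutex);
        if (param->result != RES_NONE) {
            len = qemu_put_qemu_file(ms->to_dst_file, param->file);
            compress_reset_result(param);
        }
        qemu_mutex_unlock(&param->mutex);

        return len;
    }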

This will be used in the following commits to move save_page_header()
out of the compression code.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Juan Quintela <quintela@redhat.com>
Signed-off-by: Juan Quintela <quintela@redhat.com>
Lukas Straub 2023-04-20 11:48:03 +02:00 committed by Juan Quintela
parent 10c2f7b747
commit b5cf1cd3e8
1 changed file with 22 additions and 10 deletions

@@ -1508,6 +1508,13 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
 
 static bool save_page_use_compression(RAMState *rs);
 
+static inline void compress_reset_result(CompressParam *param)
+{
+    param->result = RES_NONE;
+    param->block = NULL;
+    param->offset = 0;
+}
+
 static void flush_compressed_data(RAMState *rs)
 {
     MigrationState *ms = migrate_get_current();
@@ -1529,13 +1536,16 @@ static void flush_compressed_data(RAMState *rs)
     for (idx = 0; idx < thread_count; idx++) {
         qemu_mutex_lock(&comp_param[idx].mutex);
         if (!comp_param[idx].quit) {
-            len = qemu_put_qemu_file(ms->to_dst_file, comp_param[idx].file);
+            CompressParam *param = &comp_param[idx];
+            len = qemu_put_qemu_file(ms->to_dst_file, param->file);
+            compress_reset_result(param);
+
             /*
              * it's safe to fetch zero_page without holding comp_done_lock
              * as there is no further request submitted to the thread,
              * i.e, the thread should be waiting for a request at this point.
              */
-            update_compress_thread_counts(&comp_param[idx], len);
+            update_compress_thread_counts(param, len);
         }
         qemu_mutex_unlock(&comp_param[idx].mutex);
     }
@@ -1560,15 +1570,17 @@ static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
 retry:
     for (idx = 0; idx < thread_count; idx++) {
         if (comp_param[idx].done) {
-            comp_param[idx].done = false;
-            bytes_xmit = qemu_put_qemu_file(ms->to_dst_file,
-                                            comp_param[idx].file);
-            qemu_mutex_lock(&comp_param[idx].mutex);
-            set_compress_params(&comp_param[idx], block, offset);
-            qemu_cond_signal(&comp_param[idx].cond);
-            qemu_mutex_unlock(&comp_param[idx].mutex);
+            CompressParam *param = &comp_param[idx];
+            qemu_mutex_lock(&param->mutex);
+            param->done = false;
+            bytes_xmit = qemu_put_qemu_file(ms->to_dst_file, param->file);
+            compress_reset_result(param);
+            set_compress_params(param, block, offset);
+
+            update_compress_thread_counts(param, bytes_xmit);
+            qemu_cond_signal(&param->cond);
+            qemu_mutex_unlock(&param->mutex);
             pages = 1;
-            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
             break;
         }
     }
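
For reference, the reworked submit path now keeps the drain, the reset
and the new request inside a single critical section (previously the
stream was drained before param->mutex was taken). A commented sketch
of the new ordering, using only names from the patch:

    qemu_mutex_lock(&param->mutex);
    param->done = false;                        /* claim the idle thread */
    /* drain whatever the previous request produced */
    bytes_xmit = qemu_put_qemu_file(ms->to_dst_file, param->file);
    compress_reset_result(param);               /* slot is explicitly empty */
    set_compress_params(param, block, offset);  /* queue the new request */

    update_compress_thread_counts(param, bytes_xmit);
    qemu_cond_signal(&param->cond);             /* wake the worker thread */
    qemu_mutex_unlock(&param->mutex);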