Compression: Improve async compression performance

This commit is contained in:
Eladash 2023-12-29 18:17:13 +02:00 committed by Elad Ashkenazi
parent 6214d0c9a5
commit b00fe1d12f
2 changed files with 38 additions and 9 deletions

View File

@ -171,12 +171,17 @@ void uncompressed_serialization_file_handler::finalize(utils::serial& ar)
ar.data = {}; // Deallocate and clear
}
enum : u64
{
pending_compress_bytes_bound = 0x400'0000,
pending_data_wait_bit = 1ull << 63,
};
struct compressed_stream_data
{
z_stream m_zs{};
lf_queue<std::vector<u8>> m_queued_data_to_process;
lf_queue<std::vector<u8>> m_queued_data_to_write;
atomic_t<usz> m_pending_bytes = 0;
};
void compressed_serialization_file_handler::initialize(utils::serial& ar)
@ -286,21 +291,24 @@ bool compressed_serialization_file_handler::handle_file_op(utils::serial& ar, us
// Avoid flooding RAM, wait if there is too much pending memory
const usz new_value = m_pending_bytes.atomic_op([&](usz v)
{
v &= ~(1ull << 63);
v &= ~pending_data_wait_bit;
if (v > 0x400'0000)
if (v >= pending_compress_bytes_bound)
{
v |= 1ull << 63;
v |= pending_data_wait_bit;
}
else
{
// Overflow detector
ensure(~v - pending_data_wait_bit > ar.data.size());
v += ar.data.size();
}
return v;
});
if (new_value & (1ull << 63))
if (new_value & pending_data_wait_bit)
{
m_pending_bytes.wait(new_value);
}
@ -635,19 +643,32 @@ void compressed_serialization_file_handler::stream_data_prepare_thread_op()
buffer_offset = m_zs.next_out - m_stream_data.data();
m_zs.avail_in = adjust_for_uint(data.size() - (m_zs.next_in - data.data()));
if (m_zs.avail_out == 0)
{
m_stream_data.resize(m_stream_data.size() + (m_zs.avail_in + 3ull) / 4);
}
m_zs.avail_in = adjust_for_uint(data.size() - (m_zs.next_in - data.data()));
}
while (m_zs.avail_out == 0 || m_zs.avail_in != 0);
// Forward for file write
const usz queued_size = data.size();
ensure(buffer_offset);
m_pending_bytes += buffer_offset - queued_size;
const usz size_diff = buffer_offset - queued_size;
const usz new_val = m_pending_bytes.add_fetch(size_diff);
const usz left = new_val & ~pending_data_wait_bit;
if (new_val & pending_data_wait_bit && left < pending_compress_bytes_bound && left - size_diff >= pending_compress_bytes_bound && !m_pending_signal)
{
// Notification is postponed until data write and memory release
m_pending_signal = true;
}
// Ensure wait bit state has not changed by the update
ensure(~((new_val - size_diff) ^ new_val) & pending_data_wait_bit);
m_stream_data.resize(buffer_offset);
stream.m_queued_data_to_write.push(std::move(m_stream_data));
}
@ -673,10 +694,17 @@ void compressed_serialization_file_handler::file_writer_thread_op()
m_file->write(data);
data = {}; // Deallocate before notification
if (m_pending_bytes.sub_fetch(last_size) == 1ull << 63)
const usz new_val = m_pending_bytes.sub_fetch(last_size);
const usz left = new_val & ~pending_data_wait_bit;
const bool pending_sig = m_pending_signal && m_pending_signal.exchange(false);
if (pending_sig || (new_val & pending_data_wait_bit && left < pending_compress_bytes_bound && left + last_size >= pending_compress_bytes_bound))
{
m_pending_bytes.notify_all();
}
// Ensure wait bit state has not changed by the update
ensure(~((new_val + last_size) ^ new_val) & pending_data_wait_bit);
}
}
}

View File

@ -91,6 +91,7 @@ private:
usz m_stream_data_index = 0;
usz m_file_read_index = 0;
atomic_t<usz> m_pending_bytes = 0;
atomic_t<bool> m_pending_signal = false;
bool m_write_inited = false;
bool m_read_inited = false;
bool m_errored = false;