diff --git a/rpcs3/util/serialization_ext.cpp b/rpcs3/util/serialization_ext.cpp index 36adea1663..3b3870c65e 100644 --- a/rpcs3/util/serialization_ext.cpp +++ b/rpcs3/util/serialization_ext.cpp @@ -171,12 +171,17 @@ void uncompressed_serialization_file_handler::finalize(utils::serial& ar) ar.data = {}; // Deallocate and clear } +enum : u64 +{ + pending_compress_bytes_bound = 0x400'0000, + pending_data_wait_bit = 1ull << 63, +}; + struct compressed_stream_data { z_stream m_zs{}; lf_queue> m_queued_data_to_process; lf_queue> m_queued_data_to_write; - atomic_t m_pending_bytes = 0; }; void compressed_serialization_file_handler::initialize(utils::serial& ar) @@ -286,21 +291,24 @@ bool compressed_serialization_file_handler::handle_file_op(utils::serial& ar, us // Avoid flooding RAM, wait if there is too much pending memory const usz new_value = m_pending_bytes.atomic_op([&](usz v) { - v &= ~(1ull << 63); + v &= ~pending_data_wait_bit; - if (v > 0x400'0000) + if (v >= pending_compress_bytes_bound) { - v |= 1ull << 63; + v |= pending_data_wait_bit; } else { + // Overflow detector + ensure(~v - pending_data_wait_bit > ar.data.size()); + v += ar.data.size(); } return v; }); - if (new_value & (1ull << 63)) + if (new_value & pending_data_wait_bit) { m_pending_bytes.wait(new_value); } @@ -635,19 +643,32 @@ void compressed_serialization_file_handler::stream_data_prepare_thread_op() buffer_offset = m_zs.next_out - m_stream_data.data(); + m_zs.avail_in = adjust_for_uint(data.size() - (m_zs.next_in - data.data())); + if (m_zs.avail_out == 0) { m_stream_data.resize(m_stream_data.size() + (m_zs.avail_in + 3ull) / 4); } - - m_zs.avail_in = adjust_for_uint(data.size() - (m_zs.next_in - data.data())); } while (m_zs.avail_out == 0 || m_zs.avail_in != 0); // Forward for file write const usz queued_size = data.size(); ensure(buffer_offset); - m_pending_bytes += buffer_offset - queued_size; + + const usz size_diff = buffer_offset - queued_size; + const usz new_val = m_pending_bytes.add_fetch(size_diff); + const usz left = new_val & ~pending_data_wait_bit; + + if (new_val & pending_data_wait_bit && left < pending_compress_bytes_bound && left - size_diff >= pending_compress_bytes_bound && !m_pending_signal) + { + // Notification is postponed until data write and memory release + m_pending_signal = true; + } + + // Ensure wait bit state has not changed by the update + ensure(~((new_val - size_diff) ^ new_val) & pending_data_wait_bit); + m_stream_data.resize(buffer_offset); stream.m_queued_data_to_write.push(std::move(m_stream_data)); } @@ -673,10 +694,17 @@ void compressed_serialization_file_handler::file_writer_thread_op() m_file->write(data); data = {}; // Deallocate before notification - if (m_pending_bytes.sub_fetch(last_size) == 1ull << 63) + const usz new_val = m_pending_bytes.sub_fetch(last_size); + const usz left = new_val & ~pending_data_wait_bit; + const bool pending_sig = m_pending_signal && m_pending_signal.exchange(false); + + if (pending_sig || (new_val & pending_data_wait_bit && left < pending_compress_bytes_bound && left + last_size >= pending_compress_bytes_bound)) { m_pending_bytes.notify_all(); } + + // Ensure wait bit state has not changed by the update + ensure(~((new_val + last_size) ^ new_val) & pending_data_wait_bit); } } } diff --git a/rpcs3/util/serialization_ext.hpp b/rpcs3/util/serialization_ext.hpp index 798b44b21f..72a96b809f 100644 --- a/rpcs3/util/serialization_ext.hpp +++ b/rpcs3/util/serialization_ext.hpp @@ -91,6 +91,7 @@ private: usz m_stream_data_index = 0; usz m_file_read_index = 0; atomic_t m_pending_bytes = 0; + atomic_t m_pending_signal = false; bool m_write_inited = false; bool m_read_inited = false; bool m_errored = false;