From b93bbbf2e534ecaed848e2b0dd53288ef6da7ab2 Mon Sep 17 00:00:00 2001 From: JosJuice <josjuice@gmail.com> Date: Wed, 22 Apr 2020 09:50:27 +0200 Subject: [PATCH] DiscIO: Implement multithreaded compression --- Source/Core/DiscIO/CMakeLists.txt | 1 + Source/Core/DiscIO/CompressedBlob.cpp | 233 ++++++++++++------- Source/Core/DiscIO/DiscIO.vcxproj | 1 + Source/Core/DiscIO/DiscIO.vcxproj.filters | 3 + Source/Core/DiscIO/MultithreadedCompressor.h | 225 ++++++++++++++++++ 5 files changed, 380 insertions(+), 83 deletions(-) create mode 100644 Source/Core/DiscIO/MultithreadedCompressor.h diff --git a/Source/Core/DiscIO/CMakeLists.txt b/Source/Core/DiscIO/CMakeLists.txt index 4970f135c2..6437bf35d1 100644 --- a/Source/Core/DiscIO/CMakeLists.txt +++ b/Source/Core/DiscIO/CMakeLists.txt @@ -21,6 +21,7 @@ add_library(discio FileSystemGCWii.h Filesystem.cpp Filesystem.h + MultithreadedCompressor.h NANDImporter.cpp NANDImporter.h ScrubbedBlob.cpp diff --git a/Source/Core/DiscIO/CompressedBlob.cpp b/Source/Core/DiscIO/CompressedBlob.cpp index cda3cad6d3..fdfd65baf8 100644 --- a/Source/Core/DiscIO/CompressedBlob.cpp +++ b/Source/Core/DiscIO/CompressedBlob.cpp @@ -28,6 +28,7 @@ #include "DiscIO/Blob.h" #include "DiscIO/CompressedBlob.h" #include "DiscIO/DiscScrubber.h" +#include "DiscIO/MultithreadedCompressor.h" #include "DiscIO/Volume.h" namespace DiscIO @@ -154,6 +155,119 @@ bool CompressedBlobReader::GetBlock(u64 block_num, u8* out_ptr) return true; } +struct CompressThreadState +{ + CompressThreadState() : z{} {} + ~CompressThreadState() { deflateEnd(&z); } + + // z_stream will stop working if it changes address, so this object must not be moved + CompressThreadState(const CompressThreadState&) = delete; + CompressThreadState(CompressThreadState&&) = delete; + CompressThreadState& operator=(const CompressThreadState&) = delete; + CompressThreadState& operator=(CompressThreadState&&) = delete; + + std::vector<u8> compressed_buffer; + z_stream z; +}; + +struct CompressParameters +{ + std::vector<u8> data; + u32 block_number; + u64 inpos; +}; + +struct OutputParameters +{ + std::vector<u8> data; + u32 block_number; + bool compressed; + u64 inpos; +}; + +static ConversionResultCode SetUpCompressThreadState(CompressThreadState* state) +{ + return deflateInit(&state->z, 9) == Z_OK ? ConversionResultCode::Success : + ConversionResultCode::InternalError; +} + +static ConversionResult<OutputParameters> Compress(CompressThreadState* state, + CompressParameters parameters, int block_size, + std::vector<u32>* hashes, int* num_stored, + int* num_compressed) +{ + state->compressed_buffer.resize(block_size); + + int retval = deflateReset(&state->z); + state->z.next_in = parameters.data.data(); + state->z.avail_in = block_size; + state->z.next_out = state->compressed_buffer.data(); + state->z.avail_out = block_size; + + if (retval != Z_OK) + { + ERROR_LOG(DISCIO, "Deflate failed"); + return ConversionResultCode::InternalError; + } + + const int status = deflate(&state->z, Z_FINISH); + + state->compressed_buffer.resize(block_size - state->z.avail_out); + + OutputParameters output_parameters; + if ((status != Z_STREAM_END) || (state->z.avail_out < 10)) + { + // let's store uncompressed + ++*num_stored; + output_parameters = OutputParameters{std::move(parameters.data), parameters.block_number, false, + parameters.inpos}; + } + else + { + // let's store compressed + ++*num_compressed; + output_parameters = OutputParameters{std::move(state->compressed_buffer), + parameters.block_number, true, parameters.inpos}; + } + + (*hashes)[parameters.block_number] = + Common::HashAdler32(output_parameters.data.data(), output_parameters.data.size()); + + return std::move(output_parameters); +} + +static ConversionResultCode Output(OutputParameters parameters, File::IOFile* outfile, + u64* position, std::vector<u64>* offsets, int progress_monitor, + u32 num_blocks, CompressCB callback, void* arg) +{ + u64 offset = *position; + if (!parameters.compressed) + offset |= 0x8000000000000000ULL; + (*offsets)[parameters.block_number] = offset; + + *position += parameters.data.size(); + + if (!outfile->WriteBytes(parameters.data.data(), parameters.data.size())) + return ConversionResultCode::WriteFailed; + + if (parameters.block_number % progress_monitor == 0) + { + const int ratio = + parameters.inpos == 0 ? 0 : static_cast<int>(100 * *position / parameters.inpos); + + const std::string text = + StringFromFormat(Common::GetStringT("%i of %i blocks. Compression ratio %i%%").c_str(), + parameters.block_number, num_blocks, ratio); + + const float completion = static_cast<float>(parameters.block_number) / num_blocks; + + if (!callback(text, completion, arg)) + return ConversionResultCode::Canceled; + } + + return ConversionResultCode::Success; +}; + bool ConvertToGCZ(BlobReader* infile, const std::string& infile_path, const std::string& outfile_path, u32 sub_type, int block_size, CompressCB callback, void* arg) @@ -170,10 +284,6 @@ bool ConvertToGCZ(BlobReader* infile, const std::string& infile_path, return false; } - z_stream z = {}; - if (deflateInit(&z, 9) != Z_OK) - return false; - callback(Common::GetStringT("Files opened, ready to compress."), 0, arg); CompressedBlobHeader header; @@ -187,8 +297,6 @@ bool ConvertToGCZ(BlobReader* infile, const std::string& infile_path, std::vector<u64> offsets(header.num_blocks); std::vector<u32> hashes(header.num_blocks); - std::vector<u8> out_buf(block_size); - std::vector<u8> in_buf(block_size); // seek past the header (we will write it at the end) outfile.Seek(sizeof(CompressedBlobHeader), SEEK_CUR); @@ -201,94 +309,48 @@ bool ConvertToGCZ(BlobReader* infile, const std::string& infile_path, int num_compressed = 0; int num_stored = 0; int progress_monitor = std::max<int>(1, header.num_blocks / 1000); - bool success = true; + const auto compress = [&](CompressThreadState* state, CompressParameters parameters) { + return Compress(state, std::move(parameters), block_size, &hashes, &num_stored, + &num_compressed); + }; + + const auto output = [&](OutputParameters parameters) { + return Output(std::move(parameters), &outfile, &position, &offsets, progress_monitor, + header.num_blocks, callback, arg); + }; + + MultithreadedCompressor<CompressThreadState, CompressParameters, OutputParameters> compressor( + SetUpCompressThreadState, compress, output); + + std::vector<u8> in_buf(block_size); for (u32 i = 0; i < header.num_blocks; i++) { - if (i % progress_monitor == 0) - { - int ratio = 0; - if (inpos != 0) - ratio = (int)(100 * position / inpos); - - const std::string temp = - StringFromFormat(Common::GetStringT("%i of %i blocks. Compression ratio %i%%").c_str(), i, - header.num_blocks, ratio); - bool was_cancelled = !callback(temp, (float)i / (float)header.num_blocks, arg); - if (was_cancelled) - { - success = false; - break; - } - } - - offsets[i] = position; + if (compressor.GetStatus() != ConversionResultCode::Success) + break; const u64 bytes_to_read = std::min<u64>(block_size, header.data_size - inpos); - success = infile->Read(inpos, bytes_to_read, in_buf.data()); - if (!success) + if (!infile->Read(inpos, bytes_to_read, in_buf.data())) { - PanicAlertT("Failed to read from the input file \"%s\".", infile_path.c_str()); + compressor.SetError(ConversionResultCode::ReadFailed); break; } std::fill(in_buf.begin() + bytes_to_read, in_buf.begin() + header.block_size, 0); - int retval = deflateReset(&z); - z.next_in = in_buf.data(); - z.avail_in = header.block_size; - z.next_out = out_buf.data(); - z.avail_out = block_size; - - if (retval != Z_OK) - { - ERROR_LOG(DISCIO, "Deflate failed"); - success = false; - break; - } - - int status = deflate(&z, Z_FINISH); - int comp_size = block_size - z.avail_out; - - u8* write_buf; - int write_size; - if ((status != Z_STREAM_END) || (z.avail_out < 10)) - { - // PanicAlert("%i %i Store %i", i*block_size, position, comp_size); - // let's store uncompressed - write_buf = in_buf.data(); - offsets[i] |= 0x8000000000000000ULL; - write_size = block_size; - num_stored++; - } - else - { - // let's store compressed - // PanicAlert("Comp %i to %i", block_size, comp_size); - write_buf = out_buf.data(); - write_size = comp_size; - num_compressed++; - } - - if (!outfile.WriteBytes(write_buf, write_size)) - { - PanicAlertT("Failed to write the output file \"%s\".\n" - "Check that you have enough space available on the target drive.", - outfile_path.c_str()); - success = false; - break; - } - inpos += block_size; - position += write_size; - hashes[i] = Common::HashAdler32(write_buf, write_size); + compressor.CompressAndWrite(CompressParameters{in_buf, i, inpos}); } + compressor.Shutdown(); + header.compressed_data_size = position; - if (!success) + const ConversionResultCode result = compressor.GetStatus(); + + if (result != ConversionResultCode::Success) { // Remove the incomplete output file. outfile.Close(); @@ -301,16 +363,21 @@ bool ConvertToGCZ(BlobReader* infile, const std::string& infile_path, outfile.WriteArray(&header, 1); outfile.WriteArray(offsets.data(), header.num_blocks); outfile.WriteArray(hashes.data(), header.num_blocks); - } - // Cleanup - deflateEnd(&z); - - if (success) - { callback(Common::GetStringT("Done compressing disc image."), 1.0f, arg); } - return success; + + if (result == ConversionResultCode::ReadFailed) + PanicAlertT("Failed to read from the input file \"%s\".", infile_path.c_str()); + + if (result == ConversionResultCode::WriteFailed) + { + PanicAlertT("Failed to write the output file \"%s\".\n" + "Check that you have enough space available on the target drive.", + outfile_path.c_str()); + } + + return result == ConversionResultCode::Success; } bool IsGCZBlob(File::IOFile& file) diff --git a/Source/Core/DiscIO/DiscIO.vcxproj b/Source/Core/DiscIO/DiscIO.vcxproj index 98bb500452..ee1d2ecd36 100644 --- a/Source/Core/DiscIO/DiscIO.vcxproj +++ b/Source/Core/DiscIO/DiscIO.vcxproj @@ -80,6 +80,7 @@ <ClInclude Include="FileBlob.h" /> <ClInclude Include="Filesystem.h" /> <ClInclude Include="FileSystemGCWii.h" /> + <ClInclude Include="MultithreadedCompressor.h" /> <ClInclude Include="NANDImporter.h" /> <ClInclude Include="ScrubbedBlob.h" /> <ClInclude Include="TGCBlob.h" /> diff --git a/Source/Core/DiscIO/DiscIO.vcxproj.filters b/Source/Core/DiscIO/DiscIO.vcxproj.filters index 9b19fa8471..43cf82fa03 100644 --- a/Source/Core/DiscIO/DiscIO.vcxproj.filters +++ b/Source/Core/DiscIO/DiscIO.vcxproj.filters @@ -161,6 +161,9 @@ <ClInclude Include="ScrubbedBlob.h"> <Filter>Volume\Blob</Filter> </ClInclude> + <ClInclude Include="MultithreadedCompressor.h"> + <Filter>Volume\Blob</Filter> + </ClInclude> </ItemGroup> <ItemGroup> <Text Include="CMakeLists.txt" /> diff --git a/Source/Core/DiscIO/MultithreadedCompressor.h b/Source/Core/DiscIO/MultithreadedCompressor.h new file mode 100644 index 0000000000..36b0163f66 --- /dev/null +++ b/Source/Core/DiscIO/MultithreadedCompressor.h @@ -0,0 +1,225 @@ +// Copyright 2020 Dolphin Emulator Project +// Licensed under GPLv2+ +// Refer to the license.txt file included. + +#pragma once + +#include <atomic> +#include <functional> +#include <memory> +#include <thread> +#include <utility> +#include <variant> +#include <vector> + +#include "Common/Assert.h" +#include "Common/Event.h" +#include "Common/Result.h" + +namespace DiscIO +{ +enum class ConversionResultCode +{ + Success, + Canceled, + ReadFailed, + WriteFailed, + InternalError, +}; + +template <typename T> +using ConversionResult = Common::Result<ConversionResultCode, T>; + +// This class starts a number of compression threads and one output thread. +// The set_up_compress_thread_state function is called at the start of each compression thread. +// When CompressAndWrite is called, the compress function will be called on one of the +// compression threads, and then the output function will be called on the output thread. +// The output thread handles data in the order that data was submitted using CompressAndWrite, +// but the compression threads are not guaranteed to handle data in a predictable order. +// Remember to check GetStatus regularly and cancel if it doesn't return Success, +// and call Shutdown when you want to ensure that everything finishes. +template <typename CompressThreadState, typename CompressParameters, typename OutputParameters> +class MultithreadedCompressor +{ +public: + MultithreadedCompressor( + std::function<ConversionResultCode(CompressThreadState*)> set_up_compress_thread_state, + std::function<ConversionResult<OutputParameters>(CompressThreadState*, CompressParameters)> + compress, + std::function<ConversionResultCode(OutputParameters)> output) + : m_set_up_compress_thread_state(std::move(set_up_compress_thread_state)), + m_compress(std::move(compress)), m_output(std::move(output)), + m_threads(std::max<unsigned int>(1, std::thread::hardware_concurrency())) + { + m_compress_threads = std::make_unique<CompressThread[]>(m_threads); + + for (size_t i = 0; i < m_threads; ++i) + { + m_compress_threads[i].thread = + std::thread(std::mem_fn(&MultithreadedCompressor::CompressThreadFunction), this, + &m_compress_threads[i]); + } + + m_output_thread = + std::thread(std::mem_fn(&MultithreadedCompressor::OutputThreadFunction), this); + } + + ~MultithreadedCompressor() + { + if (!m_shutting_down.load()) + Shutdown(); + } + + void CompressAndWrite(CompressParameters parameters) + { + if (GetStatus() != ConversionResultCode::Success) + return; + + CompressThread& compress_thread = m_compress_threads[m_current_index]; + + compress_thread.compress_ready_event.Wait(); + compress_thread.compress_parameters = std::move(parameters); + compress_thread.compress_event.Set(); + + ++m_current_index; + if (m_current_index >= m_threads) + m_current_index -= m_threads; + } + + void SetError(ConversionResultCode result) + { + ASSERT(result != ConversionResultCode::Success); + + // If we already have an error, don't overwrite it + ConversionResultCode expected = ConversionResultCode::Success; + m_result.compare_exchange_strong(expected, result); + } + + ConversionResultCode GetStatus() const { return m_result.load(); } + + void Shutdown() + { + for (size_t i = 0; i < m_threads; ++i) + m_compress_threads[i].compress_ready_event.Wait(); + for (size_t i = 0; i < m_threads; ++i) + m_compress_threads[i].compress_done_event.Wait(); + for (size_t i = 0; i < m_threads; ++i) + m_compress_threads[i].output_ready_event.Wait(); + + m_shutting_down.store(true); + + for (size_t i = 0; i < m_threads; ++i) + m_compress_threads[i].compress_event.Set(); + for (size_t i = 0; i < m_threads; ++i) + m_compress_threads[i].output_event.Set(); + + for (size_t i = 0; i < m_threads; ++i) + m_compress_threads[i].thread.join(); + + m_output_thread.join(); + } + +private: + struct CompressThread + { + std::thread thread; + + Common::Event compress_ready_event; + Common::Event compress_event; + Common::Event compress_done_event; + Common::Event output_ready_event; + Common::Event output_event; + + CompressParameters compress_parameters; + OutputParameters output_parameters; + }; + + void CompressThreadFunction(CompressThread* state) + { + CompressThreadState compress_thread_state; + + ConversionResultCode setup_result = m_set_up_compress_thread_state(&compress_thread_state); + if (setup_result != ConversionResultCode::Success) + SetError(setup_result); + + state->compress_ready_event.Set(); + state->compress_done_event.Set(); + + while (true) + { + state->compress_event.Wait(); + + if (m_shutting_down.load()) + return; + + CompressParameters parameters = std::move(state->compress_parameters); + + state->compress_done_event.Reset(); + state->compress_ready_event.Set(); + + ConversionResult<OutputParameters> result = + m_compress(&compress_thread_state, std::move(parameters)); + + if (result) + { + state->output_ready_event.Wait(); + state->output_parameters = std::move(*result); + state->output_event.Set(); + } + else + { + SetError(result.Error()); + } + + state->compress_done_event.Set(); + } + } + + void OutputThreadFunction() + { + for (size_t i = 0; i < m_threads; ++i) + m_compress_threads[i].output_ready_event.Set(); + + size_t index = 0; + + while (true) + { + CompressThread& compress_thread = m_compress_threads[index]; + + compress_thread.output_event.Wait(); + + if (m_shutting_down.load()) + return; + + OutputParameters parameters = std::move(compress_thread.output_parameters); + + compress_thread.output_ready_event.Set(); + + const ConversionResultCode result = m_output(std::move(parameters)); + + if (result != ConversionResultCode::Success) + SetError(result); + + ++index; + if (index >= m_threads) + index -= m_threads; + } + } + + std::function<ConversionResultCode(CompressThreadState*)> m_set_up_compress_thread_state; + std::function<ConversionResult<OutputParameters>(CompressThreadState*, CompressParameters)> + m_compress; + std::function<ConversionResultCode(OutputParameters)> m_output; + + // We can't use std::vector for this, because Common::Event is not movable + std::unique_ptr<CompressThread[]> m_compress_threads; + std::thread m_output_thread; + + const size_t m_threads; + size_t m_current_index = 0; + + std::atomic<ConversionResultCode> m_result = ConversionResultCode::Success; + std::atomic<bool> m_shutting_down = false; +}; + +} // namespace DiscIO