From bdcfcc65eafd6e262f8b5fd0292d79abc1165fbd Mon Sep 17 00:00:00 2001 From: TellowKrinkle Date: Wed, 24 Mar 2021 03:11:14 -0500 Subject: [PATCH] CDVD: Add ThreadedFileReader --- pcsx2/CDVD/ThreadedFileReader.cpp | 360 ++++++++++++++++++++++++++++++ pcsx2/CDVD/ThreadedFileReader.h | 121 ++++++++++ pcsx2/CMakeLists.txt | 2 + pcsx2/pcsx2.vcxproj | 2 + pcsx2/pcsx2.vcxproj.filters | 6 + 5 files changed, 491 insertions(+) create mode 100644 pcsx2/CDVD/ThreadedFileReader.cpp create mode 100644 pcsx2/CDVD/ThreadedFileReader.h diff --git a/pcsx2/CDVD/ThreadedFileReader.cpp b/pcsx2/CDVD/ThreadedFileReader.cpp new file mode 100644 index 0000000000..e7c12f2449 --- /dev/null +++ b/pcsx2/CDVD/ThreadedFileReader.cpp @@ -0,0 +1,360 @@ +/* PCSX2 - PS2 Emulator for PCs + * Copyright (C) 2002-2021 PCSX2 Dev Team + * + * PCSX2 is free software: you can redistribute it and/or modify it under the terms + * of the GNU Lesser General Public License as published by the Free Software Found- + * ation, either version 3 of the License, or (at your option) any later version. + * + * PCSX2 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + * PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with PCSX2. + * If not, see <http://www.gnu.org/licenses/>. 
+ */
+
+#include "PrecompiledHeader.h"
+#include "ThreadedFileReader.h"
+
+/// Starts the read thread, which runs Loop() until the destructor sets m_quit.
+ThreadedFileReader::ThreadedFileReader()
+{
+	m_readThread = std::thread([](ThreadedFileReader* r){ r->Loop(); }, this);
+}
+
+/// Asks the read thread to exit, joins it, and frees the cache buffers.
+ThreadedFileReader::~ThreadedFileReader()
+{
+	{
+		// Loop() reads m_quit while holding m_mtx, so write it under the same lock.
+		// Taking the lock also means the read thread is either before its check or already
+		// inside wait(), so the notification below cannot be missed.
+		std::lock_guard<std::mutex> guard(m_mtx);
+		m_quit = true;
+	}
+	m_condition.notify_one();
+	m_readThread.join();
+	for (auto& buffer : m_buffer)
+		free(buffer.ptr); // free(nullptr) is a no-op
+}
+
+/// Copies `size` bytes of internal-block data from `src` to `dst`.
+/// With a nonzero m_internalBlockSize only the first m_blocksize bytes of each internal block are copied;
+/// otherwise the data is copied verbatim.
+/// Returns the number of bytes written to `dst`.
+size_t ThreadedFileReader::CopyBlocks(void* dst, const void* src, size_t size) const
+{
+	char* cdst = static_cast<char*>(dst);
+	const char* csrc = static_cast<const char*>(src);
+	const char* cend = csrc + size;
+	if (m_internalBlockSize)
+	{
+		for (; csrc < cend; csrc += m_internalBlockSize, cdst += m_blocksize)
+		{
+			memcpy(cdst, csrc, m_blocksize);
+		}
+		return cdst - static_cast<char*>(dst);
+	}
+	else
+	{
+		memcpy(dst, src, size);
+		return size;
+	}
+}
+
+/// Body of the read thread: waits for a request, services it, reads ahead, and repeats until m_quit is set.
+void ThreadedFileReader::Loop()
+{
+	Threading::SetNameOfCurrentThread("ISO Decompress");
+
+	std::unique_lock<std::mutex> lock(m_mtx);
+
+	while (true)
+	{
+		// A nonzero m_requestSize means there is work to do
+		while (!m_requestSize && !m_quit)
+			m_condition.wait(lock);
+
+		if (m_quit)
+			return;
+
+		// Snapshot the request while the lock is still held
+		u64 requestOffset = m_requestOffset;
+		u32 requestSize = m_requestSize;
+		void* ptr = m_requestPtr.load(std::memory_order_relaxed);
+
+		m_running = true;
+		lock.unlock();
+
+		bool ok = true;
+
+		// A null pointer marks a readahead-only request, so there is nothing to copy out
+		if (ptr)
+		{
+			ok = Decompress(ptr, requestOffset, requestSize);
+		}
+
+		// Clearing m_requestPtr is what FinishRead/CancelRead wait for
+		m_requestPtr.store(nullptr, std::memory_order_release);
+		m_condition.notify_one();
+
+		if (ok)
+		{
+			// Readahead
+			Chunk blk = ChunkForOffset(requestOffset + requestSize);
+			if (blk.chunkID >= 0)
+			{
+				(void)GetBlockPtr(blk, true);
+				blk = ChunkForOffset(blk.offset + blk.length);
+				if (blk.chunkID >= 0)
+					(void)GetBlockPtr(blk, true);
+			}
+		}
+
+		lock.lock();
+		if (requestSize == m_requestSize && requestOffset == m_requestOffset && !m_requestPtr)
+		{
+			// If no one's added more work, mark this one as done
+			m_requestSize = 0;
+		}
+
+		m_running = false;
+		m_condition.notify_one(); // For things waiting on m_running == false
+	}
+}
+
+/// Returns the cache buffer holding `block`, reading the chunk into the next cache slot on a miss.
+/// Returns nullptr when the chunk cannot be read or the slot cannot be grown.
+/// `isReadahead` is not used by this implementation.
+ThreadedFileReader::Buffer* ThreadedFileReader::GetBlockPtr(const Chunk& block, bool isReadahead)
+{
+	for (int i = 0; i < static_cast<int>(ArraySize(m_buffer)); i++)
+	{
+		if (m_buffer[i].valid.load(std::memory_order_acquire) && m_buffer[i].offset == block.offset)
+		{
+			// Cache hit: the other slot becomes the next one to be overwritten
+			m_nextBuffer = (i + 1) % ArraySize(m_buffer);
+			return m_buffer + i;
+		}
+	}
+
+	Buffer& buf = m_buffer[m_nextBuffer];
+	{
+		// This can be called from both the read thread and threads in ReadSync
+		// Calls from ReadSync are done with the lock already held to keep the read thread out
+		// Therefore we should only lock on the read thread
+		std::unique_lock<std::mutex> lock(m_mtx, std::defer_lock);
+		if (std::this_thread::get_id() == m_readThread.get_id())
+			lock.lock();
+		if (buf.size < block.length)
+		{
+			// Keep the old pointer until realloc succeeds; on failure the slot is left untouched
+			void* grown = realloc(buf.ptr, block.length);
+			if (!grown)
+				return nullptr;
+			buf.ptr = grown;
+		}
+		buf.valid.store(false, std::memory_order_relaxed);
+	}
+	int size = ReadChunk(buf.ptr, block.chunkID);
+	if (size > 0)
+	{
+		buf.offset = block.offset;
+		buf.size = size;
+		buf.valid.store(true, std::memory_order_release);
+		m_nextBuffer = (m_nextBuffer + 1) % ArraySize(m_buffer);
+		return &buf;
+	}
+	return nullptr;
+}
+
+/// Reads `size` (internal block) bytes starting at file offset `begin` into `target`.
+/// The request is served as an optional leading partial chunk, a run of whole chunks, and an optional trailing partial chunk.
+/// On success adds the number of bytes written to m_amtRead and returns true.
+/// Returns false when a chunk cannot be read, is shorter than expected, or the request was cancelled.
+bool ThreadedFileReader::Decompress(void* target, u64 begin, u32 size)
+{
+	Chunk blk = ChunkForOffset(begin);
+	char* write = static_cast<char*>(target);
+	u32 remaining = size;
+	if (blk.offset != begin)
+	{
+		u32 off = begin - blk.offset;
+		u32 len = std::min(blk.length - off, size);
+		// Partial block
+		if (Buffer* buf = GetBlockPtr(blk))
+		{
+			if (buf->size < blk.length)
+				return false;
+			write += CopyBlocks(write, static_cast<char*>(buf->ptr) + off, len);
+			remaining -= len;
+			blk = ChunkForOffset(blk.offset + blk.length);
+		}
+		else
+		{
+			return false;
+		}
+	}
+	// Whole chunks. Nothing left to read means nothing to do, even if `blk` is not a usable chunk.
+	while (remaining && blk.length <= remaining)
+	{
+		if (m_requestCancelled.load(std::memory_order_relaxed))
+		{
+			return false;
+		}
+		// A zero-length chunk never reduces `remaining`, so the loop could not terminate
+		if (blk.length == 0)
+		{
+			return false;
+		}
+		if (m_internalBlockSize)
+		{
+			if (Buffer* buf = GetBlockPtr(blk))
+			{
+				if (buf->size < blk.length)
+					return false;
+				write += CopyBlocks(write, buf->ptr, blk.length);
+			}
+			else
+			{
+				// A failed read must abort the request; otherwise the chunk would be counted
+				// as copied and later data would land at the wrong place in `target`
+				return false;
+			}
+		}
+		else
+		{
+			// Internal and external block sizes match, so read straight into the caller's buffer
+			int amt = ReadChunk(write, blk.chunkID);
+			if (amt < static_cast<int>(blk.length))
+				return false;
+			write += blk.length;
+		}
+		remaining -= blk.length;
+		blk = ChunkForOffset(blk.offset + blk.length);
+	}
+	if (remaining)
+	{
+		// Trailing partial chunk
+		if (Buffer* buf = GetBlockPtr(blk))
+		{
+			if (buf->size < remaining)
+				return false;
+			write += CopyBlocks(write, buf->ptr, remaining);
+		}
+		else
+		{
+			return false;
+		}
+	}
+	m_amtRead += write - static_cast<char*>(target);
+	return true;
+}
+
+/// Tries to serve a read from the cache buffers. Must be called with m_mtx held (hence the lock_guard parameter).
+/// Resets m_amtRead, then copies whatever the valid buffers cover and advances `buffer`, `offset` and `size` past it.
+/// Returns true only if the request was completed and a valid buffer starts where the last-used one ends,
+/// i.e. the following block is cached too and no further work is needed.
+bool ThreadedFileReader::TryCachedRead(void*& buffer, u64& offset, u32& size, const std::lock_guard<std::mutex>&)
+{
+	// Run through twice so that if m_buffer[1] contains the first half and m_buffer[0] contains the second half it still works
+	m_amtRead = 0;
+	u64 end = 0;
+	bool allDone = false;
+	for (int i = 0; i < static_cast<int>(ArraySize(m_buffer) * 2); i++)
+	{
+		Buffer& buf = m_buffer[i % ArraySize(m_buffer)];
+		if (!buf.valid.load(std::memory_order_acquire))
+			continue;
+		if (buf.offset <= offset && buf.offset + buf.size > offset)
+		{
+			u32 off = offset - buf.offset;
+			u32 cpysize = std::min(size, buf.size - off);
+			size_t read = CopyBlocks(buffer, static_cast<char*>(buf.ptr) + off, cpysize);
+			m_amtRead += read;
+			size -= cpysize;
+			offset += cpysize;
+			buffer = static_cast<char*>(buffer) + read;
+			if (size == 0)
+				end = buf.offset + buf.size;
+		}
+		// Do buffers contain the current and next block?
+		if (end > 0 && buf.offset == end)
+			allDone = true;
+	}
+	return allDone;
+}
+
+/// Stops any in-flight work on the read thread, then forwards to Open2().
+bool ThreadedFileReader::Open(const wxString& fileName)
+{
+	CancelAndWaitUntilStopped();
+	return Open2(fileName);
+}
+
+/// Reads `count` blocks starting at block `sector` into `pBuffer` and blocks until the data is there.
+/// Returns m_amtRead.
+int ThreadedFileReader::ReadSync(void* pBuffer, uint sector, uint count)
+{
+	u32 blocksize = InternalBlockSize();
+	u64 offset = (u64)sector * (u64)blocksize + m_dataoffset;
+	u32 size = count * blocksize;
+	{
+		std::lock_guard<std::mutex> l(m_mtx);
+		if (TryCachedRead(pBuffer, offset, size, l))
+			return m_amtRead;
+
+		if (size > 0 && !m_running)
+		{
+			// Don't wait for read thread to start back up
+			if (Decompress(pBuffer, offset, size))
+			{
+				offset += size;
+				size = 0;
+			}
+		}
+
+		if (size == 0)
+		{
+			// For readahead
+			// One byte ending at `offset`, with no destination, makes the read thread cache what follows
+			m_requestOffset = offset - 1;
+			m_requestSize = 1;
+			m_requestPtr.store(nullptr, std::memory_order_relaxed);
+		}
+		else
+		{
+			m_requestOffset = offset;
+			m_requestSize = size;
+			m_requestPtr.store(pBuffer, std::memory_order_relaxed);
+		}
+		m_requestCancelled.store(false, std::memory_order_relaxed);
+	}
+	m_condition.notify_one();
+	if (size == 0)
+		return m_amtRead;
+	return FinishRead();
+}
+
+/// Flags the current request as cancelled and blocks until the read thread reports m_running == false.
+void ThreadedFileReader::CancelAndWaitUntilStopped(void)
+{
+	m_requestCancelled.store(true, std::memory_order_relaxed);
+	std::unique_lock<std::mutex> lock(m_mtx);
+	while (m_running)
+		m_condition.wait(lock);
+}
+
+/// Starts reading `count` blocks from block `sector` into `pBuffer` without waiting for completion.
+/// Whatever the cache already holds is copied immediately; the rest is handed to the read thread.
+/// Use FinishRead() to wait for the result.
+void ThreadedFileReader::BeginRead(void* pBuffer, uint sector, uint count)
+{
+	u32 blocksize = InternalBlockSize(); // unsigned, matching InternalBlockSize() and ReadSync
+	u64 offset = (u64)sector * (u64)blocksize + m_dataoffset;
+	u32 size = count * blocksize;
+	{
+		std::lock_guard<std::mutex> l(m_mtx);
+		if (TryCachedRead(pBuffer, offset, size, l))
+			return;
+		if (size == 0)
+		{
+			// For readahead
+			m_requestOffset = offset - 1;
+			m_requestSize = 1;
+			m_requestPtr.store(nullptr, std::memory_order_relaxed);
+		}
+		else
+		{
+			m_requestOffset = offset;
+			m_requestSize = size;
+			m_requestPtr.store(pBuffer, std::memory_order_relaxed);
+		}
+		m_requestCancelled.store(false, std::memory_order_relaxed);
+	}
+	m_condition.notify_one();
+}
+
+/// Waits until the read thread has let go of the caller's buffer, then reports m_amtRead.
+int ThreadedFileReader::FinishRead(void)
+{
+	// Fast path: m_requestPtr is already clear, so there is nothing to wait for
+	if (m_requestPtr.load(std::memory_order_acquire) == nullptr)
+		return m_amtRead;
+	std::unique_lock<std::mutex> lock(m_mtx);
+	m_condition.wait(lock, [this] { return m_requestPtr.load() == nullptr; });
+	return m_amtRead;
+}
+
+/// Cancels the pending request, if any, and waits until m_requestPtr has been cleared.
+void ThreadedFileReader::CancelRead(void)
+{
+	const bool pending = m_requestPtr.load(std::memory_order_acquire) != nullptr;
+	if (pending)
+	{
+		m_requestCancelled.store(true, std::memory_order_release);
+		std::unique_lock<std::mutex> lock(m_mtx);
+		m_condition.wait(lock, [this] { return m_requestPtr.load(std::memory_order_relaxed) == nullptr; });
+	}
+}
+
+/// Stops the read thread's current work, invalidates every cache buffer, then calls Close2().
+void ThreadedFileReader::Close(void)
+{
+	CancelAndWaitUntilStopped();
+	for (Buffer& cached : m_buffer)
+	{
+		cached.valid.store(false, std::memory_order_relaxed);
+	}
+	Close2();
+}
+
+/// Stores the block size in m_blocksize.
+void ThreadedFileReader::SetBlockSize(uint bytes)
+{
+	m_blocksize = bytes;
+}
+
+/// Stores the data offset in m_dataoffset.
+void ThreadedFileReader::SetDataOffset(int bytes)
+{
+	m_dataoffset = bytes;
+}
diff --git a/pcsx2/CDVD/ThreadedFileReader.h b/pcsx2/CDVD/ThreadedFileReader.h
new file mode 100644
index 0000000000..eb520a834a
--- /dev/null
+++ b/pcsx2/CDVD/ThreadedFileReader.h
@@ -0,0 +1,121 @@
+/* PCSX2 - PS2 Emulator for PCs
+ * Copyright (C) 2002-2021 PCSX2 Dev Team
+ *
+ * PCSX2 is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU Lesser General Public License as published by the Free Software Found-
+ * ation, either version 3 of the License, or (at your option) any later version.
+ *
+ * PCSX2 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with PCSX2.
+ * If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "AsyncFileReader.h"
+#include "Utilities/PersistentThread.h"
+
+#include <thread>
+#include <mutex>
+#include <atomic>
+#include <condition_variable>
+
+/// A file reader for use with compressed formats
+/// Calls decompression code on a separate thread to make a synchronous decompression API async
+/// Subclasses supply the chunk layout (`ChunkForOffset`), the chunk reader (`ReadChunk`) and `Open2`/`Close2`
+class ThreadedFileReader : public AsyncFileReader
+{
+	ThreadedFileReader(ThreadedFileReader&&) = delete;
+protected:
+	struct Chunk
+	{
+		/// Negative block IDs indicate invalid blocks
+		s64 chunkID;
+		/// Offset of the chunk's first byte, in the same units as the request offset
+		u64 offset;
+		/// Length of the chunk in bytes
+		u32 length;
+	};
+
+	/// Set nonzero to separate block size of read blocks from m_blocksize
+	/// Requires that chunk size is a multiple of internal block size
+	/// Use to avoid overrunning stack because PCSX2 likes to allocate 2448-byte buffers
+	int m_internalBlockSize = 0;
+
+	/// Get the block containing the given offset
+	virtual Chunk ChunkForOffset(u64 offset) = 0;
+	/// Synchronously read the given block into `dst`
+	virtual int ReadChunk(void* dst, s64 chunkID) = 0;
+	/// AsyncFileReader open but ThreadedFileReader needs prep work first
+	virtual bool Open2(const wxString& fileName) = 0;
+	/// AsyncFileReader close but ThreadedFileReader needs prep work first
+	virtual void Close2(void) = 0;
+
+	/// Starts the read thread
+	ThreadedFileReader();
+	/// Stops and joins the read thread and frees the cache buffers
+	~ThreadedFileReader();
+
+private:
+	/// Number of bytes copied out for the current request
+	/// Initialized so that FinishRead() returns a defined value even before the first read
+	int m_amtRead = 0;
+	/// Pointer to read into
+	/// If null when m_requestSize > 0, indicates a request for readahead only
+	std::atomic<void*> m_requestPtr{nullptr};
+	/// Request offset in (internal block) bytes from the beginning of the file
+	u64 m_requestOffset = 0;
+	/// Request size in (internal block) bytes
+	/// In addition to marking the request size, the loop thread uses this variable to decide whether there's work to do (size of 0 means no work)
+	u32 m_requestSize = 0;
+	/// Used to cancel requests early
+	/// Note: It might take a while for the cancellation request to be noticed, wait until `m_requestPtr` is cleared to ensure it's not being written to
+	std::atomic<bool> m_requestCancelled{false};
+	/// One cached chunk
+	struct Buffer
+	{
+		/// malloc-family allocation holding the chunk data (null until first used)
+		void* ptr = nullptr;
+		/// Offset of the cached chunk
+		u64 offset = 0;
+		/// Number of bytes the last chunk read produced
+		u32 size = 0;
+		/// True once `ptr`, `offset` and `size` describe a completely read chunk
+		std::atomic<bool> valid{false};
+	};
+	/// 2 buffers for readahead (current block, next block)
+	Buffer m_buffer[2];
+	/// Index of the buffer that the next cache miss will overwrite
+	u32 m_nextBuffer = 0;
+
+	std::thread m_readThread;
+	std::mutex m_mtx;
+	std::condition_variable m_condition;
+	/// True to tell the thread to exit
+	bool m_quit = false;
+	/// True if the thread is currently doing something other than waiting
+	/// View while holding `m_mtx`. If false, you may touch decompression functions from other threads
+	bool m_running = false;
+
+	/// Get the internal block size
+	u32 InternalBlockSize() const { return m_internalBlockSize ? m_internalBlockSize : m_blocksize; }
+	/// memcpy from internal to external blocks
+	/// `size` is in internal block bytes
+	/// Returns the number of external block bytes copied
+	size_t CopyBlocks(void* dst, const void* src, size_t size) const;
+
+	/// Main loop of read thread
+	void Loop();
+
+	/// Load the given block into one of the `m_buffer` buffers if necessary and return a pointer to its contents if successful
+	/// Returns null if the block could not be read
+	/// `isReadahead` is accepted for callers' benefit but is not used by the implementation
+	Buffer* GetBlockPtr(const Chunk& block, bool isReadahead = false);
+	/// Read `size` (internal block) bytes starting at `offset` into `ptr`, decompressing chunks as needed
+	/// Returns false if a chunk could not be read or the request was cancelled
+	bool Decompress(void* ptr, u64 offset, u32 size);
+	/// Cancel any inflight read and wait until the thread is no longer doing anything
+	void CancelAndWaitUntilStopped(void);
+	/// Attempt to read from the cache
+	/// Adjusts pointer, offset, and size if successful
+	/// Returns true if no additional reads are necessary
+	/// The lock_guard parameter documents that `m_mtx` must be held by the caller
+	bool TryCachedRead(void*& buffer, u64& offset, u32& size, const std::lock_guard<std::mutex>&);
+
+public:
+	bool Open(const wxString& fileName) final override;
+	int ReadSync(void* pBuffer, uint sector, uint count) final override;
+	void BeginRead(void* pBuffer, uint sector, uint count) final override;
+	int FinishRead(void) final override;
+	void CancelRead(void) final override;
+	void Close(void) final override;
+	void SetBlockSize(uint bytes) final override;
+	void SetDataOffset(int bytes) final override;
+};
diff --git a/pcsx2/CMakeLists.txt b/pcsx2/CMakeLists.txt index 17e6af6dcb..6654fbf422 100644 --- a/pcsx2/CMakeLists.txt +++ b/pcsx2/CMakeLists.txt @@ -205,6 +205,7 @@ set(pcsx2CDVDSources CDVD/ChdFileReader.cpp CDVD/CsoFileReader.cpp CDVD/GzippedFileReader.cpp + CDVD/ThreadedFileReader.cpp CDVD/IsoFS/IsoFile.cpp CDVD/IsoFS/IsoFSCDVD.cpp CDVD/IsoFS/IsoFS.cpp @@ -224,6 +225,7 @@ set(pcsx2CDVDHeaders CDVD/ChdFileReader.h CDVD/CsoFileReader.h CDVD/GzippedFileReader.h + CDVD/ThreadedFileReader.h CDVD/IsoFileFormats.h CDVD/IsoFS/IsoDirectory.h CDVD/IsoFS/IsoFileDescriptor.h diff --git a/pcsx2/pcsx2.vcxproj b/pcsx2/pcsx2.vcxproj index 7aee440d76..99bf0c410a 100644 --- a/pcsx2/pcsx2.vcxproj +++ b/pcsx2/pcsx2.vcxproj @@ -282,6 +282,7 @@ + true @@ -732,6 +733,7 @@ + diff --git a/pcsx2/pcsx2.vcxproj.filters b/pcsx2/pcsx2.vcxproj.filters index db2e6dd2d4..ee2cd2a1e2 100644 --- a/pcsx2/pcsx2.vcxproj.filters +++ b/pcsx2/pcsx2.vcxproj.filters @@ -888,6 +888,9 @@ System\Ps2\Debug + + System\ISO + System\ISO @@ -1948,6 +1951,9 @@ System\Ps2\Debug + + System\ISO + System\ISO