mirror of https://github.com/PCSX2/pcsx2.git
CDVD: Add ThreadedFileReader
This commit is contained in:
parent
d8d69f2aa8
commit
bdcfcc65ea
|
@ -0,0 +1,360 @@
|
|||
/* PCSX2 - PS2 Emulator for PCs
|
||||
* Copyright (C) 2002-2021 PCSX2 Dev Team
|
||||
*
|
||||
* PCSX2 is free software: you can redistribute it and/or modify it under the terms
|
||||
* of the GNU Lesser General Public License as published by the Free Software Found-
|
||||
* ation, either version 3 of the License, or (at your option) any later version.
|
||||
*
|
||||
* PCSX2 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
|
||||
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
|
||||
* PURPOSE. See the GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with PCSX2.
|
||||
* If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "PrecompiledHeader.h"
|
||||
#include "ThreadedFileReader.h"
|
||||
|
||||
ThreadedFileReader::ThreadedFileReader()
|
||||
{
|
||||
m_readThread = std::thread([](ThreadedFileReader* r){ r->Loop(); }, this);
|
||||
}
|
||||
|
||||
ThreadedFileReader::~ThreadedFileReader()
|
||||
{
|
||||
m_quit = true;
|
||||
(void)std::lock_guard<std::mutex>{m_mtx};
|
||||
m_condition.notify_one();
|
||||
m_readThread.join();
|
||||
for (auto& buffer : m_buffer)
|
||||
if (buffer.ptr)
|
||||
free(buffer.ptr);
|
||||
}
|
||||
|
||||
size_t ThreadedFileReader::CopyBlocks(void* dst, const void* src, size_t size) const
|
||||
{
|
||||
char* cdst = static_cast<char*>(dst);
|
||||
const char* csrc = static_cast<const char*>(src);
|
||||
const char* cend = csrc + size;
|
||||
if (m_internalBlockSize)
|
||||
{
|
||||
for (; csrc < cend; csrc += m_internalBlockSize, cdst += m_blocksize)
|
||||
{
|
||||
memcpy(cdst, csrc, m_blocksize);
|
||||
}
|
||||
return cdst - static_cast<char*>(dst);
|
||||
}
|
||||
else
|
||||
{
|
||||
memcpy(dst, src, size);
|
||||
return size;
|
||||
}
|
||||
}
|
||||
|
||||
void ThreadedFileReader::Loop()
|
||||
{
|
||||
Threading::SetNameOfCurrentThread("ISO Decompress");
|
||||
|
||||
std::unique_lock<std::mutex> lock(m_mtx);
|
||||
|
||||
while (true)
|
||||
{
|
||||
while (!m_requestSize && !m_quit)
|
||||
m_condition.wait(lock);
|
||||
|
||||
if (m_quit)
|
||||
return;
|
||||
|
||||
u64 requestOffset = m_requestOffset;
|
||||
u32 requestSize = m_requestSize;
|
||||
void* ptr = m_requestPtr.load(std::memory_order_relaxed);
|
||||
|
||||
m_running = true;
|
||||
lock.unlock();
|
||||
|
||||
bool ok = true;
|
||||
|
||||
if (ptr)
|
||||
{
|
||||
ok = Decompress(ptr, requestOffset, requestSize);
|
||||
}
|
||||
|
||||
m_requestPtr.store(nullptr, std::memory_order_release);
|
||||
m_condition.notify_one();
|
||||
|
||||
if (ok)
|
||||
{
|
||||
// Readahead
|
||||
Chunk blk = ChunkForOffset(requestOffset + requestSize);
|
||||
if (blk.chunkID >= 0)
|
||||
{
|
||||
(void)GetBlockPtr(blk, true);
|
||||
blk = ChunkForOffset(blk.offset + blk.length);
|
||||
if (blk.chunkID >= 0)
|
||||
(void)GetBlockPtr(blk, true);
|
||||
}
|
||||
}
|
||||
|
||||
lock.lock();
|
||||
if (requestSize == m_requestSize && requestOffset == m_requestOffset && !m_requestPtr)
|
||||
{
|
||||
// If no one's added more work, mark this one as done
|
||||
m_requestSize = 0;
|
||||
}
|
||||
|
||||
m_running = false;
|
||||
m_condition.notify_one(); // For things waiting on m_running == false
|
||||
}
|
||||
}
|
||||
|
||||
ThreadedFileReader::Buffer* ThreadedFileReader::GetBlockPtr(const Chunk& block, bool isReadahead)
|
||||
{
|
||||
for (int i = 0; i < static_cast<int>(ArraySize(m_buffer)); i++)
|
||||
{
|
||||
if (m_buffer[i].valid.load(std::memory_order_acquire) && m_buffer[i].offset == block.offset)
|
||||
{
|
||||
m_nextBuffer = (i + 1) % ArraySize(m_buffer);
|
||||
return m_buffer + i;
|
||||
}
|
||||
}
|
||||
|
||||
Buffer& buf = m_buffer[m_nextBuffer];
|
||||
{
|
||||
// This can be called from both the read thread threads in ReadSync
|
||||
// Calls from ReadSync are done with the lock already held to keep the read thread out
|
||||
// Therefore we should only lock on the read thread
|
||||
std::unique_lock<std::mutex> lock(m_mtx, std::defer_lock);
|
||||
if (std::this_thread::get_id() == m_readThread.get_id())
|
||||
lock.lock();
|
||||
if (buf.size < block.length)
|
||||
buf.ptr = realloc(buf.ptr, block.length);
|
||||
buf.valid.store(false, std::memory_order_relaxed);
|
||||
|
||||
}
|
||||
int size = ReadChunk(buf.ptr, block.chunkID);
|
||||
if (size > 0)
|
||||
{
|
||||
buf.offset = block.offset;
|
||||
buf.size = size;
|
||||
buf.valid.store(true, std::memory_order_release);
|
||||
m_nextBuffer = (m_nextBuffer + 1) % ArraySize(m_buffer);
|
||||
return &buf;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
bool ThreadedFileReader::Decompress(void* target, u64 begin, u32 size)
|
||||
{
|
||||
Chunk blk = ChunkForOffset(begin);
|
||||
char* write = static_cast<char*>(target);
|
||||
u32 remaining = size;
|
||||
if (blk.offset != begin)
|
||||
{
|
||||
u32 off = begin - blk.offset;
|
||||
u32 len = std::min(blk.length - off, size);
|
||||
// Partial block
|
||||
if (Buffer* buf = GetBlockPtr(blk))
|
||||
{
|
||||
if (buf->size < blk.length)
|
||||
return false;
|
||||
write += CopyBlocks(write, static_cast<char*>(buf->ptr) + off, len);
|
||||
remaining -= len;
|
||||
blk = ChunkForOffset(blk.offset + blk.length);
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
while (blk.length <= remaining)
|
||||
{
|
||||
if (m_requestCancelled.load(std::memory_order_relaxed))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (m_internalBlockSize)
|
||||
{
|
||||
if (Buffer* buf = GetBlockPtr(blk))
|
||||
{
|
||||
if (buf->size < blk.length)
|
||||
return false;
|
||||
write += CopyBlocks(write, buf->ptr, blk.length);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int amt = ReadChunk(write, blk.chunkID);
|
||||
if (amt < static_cast<int>(blk.length))
|
||||
return false;
|
||||
write += blk.length;
|
||||
}
|
||||
remaining -= blk.length;
|
||||
blk = ChunkForOffset(blk.offset + blk.length);
|
||||
}
|
||||
if (remaining)
|
||||
{
|
||||
if (Buffer* buf = GetBlockPtr(blk))
|
||||
{
|
||||
if (buf->size < remaining)
|
||||
return false;
|
||||
write += CopyBlocks(write, buf->ptr, remaining);
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
m_amtRead += write - static_cast<char*>(target);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ThreadedFileReader::TryCachedRead(void*& buffer, u64& offset, u32& size, const std::lock_guard<std::mutex>&)
|
||||
{
|
||||
// Run through twice so that if m_buffer[1] contains the first half and m_buffer[0] contains the second half it still works
|
||||
m_amtRead = 0;
|
||||
u64 end = 0;
|
||||
bool allDone = false;
|
||||
for (int i = 0; i < static_cast<int>(ArraySize(m_buffer) * 2); i++)
|
||||
{
|
||||
Buffer& buf = m_buffer[i % ArraySize(m_buffer)];
|
||||
if (!buf.valid.load(std::memory_order_acquire))
|
||||
continue;
|
||||
if (buf.offset <= offset && buf.offset + buf.size > offset)
|
||||
{
|
||||
u32 off = offset - buf.offset;
|
||||
u32 cpysize = std::min(size, buf.size - off);
|
||||
size_t read = CopyBlocks(buffer, static_cast<char*>(buf.ptr) + off, cpysize);
|
||||
m_amtRead += read;
|
||||
size -= cpysize;
|
||||
offset += cpysize;
|
||||
buffer = static_cast<char*>(buffer) + read;
|
||||
if (size == 0)
|
||||
end = buf.offset + buf.size;
|
||||
}
|
||||
// Do buffers contain the current and next block?
|
||||
if (end > 0 && buf.offset == end)
|
||||
allDone = true;
|
||||
}
|
||||
return allDone;
|
||||
}
|
||||
|
||||
bool ThreadedFileReader::Open(const wxString& fileName)
|
||||
{
|
||||
CancelAndWaitUntilStopped();
|
||||
return Open2(fileName);
|
||||
}
|
||||
|
||||
int ThreadedFileReader::ReadSync(void* pBuffer, uint sector, uint count)
|
||||
{
|
||||
u32 blocksize = InternalBlockSize();
|
||||
u64 offset = (u64)sector * (u64)blocksize + m_dataoffset;
|
||||
u32 size = count * blocksize;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_mtx);
|
||||
if (TryCachedRead(pBuffer, offset, size, l))
|
||||
return m_amtRead;
|
||||
|
||||
if (size > 0 && !m_running)
|
||||
{
|
||||
// Don't wait for read thread to start back up
|
||||
if (Decompress(pBuffer, offset, size))
|
||||
{
|
||||
offset += size;
|
||||
size = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (size == 0)
|
||||
{
|
||||
// For readahead
|
||||
m_requestOffset = offset - 1;
|
||||
m_requestSize = 1;
|
||||
m_requestPtr.store(nullptr, std::memory_order_relaxed);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_requestOffset = offset;
|
||||
m_requestSize = size;
|
||||
m_requestPtr.store(pBuffer, std::memory_order_relaxed);
|
||||
}
|
||||
m_requestCancelled.store(false, std::memory_order_relaxed);
|
||||
}
|
||||
m_condition.notify_one();
|
||||
if (size == 0)
|
||||
return m_amtRead;
|
||||
return FinishRead();
|
||||
}
|
||||
|
||||
void ThreadedFileReader::CancelAndWaitUntilStopped(void)
|
||||
{
|
||||
m_requestCancelled.store(true, std::memory_order_relaxed);
|
||||
std::unique_lock<std::mutex> lock(m_mtx);
|
||||
while (m_running)
|
||||
m_condition.wait(lock);
|
||||
}
|
||||
|
||||
void ThreadedFileReader::BeginRead(void* pBuffer, uint sector, uint count)
|
||||
{
|
||||
s32 blocksize = InternalBlockSize();
|
||||
u64 offset = (u64)sector * (u64)blocksize + m_dataoffset;
|
||||
u32 size = count * blocksize;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_mtx);
|
||||
if (TryCachedRead(pBuffer, offset, size, l))
|
||||
return;
|
||||
if (size == 0)
|
||||
{
|
||||
// For readahead
|
||||
m_requestOffset = offset - 1;
|
||||
m_requestSize = 1;
|
||||
m_requestPtr.store(nullptr, std::memory_order_relaxed);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_requestOffset = offset;
|
||||
m_requestSize = size;
|
||||
m_requestPtr.store(pBuffer, std::memory_order_relaxed);
|
||||
}
|
||||
m_requestCancelled.store(false, std::memory_order_relaxed);
|
||||
}
|
||||
m_condition.notify_one();
|
||||
}
|
||||
|
||||
int ThreadedFileReader::FinishRead(void)
|
||||
{
|
||||
if (m_requestPtr.load(std::memory_order_acquire) == nullptr)
|
||||
return m_amtRead;
|
||||
std::unique_lock<std::mutex> lock(m_mtx);
|
||||
while (m_requestPtr)
|
||||
m_condition.wait(lock);
|
||||
return m_amtRead;
|
||||
}
|
||||
|
||||
void ThreadedFileReader::CancelRead(void)
|
||||
{
|
||||
if (m_requestPtr.load(std::memory_order_acquire) == nullptr)
|
||||
return;
|
||||
m_requestCancelled.store(true, std::memory_order_release);
|
||||
std::unique_lock<std::mutex> lock(m_mtx);
|
||||
while (m_requestPtr.load(std::memory_order_relaxed))
|
||||
m_condition.wait(lock);
|
||||
}
|
||||
|
||||
void ThreadedFileReader::Close(void)
|
||||
{
|
||||
CancelAndWaitUntilStopped();
|
||||
for (auto& buf : m_buffer)
|
||||
buf.valid.store(false, std::memory_order_relaxed);
|
||||
Close2();
|
||||
}
|
||||
|
||||
void ThreadedFileReader::SetBlockSize(uint bytes)
|
||||
{
|
||||
m_blocksize = bytes;
|
||||
}
|
||||
|
||||
void ThreadedFileReader::SetDataOffset(int bytes)
|
||||
{
|
||||
m_dataoffset = bytes;
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
/* PCSX2 - PS2 Emulator for PCs
|
||||
* Copyright (C) 2002-2021 PCSX2 Dev Team
|
||||
*
|
||||
* PCSX2 is free software: you can redistribute it and/or modify it under the terms
|
||||
* of the GNU Lesser General Public License as published by the Free Software Found-
|
||||
* ation, either version 3 of the License, or (at your option) any later version.
|
||||
*
|
||||
* PCSX2 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
|
||||
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
|
||||
* PURPOSE. See the GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with PCSX2.
|
||||
* If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "AsyncFileReader.h"
|
||||
#include "Utilities/PersistentThread.h"
|
||||
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
|
||||
/// A file reader for use with compressed formats
|
||||
/// Calls decompression code on a separate thread to make a synchronous decompression API async
|
||||
class ThreadedFileReader : public AsyncFileReader
|
||||
{
|
||||
ThreadedFileReader(ThreadedFileReader&&) = delete;
|
||||
protected:
|
||||
struct Chunk
|
||||
{
|
||||
/// Negative block IDs indicate invalid blocks
|
||||
s64 chunkID;
|
||||
u64 offset;
|
||||
u32 length;
|
||||
};
|
||||
|
||||
/// Set nonzero to separate block size of read blocks from m_blocksize
|
||||
/// Requires that chunk size is a multiple of internal block size
|
||||
/// Use to avoid overrunning stack because PCSX2 likes to allocate 2448-byte buffers
|
||||
int m_internalBlockSize = 0;
|
||||
|
||||
/// Get the block containing the given offset
|
||||
virtual Chunk ChunkForOffset(u64 offset) = 0;
|
||||
/// Synchronously read the given block into `dst`
|
||||
virtual int ReadChunk(void* dst, s64 chunkID) = 0;
|
||||
/// AsyncFileReader open but ThreadedFileReader needs prep work first
|
||||
virtual bool Open2(const wxString& fileName) = 0;
|
||||
/// AsyncFileReader close but ThreadedFileReader needs prep work first
|
||||
virtual void Close2(void) = 0;
|
||||
|
||||
ThreadedFileReader();
|
||||
~ThreadedFileReader();
|
||||
|
||||
private:
|
||||
int m_amtRead;
|
||||
/// Pointer to read into
|
||||
/// If null when m_requestSize > 0, indicates a request for readahead only
|
||||
std::atomic<void*> m_requestPtr{nullptr};
|
||||
/// Request offset in (internal block) bytes from the beginning of the file
|
||||
u64 m_requestOffset = 0;
|
||||
/// Request size in (internal block) bytes
|
||||
/// In addition to marking the request size, the loop thread uses this variable to decide whether there's work to do (size of 0 means no work)
|
||||
u32 m_requestSize = 0;
|
||||
/// Used to cancel requests early
|
||||
/// Note: It might take a while for the cancellation request to be noticed, wait until `m_requestPtr` is cleared to ensure it's not being written to
|
||||
std::atomic<bool> m_requestCancelled{false};
|
||||
struct Buffer
|
||||
{
|
||||
void* ptr = nullptr;
|
||||
u64 offset = 0;
|
||||
u32 size = 0;
|
||||
std::atomic<bool> valid{false};
|
||||
};
|
||||
/// 2 buffers for readahead (current block, next block)
|
||||
Buffer m_buffer[2];
|
||||
u32 m_nextBuffer = 0;
|
||||
|
||||
std::thread m_readThread;
|
||||
std::mutex m_mtx;
|
||||
std::condition_variable m_condition;
|
||||
/// True to tell the thread to exit
|
||||
bool m_quit = false;
|
||||
/// True if the thread is currently doing something other than waiting
|
||||
/// View while holding `m_mtx`. If false, you may touch decompression functions from other threads
|
||||
bool m_running = false;
|
||||
|
||||
/// Get the internal block size
|
||||
u32 InternalBlockSize() const { return m_internalBlockSize ? m_internalBlockSize : m_blocksize; }
|
||||
/// memcpy from internal to external blocks
|
||||
/// `size` is in internal block bytes
|
||||
/// Returns the number of external block bytes copied
|
||||
size_t CopyBlocks(void* dst, const void* src, size_t size) const;
|
||||
|
||||
/// Main loop of read thread
|
||||
void Loop();
|
||||
|
||||
/// Load the given block into one of the `m_buffer` buffers if necessary and return a pointer to its contents if successful
|
||||
/// Writes to `m_status` if `!isReadahead`
|
||||
Buffer* GetBlockPtr(const Chunk& block, bool isReadahead = false);
|
||||
/// Decompress from offset to size into
|
||||
bool Decompress(void* ptr, u64 offset, u32 size);
|
||||
/// Cancel any inflight read and wait until the thread is no longer doing anything
|
||||
void CancelAndWaitUntilStopped(void);
|
||||
/// Attempt to read from the cache
|
||||
/// Adjusts pointer, offset, and size if successful
|
||||
/// Returns true if no additional reads are necessary
|
||||
bool TryCachedRead(void*& buffer, u64& offset, u32& size, const std::lock_guard<std::mutex>&);
|
||||
|
||||
public:
|
||||
bool Open(const wxString& fileName) final override;
|
||||
int ReadSync(void* pBuffer, uint sector, uint count) final override;
|
||||
void BeginRead(void* pBuffer, uint sector, uint count) final override;
|
||||
int FinishRead(void) final override;
|
||||
void CancelRead(void) final override;
|
||||
void Close(void) final override;
|
||||
void SetBlockSize(uint bytes) final override;
|
||||
void SetDataOffset(int bytes) final override;
|
||||
};
|
|
@ -205,6 +205,7 @@ set(pcsx2CDVDSources
|
|||
CDVD/ChdFileReader.cpp
|
||||
CDVD/CsoFileReader.cpp
|
||||
CDVD/GzippedFileReader.cpp
|
||||
CDVD/ThreadedFileReader.cpp
|
||||
CDVD/IsoFS/IsoFile.cpp
|
||||
CDVD/IsoFS/IsoFSCDVD.cpp
|
||||
CDVD/IsoFS/IsoFS.cpp
|
||||
|
@ -224,6 +225,7 @@ set(pcsx2CDVDHeaders
|
|||
CDVD/ChdFileReader.h
|
||||
CDVD/CsoFileReader.h
|
||||
CDVD/GzippedFileReader.h
|
||||
CDVD/ThreadedFileReader.h
|
||||
CDVD/IsoFileFormats.h
|
||||
CDVD/IsoFS/IsoDirectory.h
|
||||
CDVD/IsoFS/IsoFileDescriptor.h
|
||||
|
|
|
@ -282,6 +282,7 @@
|
|||
<ClCompile Include="CDVD\CsoFileReader.cpp" />
|
||||
<ClCompile Include="CDVD\GzippedFileReader.cpp" />
|
||||
<ClCompile Include="CDVD\OutputIsoFile.cpp" />
|
||||
<ClCompile Include="CDVD\ThreadedFileReader.cpp" />
|
||||
<ClCompile Include="CDVD\Linux\DriveUtility.cpp">
|
||||
<ExcludedFromBuild>true</ExcludedFromBuild>
|
||||
</ClCompile>
|
||||
|
@ -732,6 +733,7 @@
|
|||
<ClInclude Include="CDVD\CompressedFileReaderUtils.h" />
|
||||
<ClInclude Include="CDVD\CsoFileReader.h" />
|
||||
<ClInclude Include="CDVD\GzippedFileReader.h" />
|
||||
<ClInclude Include="CDVD\ThreadedFileReader.h" />
|
||||
<ClInclude Include="CDVD\zlib_indexed.h" />
|
||||
<ClInclude Include="DebugTools\Breakpoints.h" />
|
||||
<ClInclude Include="DebugTools\DebugInterface.h" />
|
||||
|
|
|
@ -888,6 +888,9 @@
|
|||
<ClCompile Include="DebugTools\MipsStackWalk.cpp">
|
||||
<Filter>System\Ps2\Debug</Filter>
|
||||
</ClCompile>
|
||||
<ClCompile Include="CDVD\ThreadedFileReader.cpp">
|
||||
<Filter>System\ISO</Filter>
|
||||
</ClCompile>
|
||||
<ClCompile Include="CDVD\CsoFileReader.cpp">
|
||||
<Filter>System\ISO</Filter>
|
||||
</ClCompile>
|
||||
|
@ -1948,6 +1951,9 @@
|
|||
<ClInclude Include="DebugTools\MipsStackWalk.h">
|
||||
<Filter>System\Ps2\Debug</Filter>
|
||||
</ClInclude>
|
||||
<ClInclude Include="CDVD\ThreadedFileReader.h">
|
||||
<Filter>System\ISO</Filter>
|
||||
</ClInclude>
|
||||
<ClInclude Include="CDVD\CsoFileReader.h">
|
||||
<Filter>System\ISO</Filter>
|
||||
</ClInclude>
|
||||
|
|
Loading…
Reference in New Issue