IPC: implement better error handling

This commit is contained in:
Gauvain 'GovanifY' Roussel-Tarbouriech 2020-08-21 22:13:03 +02:00 committed by tellowkrinkle
parent 439ed9617c
commit d70309503f
3 changed files with 56 additions and 16 deletions

View File

@ -22,11 +22,13 @@
#if _WIN32
#define read_portable(a, b, c) (recv(a, b, c, 0))
#define write_portable(a, b, c) (send(a, b, c, 0))
#define close_portable(a) (closesocket(a))
#define bzero(b, len) (memset((b), '\0', (len)), (void)0)
#include <windows.h>
#else
#define read_portable(a, b, c) (read(a, b, c))
#define write_portable(a, b, c) (write(a, b, c))
#define close_portable(a) (close(a))
#include <sys/socket.h>
#include <sys/un.h>
#endif
@ -64,6 +66,10 @@ SocketIPC::SocketIPC(SysCoreThread* vm)
server.sin_addr.s_addr = inet_addr("127.0.0.1");
server.sin_port = htons(PORT);
// socket timeout
DWORD timeout = 10 * 1000; // 10 seconds
setsockopt(m_sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof timeout);
if (bind(m_sock, (struct sockaddr*)&server, sizeof(server)) == SOCKET_ERROR)
{
Console.WriteLn(Color_Red, "IPC: Error while binding to socket! Shutting down...");
@ -82,6 +88,13 @@ SocketIPC::SocketIPC(SysCoreThread* vm)
server.sun_family = AF_UNIX;
strcpy(server.sun_path, SOCKET_NAME);
// socket timeout
struct timeval tv;
tv.tv_sec = 10;
tv.tv_usec = 0;
setsockopt(m_sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
// we unlink the socket so that when releasing this thread the socket gets
// freed even if we didn't close correctly the loop
unlink(SOCKET_NAME);
@ -92,8 +105,10 @@ SocketIPC::SocketIPC(SysCoreThread* vm)
}
#endif
// maximum queue of SOMAXCONN commands before refusing, which stops the thread
listen(m_sock, SOMAXCONN);
// maximum queue of 4096 commands before refusing, approximated to the
// nearest legal value. We do not use SOMAXCONN as windows have this idea
// that a "reasonable" value is 5, which is not.
listen(m_sock, 4096);
// we save a handle of the main vm object
m_vm = vm;
@ -104,7 +119,7 @@ SocketIPC::SocketIPC(SysCoreThread* vm)
void SocketIPC::ExecuteTaskInThread()
{
int msgsock = 0;
m_end = false;
// we allocate once buffers to not have to do mallocs for each IPC
// request, as malloc is expansive when we optimize for µs.
@ -112,38 +127,53 @@ void SocketIPC::ExecuteTaskInThread()
m_ipc_buffer = new char[MAX_IPC_SIZE];
while (true)
{
msgsock = accept(m_sock, 0, 0);
if (msgsock == -1)
m_msgsock = accept(m_sock, 0, 0);
if (m_msgsock == -1)
{
return;
// everything else is non recoverable in our scope
// we also mark as recoverable socket errors where it would block a
// non blocking socket, even though our socket is blocking, in case
// we ever have to implement a non blocking socket.
#ifdef _WIN32
int errno_w = WSAGetLastError();
if (!(errno_w == WSAECONNRESET || errno_w == WSAEINTR || errno_w == WSAEINPROGRESS || errno_w == WSAEMFILE || errno_w == WSAEWOULDBLOCK))
{
#else
if (!(errno == ECONNABORTED || errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
{
#endif
fprintf(stderr, "IPC: An unrecoverable error happened! Shutting down...\n");
m_end = true;
break;
}
}
else
{
if (read_portable(msgsock, m_ipc_buffer, 650000) < 0)
{
return;
}
else
if (read_portable(m_msgsock, m_ipc_buffer, MAX_IPC_SIZE) >= 0)
{
auto res = ParseCommand(m_ipc_buffer, m_ret_buffer);
if (write_portable(msgsock, res.buffer, res.size) < 0)
// we don't care about the error value as we will reset the
// connection after that anyways
if (write_portable(m_msgsock, res.buffer, res.size) < 0)
{
return;
}
}
}
close_portable(m_msgsock);
}
return;
}
SocketIPC::~SocketIPC()
{
m_end = true;
#ifdef _WIN32
closesocket(m_sock);
WSACleanup();
#else
close(m_sock);
unlink(SOCKET_NAME);
#endif
close_portable(m_sock);
close_portable(m_msgsock);
delete[] m_ret_buffer;
delete[] m_ipc_buffer;
// destroy the thread

View File

@ -35,13 +35,18 @@ protected:
// their SDK won't even run their own examples, so we go on TCP sockets.
#define PORT 28011
SOCKET m_sock = INVALID_SOCKET;
// the message socket used in thread's accept().
SOCKET m_msgsock = INVALID_SOCKET;
#else
// absolute path of the socket. Stored in the temporary directory in linux since
// /run requires superuser permission
const char* SOCKET_NAME = "/tmp/pcsx2.sock";
int m_sock = 0;
// the message socket used in thread's accept().
int m_msgsock = 0;
#endif
/**
* Maximum memory used by an IPC message request.
* Equivalent to 50,000 Write64 requests.
@ -112,7 +117,7 @@ protected:
// handle to the main vm thread
SysCoreThread* m_vm;
/* Thread used to relay IPC commands. */
// Thread used to relay IPC commands.
void ExecuteTaskInThread();
/* Internal function, Parses an IPC command.
@ -167,6 +172,9 @@ protected:
}
public:
// Whether the socket processing thread should stop executing/is stopped.
bool m_end = true;
/* Initializers */
SocketIPC(SysCoreThread* vm);
virtual ~SocketIPC();

View File

@ -249,6 +249,8 @@ void SysCoreThread::GameStartingInThread()
m_IpcState = ON;
m_socketIpc = std::make_unique<SocketIPC>(this);
}
if (m_IpcState == ON && m_socketIpc->m_end)
m_socketIpc->Start();
}
bool SysCoreThread::StateCheckInThread()