IPC: keep reusing the same socket to avoid windows overfill & better perf

This commit is contained in:
Gauvain 'GovanifY' Roussel-Tarbouriech 2021-02-28 22:32:29 +01:00 committed by refractionpcsx2
parent 3f9ea1d01b
commit b7bb3daa9d
2 changed files with 84 additions and 71 deletions

View File

@ -134,6 +134,44 @@ char* SocketIPC::MakeFailIPC(char* ret_buffer, uint32_t size = 5)
return ret_buffer;
}
int SocketIPC::StartSocket()
{
#ifdef _WIN32
// socket timeout
DWORD tv = 10 * 1000; // 10 seconds
#else
// socket timeout
struct timeval tv;
tv.tv_sec = 10;
tv.tv_usec = 0;
#endif
m_msgsock = accept(m_sock, 0, 0);
setsockopt(m_msgsock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
setsockopt(m_msgsock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&tv, sizeof tv);
if (m_msgsock == -1)
{
// everything else is non recoverable in our scope
// we also mark as recoverable socket errors where it would block a
// non blocking socket, even though our socket is blocking, in case
// we ever have to implement a non blocking socket.
#ifdef _WIN32
int errno_w = WSAGetLastError();
if (!(errno_w == WSAECONNRESET || errno_w == WSAEINTR || errno_w == WSAEINPROGRESS || errno_w == WSAEMFILE || errno_w == WSAEWOULDBLOCK))
{
#else
if (!(errno == ECONNABORTED || errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
{
#endif
fprintf(stderr, "IPC: An unrecoverable error happened! Shutting down...\n");
m_end = true;
return -1;
}
}
return 0;
}
void SocketIPC::ExecuteTaskInThread()
{
m_end = false;
@ -142,92 +180,61 @@ void SocketIPC::ExecuteTaskInThread()
// request, as malloc is expansive when we optimize for µs.
m_ret_buffer = new char[MAX_IPC_RETURN_SIZE];
m_ipc_buffer = new char[MAX_IPC_SIZE];
if (StartSocket() < 0)
return;
while (true)
{
m_msgsock = accept(m_sock, 0, 0);
if (m_msgsock == -1)
// either int or ssize_t depending on the platform, so we have to
// use a bunch of auto
auto receive_length = 0;
auto end_length = 4;
// while we haven't received the entire packet, maybe due to
// socket datagram splittage, we continue to read
while (receive_length < end_length)
{
// everything else is non recoverable in our scope
// we also mark as recoverable socket errors where it would block a
// non blocking socket, even though our socket is blocking, in case
// we ever have to implement a non blocking socket.
#ifdef _WIN32
int errno_w = WSAGetLastError();
if (!(errno_w == WSAECONNRESET || errno_w == WSAEINTR || errno_w == WSAEINPROGRESS || errno_w == WSAEMFILE || errno_w == WSAEWOULDBLOCK))
auto tmp_length = read_portable(m_msgsock, &m_ipc_buffer[receive_length], MAX_IPC_SIZE - receive_length);
// we recreate the socket if an error happens
if (tmp_length <= 0)
{
#else
if (!(errno == ECONNABORTED || errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
{
#endif
fprintf(stderr, "IPC: An unrecoverable error happened! Shutting down...\n");
m_end = true;
receive_length = 0;
if (StartSocket() < 0)
return;
break;
}
}
else
{
#ifdef _WIN32
// socket timeout
DWORD tv = 10 * 1000; // 10 seconds
#else
// socket timeout
struct timeval tv;
tv.tv_sec = 10;
tv.tv_usec = 0;
#endif
setsockopt(m_msgsock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
setsockopt(m_msgsock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&tv, sizeof tv);
receive_length += tmp_length;
// either int or ssize_t depending on the platform, so we have to
// use a bunch of auto
auto receive_length = 0;
auto end_length = 4;
// while we haven't received the entire packet, maybe due to
// socket datagram splittage, we continue to read
while (receive_length < end_length)
// if we got at least the final size then update
if (end_length == 4 && receive_length >= 4)
{
auto tmp_length = read_portable(m_msgsock, &m_ipc_buffer[receive_length], MAX_IPC_SIZE - receive_length);
// we close the connection if an error happens
if (tmp_length <= 0)
end_length = FromArray<u32>(m_ipc_buffer, 0);
// we'd like to avoid a client trying to do OOB
if (end_length > MAX_IPC_SIZE || end_length < 4)
{
receive_length = 0;
break;
}
receive_length += tmp_length;
// if we got at least the final size then update
if (end_length == 4 && receive_length >= 4)
{
end_length = FromArray<u32>(m_ipc_buffer, 0);
// we'd like to avoid a client trying to do OOB
if (end_length > MAX_IPC_SIZE || end_length < 4)
{
receive_length = 0;
break;
}
}
}
SocketIPC::IPCBuffer res;
// we remove 4 bytes to get the message size out of the IPC command
// size in ParseCommand
if (receive_length == 0)
res = IPCBuffer{5, MakeFailIPC(m_ret_buffer)};
else
res = ParseCommand(&m_ipc_buffer[4], m_ret_buffer, (u32)end_length - 4);
// we don't care about the error value as we will reset the
// connection after that anyways
if (write_portable(m_msgsock, res.buffer, res.size) < 0)
{
}
}
close_portable(m_msgsock);
SocketIPC::IPCBuffer res;
// we remove 4 bytes to get the message size out of the IPC command
// size in ParseCommand
if (receive_length == 0)
res = IPCBuffer{5, MakeFailIPC(m_ret_buffer)};
else
res = ParseCommand(&m_ipc_buffer[4], m_ret_buffer, (u32)end_length - 4);
// if we cannot send back our answer restart the socket
if (write_portable(m_msgsock, res.buffer, res.size) < 0)
{
if (StartSocket() < 0)
return;
}
}
return;
}

View File

@ -143,6 +143,12 @@ protected:
static inline char* MakeOkIPC(char* ret_buffer, uint32_t size);
static inline char* MakeFailIPC(char* ret_buffer, uint32_t size);
/**
* Initializes an open socket for IPC communication.
* return value: -1 if a fatal failure happened, 0 otherwise.
*/
int StartSocket();
/**
* Converts an uint to an char* in little endian
* res_array: the array to modify