diff --git a/pcsx2/IPC.cpp b/pcsx2/IPC.cpp index 0a1de5bb50..1f2011c9e0 100644 --- a/pcsx2/IPC.cpp +++ b/pcsx2/IPC.cpp @@ -22,11 +22,13 @@ #if _WIN32 #define read_portable(a, b, c) (recv(a, b, c, 0)) #define write_portable(a, b, c) (send(a, b, c, 0)) +#define close_portable(a) (closesocket(a)) #define bzero(b, len) (memset((b), '\0', (len)), (void)0) #include #else #define read_portable(a, b, c) (read(a, b, c)) #define write_portable(a, b, c) (write(a, b, c)) +#define close_portable(a) (close(a)) #include #include #endif @@ -64,6 +66,10 @@ SocketIPC::SocketIPC(SysCoreThread* vm) server.sin_addr.s_addr = inet_addr("127.0.0.1"); server.sin_port = htons(PORT); + // socket timeout + DWORD timeout = 10 * 1000; // 10 seconds + setsockopt(m_sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof timeout); + if (bind(m_sock, (struct sockaddr*)&server, sizeof(server)) == SOCKET_ERROR) { Console.WriteLn(Color_Red, "IPC: Error while binding to socket! Shutting down..."); @@ -82,6 +88,13 @@ SocketIPC::SocketIPC(SysCoreThread* vm) server.sun_family = AF_UNIX; strcpy(server.sun_path, SOCKET_NAME); + + // socket timeout + struct timeval tv; + tv.tv_sec = 10; + tv.tv_usec = 0; + setsockopt(m_sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv); + // we unlink the socket so that when releasing this thread the socket gets // freed even if we didn't close correctly the loop unlink(SOCKET_NAME); @@ -92,8 +105,10 @@ SocketIPC::SocketIPC(SysCoreThread* vm) } #endif - // maximum queue of SOMAXCONN commands before refusing, which stops the thread - listen(m_sock, SOMAXCONN); + // maximum queue of 4096 commands before refusing, approximated to the + // nearest legal value. We do not use SOMAXCONN as windows have this idea + // that a "reasonable" value is 5, which is not. + listen(m_sock, 4096); // we save a handle of the main vm object m_vm = vm; @@ -104,7 +119,7 @@ SocketIPC::SocketIPC(SysCoreThread* vm) void SocketIPC::ExecuteTaskInThread() { - int msgsock = 0; + m_end = false; // we allocate once buffers to not have to do mallocs for each IPC // request, as malloc is expansive when we optimize for µs. @@ -112,38 +127,53 @@ void SocketIPC::ExecuteTaskInThread() m_ipc_buffer = new char[MAX_IPC_SIZE]; while (true) { - msgsock = accept(m_sock, 0, 0); - if (msgsock == -1) + m_msgsock = accept(m_sock, 0, 0); + if (m_msgsock == -1) { - return; + // everything else is non recoverable in our scope + // we also mark as recoverable socket errors where it would block a + // non blocking socket, even though our socket is blocking, in case + // we ever have to implement a non blocking socket. +#ifdef _WIN32 + int errno_w = WSAGetLastError(); + if (!(errno_w == WSAECONNRESET || errno_w == WSAEINTR || errno_w == WSAEINPROGRESS || errno_w == WSAEMFILE || errno_w == WSAEWOULDBLOCK)) + { +#else + if (!(errno == ECONNABORTED || errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) + { +#endif + fprintf(stderr, "IPC: An unrecoverable error happened! Shutting down...\n"); + m_end = true; + break; + } } else { - if (read_portable(msgsock, m_ipc_buffer, 650000) < 0) - { - return; - } - else + if (read_portable(m_msgsock, m_ipc_buffer, MAX_IPC_SIZE) >= 0) { auto res = ParseCommand(m_ipc_buffer, m_ret_buffer); - if (write_portable(msgsock, res.buffer, res.size) < 0) + // we don't care about the error value as we will reset the + // connection after that anyways + if (write_portable(m_msgsock, res.buffer, res.size) < 0) { - return; } } } + close_portable(m_msgsock); } + return; } SocketIPC::~SocketIPC() { + m_end = true; #ifdef _WIN32 - closesocket(m_sock); WSACleanup(); #else - close(m_sock); unlink(SOCKET_NAME); #endif + close_portable(m_sock); + close_portable(m_msgsock); delete[] m_ret_buffer; delete[] m_ipc_buffer; // destroy the thread diff --git a/pcsx2/IPC.h b/pcsx2/IPC.h index c5749a02d6..2182b4f474 100644 --- a/pcsx2/IPC.h +++ b/pcsx2/IPC.h @@ -35,13 +35,18 @@ protected: // their SDK won't even run their own examples, so we go on TCP sockets. #define PORT 28011 SOCKET m_sock = INVALID_SOCKET; + // the message socket used in thread's accept(). + SOCKET m_msgsock = INVALID_SOCKET; #else // absolute path of the socket. Stored in the temporary directory in linux since // /run requires superuser permission const char* SOCKET_NAME = "/tmp/pcsx2.sock"; int m_sock = 0; + // the message socket used in thread's accept(). + int m_msgsock = 0; #endif + /** * Maximum memory used by an IPC message request. * Equivalent to 50,000 Write64 requests. @@ -112,7 +117,7 @@ protected: // handle to the main vm thread SysCoreThread* m_vm; - /* Thread used to relay IPC commands. */ + // Thread used to relay IPC commands. void ExecuteTaskInThread(); /* Internal function, Parses an IPC command. @@ -167,6 +172,9 @@ protected: } public: + // Whether the socket processing thread should stop executing/is stopped. + bool m_end = true; + /* Initializers */ SocketIPC(SysCoreThread* vm); virtual ~SocketIPC(); diff --git a/pcsx2/System/SysCoreThread.cpp b/pcsx2/System/SysCoreThread.cpp index 2c38c06d59..402757fe8c 100644 --- a/pcsx2/System/SysCoreThread.cpp +++ b/pcsx2/System/SysCoreThread.cpp @@ -249,6 +249,8 @@ void SysCoreThread::GameStartingInThread() m_IpcState = ON; m_socketIpc = std::make_unique(this); } + if (m_IpcState == ON && m_socketIpc->m_end) + m_socketIpc->Start(); } bool SysCoreThread::StateCheckInThread()