project64/Source/Project64/UserInterface/Debugger/ScriptAPI/JSSocketWorker.cpp

607 lines
14 KiB
C++

#include <stdafx.h>
#include "JSSocketWorker.h"
#include "ScriptAPI.h"
CJSSocketWorker::CJSSocketWorker(CScriptInstance* inst, void* objectHeapPtr, bool bAllowHalfOpen) :
CScriptWorker(inst, objectHeapPtr),
m_Socket(INVALID_SOCKET),
m_bAllowHalfOpen(bAllowHalfOpen),
m_bWinsockOK(false)
{
WSADATA wsaData;
m_bWinsockOK = (WSAStartup(MAKEWORD(2, 2), &wsaData) == 0);
}
CJSSocketWorker::~CJSSocketWorker()
{
StopWorkerProc();
if (m_bWinsockOK)
{
WSACleanup();
}
}
bool CJSSocketWorker::Init(SOCKET sock)
{
if (!m_bWinsockOK)
{
JSEmitError("WSAStartup() error");
return false;
}
if (m_hThread != nullptr)
{
JSEmitError("invalid state - socket is already active");
return false;
}
m_Socket = sock;
UpdateAddresses();
return true;
}
bool CJSSocketWorker::Init(const char* host, unsigned short port)
{
if (!m_bWinsockOK)
{
JSEmitError("WSAStartup() error");
return false;
}
if (m_hThread != nullptr)
{
JSEmitError("invalid state - socket is already active");
return false;
}
m_Queue.connectHost = host;
m_Queue.connectPort = port;
m_Queue.bConnectPending = true;
return true;
}
bool CJSSocketWorker::Write(const char* data, size_t length, duk_int_t callbackId, bool bEnd)
{
CGuard guard(m_Queue.cs);
if (m_Queue.bFullClosePending ||
m_Queue.bSendClosePending ||
m_Queue.bSendClosed)
{
return false;
}
if(bEnd)
{
m_Queue.bSendClosePending = true;
}
BufferedWrite bufferedWrite;
bufferedWrite.offset = 0;
bufferedWrite.callbackId = callbackId;
if (length != 0)
{
bufferedWrite.data.resize(length);
memcpy(&bufferedWrite.data[0], data, length);
}
m_Queue.writes.push_back(bufferedWrite);
return true;
}
void CJSSocketWorker::WorkerProc()
{
TIMEVAL timeout;
timeout.tv_sec = 0;
timeout.tv_usec = TIMEOUT_MS * 1000;
bool bHaveConnection = false;
bool bConnectPending = false;
bool bWritesPending = false;
bool bRecvClosed = false;
{
CGuard guard(m_Queue.cs);
bConnectPending = m_Queue.bConnectPending;
}
if (!bConnectPending)
{
// assume it's already connected
bHaveConnection = true;
}
if (bConnectPending && ProcConnect())
{
bConnectPending = false;
}
while (true)
{
if (StopRequested())
{
break;
}
{
CGuard guard(m_Queue.cs);
bWritesPending = m_Queue.writes.size() > 0;
bRecvClosed = m_Queue.bRecvClosed;
if (m_Queue.bFullClosePending && !bWritesPending)
{
break;
}
}
if (bRecvClosed && !bWritesPending)
{
// nothing to do
Sleep(TIMEOUT_MS);
continue;
}
int numFds;
fd_set readFds, writeFds;
fd_set* pWriteFds = nullptr;
fd_set* pReadFds = nullptr;
if (!bRecvClosed)
{
pReadFds = &readFds;
FD_ZERO(pReadFds);
FD_SET(m_Socket, pReadFds);
}
if (bWritesPending || !bHaveConnection)
{
pWriteFds = &writeFds;
FD_ZERO(pWriteFds);
FD_SET(m_Socket, pWriteFds);
}
numFds = select(0, pReadFds, pWriteFds, nullptr, &timeout);
if (numFds == SOCKET_ERROR)
{
JSEmitError("select() error");
break;
}
if (numFds == 0)
{
continue;
}
bool bWritable = pWriteFds && FD_ISSET(m_Socket, pWriteFds);
bool bReadable = pReadFds && FD_ISSET(m_Socket, pReadFds);
if (bWritable && !m_Queue.bSendClosed)
{
if (!bHaveConnection)
{
bHaveConnection = true;
JSEmitConnect();
}
if (bWritesPending)
{
ProcSendData();
}
}
else if (bHaveConnection)
{
JSEmitError("connection reset");
break;
}
if (bReadable)
{
ProcRecvData();
}
}
closesocket(m_Socket);
JSEmitClose();
ClearAddress();
ClearQueue();
m_Socket = INVALID_SOCKET;
m_Instance->PostCMethodCall(m_DukObjectHeapPtr, ScriptAPI::js__UnrefObject);
}
bool CJSSocketWorker::ProcConnect()
{
stdstr strPort, strHost;
{
CGuard guard(m_Queue.cs);
strPort = stdstr_f("%d", m_Queue.connectPort);
strHost = m_Queue.connectHost;
m_Queue.bConnectPending = false;
}
struct addrinfo hints, *result, *ptr;
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
int rc;
rc = getaddrinfo(m_Queue.connectHost.c_str(), strPort.c_str(), &hints, &result);
if (rc != 0)
{
JSEmitError("getaddrinfo() error");
JSEmitLookup(m_RemoteAddress); // port=0, err
freeaddrinfo(result);
return false;
}
for (ptr = result; ptr != nullptr; ptr = ptr->ai_next)
{
m_Socket = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
if (m_Socket == INVALID_SOCKET)
{
JSEmitError("socket() error");
freeaddrinfo(result);
return false;
}
rc = connect(m_Socket, ptr->ai_addr, (int)ptr->ai_addrlen);
if (rc != SOCKET_ERROR)
{
ULONG nonBlock = 1;
rc = ioctlsocket(m_Socket, FIONBIO, &nonBlock);
if (rc == SOCKET_ERROR)
{
JSEmitError("ioctlsocket() FIONBIO error");
closesocket(m_Socket);
freeaddrinfo(result);
return false;
}
UpdateAddresses();
JSEmitLookup(m_RemoteAddress);
freeaddrinfo(result);
return true;
}
closesocket(m_Socket);
m_Socket = INVALID_SOCKET;
}
return false;
}
void CJSSocketWorker::ProcSendData()
{
CGuard guard(m_Queue.cs);
BufferedWrite& bufferedWrite = m_Queue.writes.front();
int avail = bufferedWrite.data.size() - bufferedWrite.offset;
int numBytesSent = send(m_Socket, &bufferedWrite.data[bufferedWrite.offset], avail, 0);
if (numBytesSent >= 0)
{
bufferedWrite.offset += numBytesSent;
if (bufferedWrite.offset == bufferedWrite.data.size())
{
if (bufferedWrite.callbackId != -1)
{
m_Instance->PostCMethodCall(m_DukObjectHeapPtr, ScriptAPI::js_Socket__invokeWriteCallback,
CbArgs_Write, &bufferedWrite.callbackId, sizeof(bufferedWrite.callbackId));
}
m_Queue.writes.erase(m_Queue.writes.begin());
}
if (m_Queue.writes.size() == 0)
{
JSEmitDrain();
}
}
else
{
JSEmitError("send() error");
}
if (m_Queue.writes.size() == 0 &&
m_Queue.bSendClosePending)
{
shutdown(m_Socket, SD_SEND);
m_Queue.bSendClosePending = false;
m_Queue.bSendClosed = true;
m_Instance->PostCMethodCall(m_DukObjectHeapPtr, ScriptAPI::js_Socket__invokeWriteEndCallbacks);
}
}
void CJSSocketWorker::ProcRecvData()
{
char recvBuffer[8192];
int numBytesReceived = recv(m_Socket, recvBuffer, sizeof(recvBuffer), 0);
if (numBytesReceived > 0)
{
JSEmitData(recvBuffer, numBytesReceived);
}
else if (numBytesReceived == 0)
{
JSEmitEnd();
CGuard guard(m_Queue.cs);
if (!m_bAllowHalfOpen)
{
m_Queue.bFullClosePending = true;
}
m_Queue.bRecvClosed = true;
}
else
{
JSEmitError("recv() error");
}
}
void CJSSocketWorker::UpdateAddresses()
{
CGuard guard(m_CS);
WSAPROTOCOL_INFO protocolInfo;
int protocolInfoSize = sizeof(protocolInfo);
if (getsockopt(m_Socket, SOL_SOCKET, SO_PROTOCOL_INFO, (char*)&protocolInfo, &protocolInfoSize) != 0)
{
return;
}
int& family = protocolInfo.iAddressFamily;
sockaddr* pLocalAddr = nullptr;
sockaddr* pRemoteAddr = nullptr;
union {
sockaddr_in ipv4;
sockaddr_in6 ipv6;
} localAddr;
union {
sockaddr_in ipv4;
sockaddr_in6 ipv6;
} remoteAddr;
int addrSize = 0;
if (family == AF_INET)
{
pLocalAddr = (sockaddr*)&localAddr.ipv4;
pRemoteAddr = (sockaddr*)&remoteAddr.ipv4;
addrSize = sizeof(sockaddr_in);
}
else if (family == AF_INET6)
{
pLocalAddr = (sockaddr*)&localAddr.ipv6;
pRemoteAddr = (sockaddr*)&remoteAddr.ipv6;
addrSize = sizeof(sockaddr_in6);
}
else
{
g_Notify->BreakPoint(__FILE__, __LINE__);
return;
}
getsockname(m_Socket, pLocalAddr, &addrSize);
getpeername(m_Socket, pRemoteAddr, &addrSize);
getnameinfo(pLocalAddr, addrSize,
m_LocalAddress.address, sizeof(m_LocalAddress.address),
0, 0, NI_NUMERICHOST);
getnameinfo(pRemoteAddr, addrSize,
m_RemoteAddress.address, sizeof(m_RemoteAddress.address),
0, 0, NI_NUMERICHOST);
if (family == AF_INET)
{
m_LocalAddress.port = ntohs(localAddr.ipv4.sin_port);
m_LocalAddress.family = "IPv4";
m_RemoteAddress.port = ntohs(remoteAddr.ipv4.sin_port);
m_RemoteAddress.family = "IPv4";
}
else
{
m_LocalAddress.port = ntohs(localAddr.ipv6.sin6_port);
m_LocalAddress.family = "IPv6";
m_RemoteAddress.port = ntohs(remoteAddr.ipv6.sin6_port);
m_RemoteAddress.family = "IPv6";
}
}
void CJSSocketWorker::JSEmitConnect()
{
m_Instance->PostCMethodCall(m_DukObjectHeapPtr,
ScriptAPI::js__Emitter_emit, CbArgs_EmitConnect);
}
void CJSSocketWorker::JSEmitData(const char* data, size_t size)
{
JSEmitDataEnv env;
env.data = new char[size];
env.size = size;
memcpy(env.data, data, size);
m_Instance->PostCMethodCall(m_DukObjectHeapPtr,
ScriptAPI::js__Emitter_emit, CbArgs_EmitData, (void*)&env, sizeof(env));
}
void CJSSocketWorker::JSEmitEnd()
{
m_Instance->PostCMethodCall(m_DukObjectHeapPtr,
ScriptAPI::js__Emitter_emit, CbArgs_EmitEnd);
}
void CJSSocketWorker::JSEmitClose()
{
m_Instance->PostCMethodCall(m_DukObjectHeapPtr,
ScriptAPI::js__Emitter_emit, CbArgs_EmitClose);
}
void CJSSocketWorker::JSEmitDrain()
{
m_Instance->PostCMethodCall(m_DukObjectHeapPtr,
ScriptAPI::js__Emitter_emit, CbArgs_EmitDrain);
}
void CJSSocketWorker::JSEmitLookup(JSSocketAddrInfo& addr)
{
m_Instance->PostCMethodCall(m_DukObjectHeapPtr,
ScriptAPI::js__Emitter_emit, CbArgs_EmitLookup, (void*)&addr, sizeof(addr));
}
void CJSSocketWorker::JSEmitError(const char* errMessage)
{
m_Instance->PostCMethodCall(m_DukObjectHeapPtr,
ScriptAPI::js__Emitter_emit, CbArgs_EmitError, (void*)errMessage, strlen(errMessage) + 1);
}
duk_idx_t CJSSocketWorker::CbArgs_EmitConnect(duk_context* ctx, void* /*_env*/)
{
duk_push_string(ctx, "connect");
return 1;
}
duk_idx_t CJSSocketWorker::CbArgs_EmitData(duk_context* ctx, void* _env)
{
JSEmitDataEnv* env = (JSEmitDataEnv*)_env;
duk_push_string(ctx, "data");
char* buffer = (char*)duk_push_fixed_buffer(ctx, env->size);
memcpy(buffer, env->data, env->size);
delete[] env->data;
duk_push_buffer_object(ctx, -1, 0, env->size, DUK_BUFOBJ_NODEJS_BUFFER);
duk_remove(ctx, -2);
return 2;
}
duk_idx_t CJSSocketWorker::CbArgs_EmitEnd(duk_context* ctx, void* /*_env*/)
{
duk_push_string(ctx, "end");
return 1;
}
duk_idx_t CJSSocketWorker::CbArgs_EmitClose(duk_context* ctx, void* /*_env*/)
{
duk_push_string(ctx, "close");
return 1;
}
duk_idx_t CJSSocketWorker::CbArgs_EmitDrain(duk_context* ctx, void* /*_env*/)
{
duk_push_string(ctx, "drain");
return 1;
}
duk_idx_t CJSSocketWorker::CbArgs_EmitLookup(duk_context* ctx, void* _env)
{
JSSocketAddrInfo* addr = (JSSocketAddrInfo*)_env;
duk_push_string(ctx, "lookup");
duk_push_object(ctx);
if (addr->port != 0)
{
duk_push_null(ctx);
}
else
{
duk_push_error_object(ctx, DUK_ERR_ERROR, "dns lookup error");
}
duk_put_prop_string(ctx, -2, "err");
duk_push_string(ctx, addr->address);
duk_put_prop_string(ctx, -2, "address");
duk_push_uint(ctx, addr->port);
duk_put_prop_string(ctx, -2, "port");
duk_push_string(ctx, addr->family);
duk_put_prop_string(ctx, -2, "family");
return 2;
}
duk_idx_t CJSSocketWorker::CbArgs_EmitError(duk_context* ctx, void* _env)
{
const char* errMessage = (const char*)_env;
duk_push_string(ctx, "error");
duk_push_error_object(ctx, DUK_ERR_ERROR, errMessage);
return 2;
}
duk_idx_t CJSSocketWorker::CbArgs_Write(duk_context* ctx, void* _env)
{
duk_int_t callbackId = *(duk_int_t*)_env;
duk_push_int(ctx, callbackId);
return 1;
}
std::string CJSSocketWorker::GetLocalAddress()
{
CGuard guard(m_CS);
return m_LocalAddress.address;
}
unsigned short CJSSocketWorker::GetLocalPort()
{
CGuard guard(m_CS);
return m_LocalAddress.port;
}
std::string CJSSocketWorker::GetRemoteAddress()
{
CGuard guard(m_CS);
return m_RemoteAddress.address;
}
unsigned short CJSSocketWorker::GetRemotePort()
{
CGuard guard(m_CS);
return m_RemoteAddress.port;
}
const char* CJSSocketWorker::GetFamily()
{
CGuard guard(m_CS);
return m_LocalAddress.family;
}
void CJSSocketWorker::ClearAddress()
{
CGuard guard(m_CS);
m_LocalAddress.port = 0;
memset(m_LocalAddress.address, 0, sizeof(m_LocalAddress.address));
m_LocalAddress.family = "";
m_RemoteAddress.port = 0;
memset(m_RemoteAddress.address, 0, sizeof(m_RemoteAddress.address));
m_RemoteAddress.family = "";
}
void CJSSocketWorker::ClearQueue()
{
CGuard guard(m_Queue.cs);
m_Queue.bSendClosePending = false;
m_Queue.bSendClosed = false;
m_Queue.bFullClosePending = false;
m_Queue.bConnectPending = false;
m_Queue.connectHost = "";
m_Queue.connectPort = 0;
m_Queue.writes.clear();
}