Moving socket stuff to core/.

Stubbed win32 methods.
This commit is contained in:
Ben Vanik 2013-02-06 22:55:18 -08:00
parent 2c42ea909b
commit c1e2119db4
9 changed files with 422 additions and 128 deletions

View File

@ -18,6 +18,7 @@
#include <xenia/core/mutex.h>
#include <xenia/core/pal.h>
#include <xenia/core/ref.h>
#include <xenia/core/socket.h>
#include <xenia/core/thread.h>
#endif // XENIA_CORE_H_

56
src/xenia/core/socket.h Normal file
View File

@ -0,0 +1,56 @@
/**
******************************************************************************
* Xenia : Xbox 360 Emulator Research Project *
******************************************************************************
* Copyright 2013 Ben Vanik. All rights reserved. *
* Released under the BSD license - see LICENSE in the root for more details. *
******************************************************************************
*/
#ifndef XENIA_CORE_SOCKET_H_
#define XENIA_CORE_SOCKET_H_
#include <xenia/common.h>
#include <xenia/core.h>
typedef int socket_t;
#define XE_INVALID_SOCKET -1
void xe_socket_init();
socket_t xe_socket_create_tcp();
void xe_socket_close(socket_t socket);
void xe_socket_set_keepalive(socket_t socket, bool value);
void xe_socket_set_reuseaddr(socket_t socket, bool value);
void xe_socket_set_nodelay(socket_t socket, bool value);
void xe_socket_set_nonblock(socket_t socket, bool value);
int xe_socket_bind(socket_t socket, uint32_t port);
int xe_socket_listen(socket_t socket);
typedef struct {
socket_t socket;
char addr[16];
} xe_socket_connection_t;
int xe_socket_accept(socket_t socket, xe_socket_connection_t* out_client_info);
ssize_t xe_socket_send(socket_t socket, const uint8_t* data, size_t length,
int flags, int* out_error_code);
ssize_t xe_socket_recv(socket_t socket, uint8_t* data, size_t length, int flags,
int* out_error_code);
typedef struct xe_socket_loop xe_socket_loop_t;
xe_socket_loop_t* xe_socket_loop_create(socket_t socket);
void xe_socket_loop_destroy(xe_socket_loop_t* loop);
int xe_socket_loop_poll(xe_socket_loop_t* loop,
bool check_read, bool check_write);
void xe_socket_loop_set_queued_write(xe_socket_loop_t* loop);
bool xe_socket_loop_check_queued_write(xe_socket_loop_t* loop);
bool xe_socket_loop_check_socket_recv(xe_socket_loop_t* loop);
bool xe_socket_loop_check_socket_send(xe_socket_loop_t* loop);
#endif // XENIA_CORE_SOCKET_H_

View File

@ -0,0 +1,214 @@
/**
******************************************************************************
* Xenia : Xbox 360 Emulator Research Project *
******************************************************************************
* Copyright 2013 Ben Vanik. All rights reserved. *
* Released under the BSD license - see LICENSE in the root for more details. *
******************************************************************************
*/
#include <xenia/core/socket.h>
#include <fcntl.h>
#include <poll.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
void xe_socket_init() {
// No-op.
}
socket_t xe_socket_create_tcp() {
socket_t socket_result = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (socket_result < 1) {
return XE_INVALID_SOCKET;
}
return socket_result;
}
void xe_socket_close(socket_t socket) {
shutdown(socket, SHUT_WR);
close(socket);
}
void xe_socket_set_keepalive(socket_t socket, bool value) {
int opt_value = value ? 1 : 0;
setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE,
&opt_value, sizeof(opt_value));
}
void xe_socket_set_reuseaddr(socket_t socket, bool value) {
int opt_value = value ? 1 : 0;
setsockopt(socket, SOL_SOCKET, SO_REUSEADDR,
&opt_value, sizeof(opt_value));
}
void xe_socket_set_nodelay(socket_t socket, bool value) {
int opt_value = value ? 1 : 0;
setsockopt(socket, IPPROTO_TCP, TCP_NODELAY,
&opt_value, sizeof(opt_value));
}
void xe_socket_set_nonblock(socket_t socket, bool value) {
int flags;
while ((flags = fcntl(socket, F_GETFL, 0)) == -1 && errno == EINTR);
if (flags == -1) {
return;
}
int r;
while ((r = fcntl(socket, F_SETFL, flags | O_NONBLOCK)) == -1 &&
errno == EINTR);
if (r == -1) {
return;
}
}
int xe_socket_bind(socket_t socket, uint32_t port) {
struct sockaddr_in socket_addr;
socket_addr.sin_family = AF_INET;
socket_addr.sin_addr.s_addr = htonl(INADDR_ANY);
socket_addr.sin_port = htons(port);
int r = bind(socket, (struct sockaddr*)&socket_addr, sizeof(socket_addr));
if (r < 0) {
return 1;
}
return 0;
}
int xe_socket_listen(socket_t socket) {
int r = listen(socket, 5);
if (r < 0) {
return 1;
}
return 0;
}
int xe_socket_accept(socket_t socket, xe_socket_connection_t* out_client_info) {
struct sockaddr_in client_addr;
socklen_t client_count = sizeof(client_addr);
int client_socket_id = accept(socket, (struct sockaddr*)&client_addr,
&client_count);
if (client_socket_id < 0) {
return 1;
}
out_client_info->socket = client_socket_id;
int client_ip = client_addr.sin_addr.s_addr;
inet_ntop(AF_INET, &client_ip,
out_client_info->addr, XECOUNT(out_client_info->addr));
return 0;
}
ssize_t xe_socket_send(socket_t socket, const uint8_t* data, size_t length,
int flags, int* out_error_code) {
ssize_t result = send(socket, data, length, flags);
*out_error_code = errno;
return result;
}
ssize_t xe_socket_recv(socket_t socket, uint8_t* data, size_t length, int flags,
int* out_error_code) {
ssize_t result = recv(socket, data, length, flags);
*out_error_code = errno;
return result;
}
struct xe_socket_loop {
socket_t socket;
int notify_rd_id;
int notify_wr_id;
struct pollfd events[2];
bool pending_queued_write;
bool pending_recv;
bool pending_send;
};
xe_socket_loop_t* xe_socket_loop_create(socket_t socket) {
xe_socket_loop_t* loop = (xe_socket_loop_t*)xe_calloc(
sizeof(xe_socket_loop_t));
loop->socket = socket;
int notify_ids[2];
socketpair(PF_LOCAL, SOCK_STREAM, 0, notify_ids);
loop->notify_rd_id = notify_ids[0];
loop->notify_wr_id = notify_ids[1];
loop->events[0].fd = socket;
loop->events[0].events = POLLIN;
loop->events[1].fd = loop->notify_rd_id;
loop->events[1].events = POLLIN;
return loop;
}
void xe_socket_loop_destroy(xe_socket_loop_t* loop) {
close(loop->notify_rd_id);
close(loop->notify_wr_id);
xe_free(loop);
}
int xe_socket_loop_poll(xe_socket_loop_t* loop,
bool check_read, bool check_write) {
// Prep events object.
if (check_read) {
loop->events[0].events |= POLLIN;
}
if (check_write) {
loop->events[0].events |= POLLOUT;
}
// Poll.
int r;
while ((r = poll(loop->events, XECOUNT(loop->events), -1)) == -1 &&
errno == EINTR);
if (r == -1) {
return 1;
}
// If we failed, die.
if (loop->events[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
return 2;
}
// Check queued write.
loop->pending_queued_write = loop->events[1].revents != 0;
if (loop->pending_queued_write) {
uint8_t dummy;
XEIGNORE(recv(loop->notify_rd_id, &dummy, 1, 0));
}
loop->events[1].revents = 0;
loop->events[1].events = POLL_IN;
// Check send/recv.
loop->pending_recv = (loop->events[0].revents & POLLIN) != 0;
loop->pending_send = (loop->events[0].revents & POLLOUT) != 0;
loop->events[0].revents = 0;
loop->events[0].events = 0;
return 0;
}
void xe_socket_loop_set_queued_write(xe_socket_loop_t* loop) {
uint8_t b = 0xFF;
write(loop->notify_wr_id, &b, 1);
}
bool xe_socket_loop_check_queued_write(xe_socket_loop_t* loop) {
return loop->pending_queued_write;
}
bool xe_socket_loop_check_socket_recv(xe_socket_loop_t* loop) {
return loop->pending_recv;
}
bool xe_socket_loop_check_socket_send(xe_socket_loop_t* loop) {
return loop->pending_send;
}

View File

@ -0,0 +1,90 @@
/**
******************************************************************************
* Xenia : Xbox 360 Emulator Research Project *
******************************************************************************
* Copyright 2013 Ben Vanik. All rights reserved. *
* Released under the BSD license - see LICENSE in the root for more details. *
******************************************************************************
*/
#include <xenia/core/socket.h>
#include <winsock2.h>
// TODO(benvanik): win32 calls
void xe_socket_init() {
// TODO(benvanik): do WSA init
}
socket_t xe_socket_create_tcp() {
}
void xe_socket_close(socket_t socket) {
}
void xe_socket_set_keepalive(socket_t socket, bool value) {
}
void xe_socket_set_reuseaddr(socket_t socket, bool value) {
}
void xe_socket_set_nodelay(socket_t socket, bool value) {
}
void xe_socket_set_nonblock(socket_t socket, bool value) {
}
int xe_socket_bind(socket_t socket, uint32_t port) {
}
int xe_socket_listen(socket_t socket) {
}
int xe_socket_accept(socket_t socket, xe_socket_connection_t* out_client_info) {
return 0;
}
ssize_t xe_socket_send(socket_t socket, const uint8_t* data, size_t length,
int flags, int* out_error_code) {
}
ssize_t xe_socket_recv(socket_t socket, uint8_t* data, size_t length, int flags,
int* out_error_code) {
}
struct xe_socket_loop {
socket_t socket;
};
xe_socket_loop_t* xe_socket_loop_create(socket_t socket) {
xe_socket_loop_t* loop = (xe_socket_loop_t*)xe_calloc(
sizeof(xe_socket_loop_t));
loop->socket = socket;
return loop;
}
void xe_socket_loop_destroy(xe_socket_loop_t* loop) {
xe_free(loop);
}
int xe_socket_loop_poll(xe_socket_loop_t* loop,
bool check_read, bool check_write) {
return 0;
}
void xe_socket_loop_set_queued_write(xe_socket_loop_t* loop) {
}
bool xe_socket_loop_check_queued_write(xe_socket_loop_t* loop) {
}
bool xe_socket_loop_check_socket_recv(xe_socket_loop_t* loop) {
}
bool xe_socket_loop_check_socket_send(xe_socket_loop_t* loop) {
}

View File

@ -11,6 +11,7 @@
'pal.h',
'ref.cc',
'ref.h',
'socket.h',
'thread.cc',
'thread.h',
],
@ -20,12 +21,14 @@
'sources': [
'mmap_posix.cc',
'mutex_posix.cc',
'socket_posix.cc',
],
}],
['OS == "win"', {
'sources': [
'mmap_win.cc',
'mutex_win.cc',
'socket_win.cc',
],
}],
],

View File

@ -12,11 +12,6 @@
#include <xenia/dbg/debugger.h>
#include <xenia/dbg/simple_sha1.h>
#include <fcntl.h>
#include <poll.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <wslay/wslay.h>
@ -30,26 +25,25 @@ WsClient::WsClient(Debugger* debugger, int socket_id) :
socket_id_(socket_id) {
mutex_ = xe_mutex_alloc(1000);
int notify_ids[2];
socketpair(PF_LOCAL, SOCK_STREAM, 0, notify_ids);
notify_rd_id_ = notify_ids[0];
notify_wr_id_ = notify_ids[1];
loop_ = xe_socket_loop_create(socket_id);
}
WsClient::~WsClient() {
xe_mutex_t* mutex = mutex_;
xe_mutex_lock(mutex);
mutex_ = NULL;
shutdown(socket_id_, SHUT_WR);
close(socket_id_);
xe_socket_close(socket_id_);
socket_id_ = 0;
xe_socket_loop_destroy(loop_);
loop_ = NULL;
xe_mutex_unlock(mutex);
xe_mutex_free(mutex);
xe_thread_release(thread_);
close(notify_rd_id_);
close(notify_wr_id_);
}
int WsClient::socket_id() {
@ -58,13 +52,8 @@ int WsClient::socket_id() {
int WsClient::Setup() {
// Prep the socket.
int opt_value;
opt_value = 1;
setsockopt(socket_id_, SOL_SOCKET, SO_KEEPALIVE,
&opt_value, sizeof(opt_value));
opt_value = 1;
setsockopt(socket_id_, IPPROTO_TCP, TCP_NODELAY,
&opt_value, sizeof(opt_value));
xe_socket_set_keepalive(socket_id_, true);
xe_socket_set_nodelay(socket_id_, true);
xe_pal_ref pal = debugger_->pal();
thread_ = xe_thread_create(pal, "Debugger Client",
@ -85,18 +74,12 @@ ssize_t WsClientSendCallback(wslay_event_context_ptr ctx,
void* user_data) {
WsClient* client = reinterpret_cast<WsClient*>(user_data);
int sflags = 0;
#ifdef MSG_MORE
if (flags & WSLAY_MSG_MORE) {
sflags |= MSG_MORE;
}
#endif // MSG_MORE
int error_code = 0;
ssize_t r;
while ((r = send(client->socket_id(), data, len, sflags)) == -1 &&
errno == EINTR);
while ((r = xe_socket_send(client->socket_id(), data, len, 0,
&error_code)) == -1 && error_code == EINTR);
if (r == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (error_code == EAGAIN || error_code == EWOULDBLOCK) {
wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
} else {
wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
@ -109,11 +92,13 @@ ssize_t WsClientRecvCallback(wslay_event_context_ptr ctx,
uint8_t* data, size_t len, int flags,
void* user_data) {
WsClient* client = reinterpret_cast<WsClient*>(user_data);
int error_code = 0;
ssize_t r;
while ((r = recv(client->socket_id(), data, len, 0)) == -1 &&
errno == EINTR);
while ((r = xe_socket_recv(client->socket_id(), data, len, 0,
&error_code)) == -1 && error_code == EINTR);
if (r == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (error_code == EAGAIN || error_code == EWOULDBLOCK) {
wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
} else {
wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
@ -180,13 +165,14 @@ std::string EncodeBase64(const uint8_t* input, size_t length) {
int WsClient::PerformHandshake() {
std::string headers;
char buffer[4096];
uint8_t buffer[4096];
int error_code = 0;
ssize_t r;
while (true) {
while ((r = read(socket_id_, buffer, sizeof(buffer))) == -1 &&
errno == EINTR);
while ((r = xe_socket_recv(socket_id_, buffer, sizeof(buffer), 0,
&error_code)) == -1 && error_code == EINTR);
if (r == -1) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
if (error_code == EWOULDBLOCK || error_code == EAGAIN) {
if (!headers.size()) {
// Nothing read yet - spin.
continue;
@ -241,10 +227,12 @@ int WsClient::PerformHandshake() {
size_t write_offset = 0;
size_t write_length = response.size();
while (true) {
while ((r = write(socket_id_, response.c_str() + write_offset,
write_length)) == -1 && errno == EINTR);
while ((r = xe_socket_send(socket_id_,
(uint8_t*)response.c_str() + write_offset,
write_length, 0, &error_code)) == -1 &&
error_code == EINTR);
if (r == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (error_code == EAGAIN || error_code == EWOULDBLOCK) {
break;
} else {
XELOGE(XT("HTTP response write failure"));
@ -263,20 +251,8 @@ int WsClient::PerformHandshake() {
}
void WsClient::EventThread() {
int r;
// Enable non-blocking IO on the socket.
int flags;
while ((flags = fcntl(socket_id_, F_GETFL, 0)) == -1 &&
errno == EINTR);
if (flags == -1) {
return;
}
while ((r = fcntl(socket_id_, F_SETFL, flags | O_NONBLOCK)) == -1 &&
errno == EINTR);
if (r == -1) {
return;
}
xe_socket_set_nonblock(socket_id_, true);
// First run the HTTP handshake.
// This will fail if the connection is not for websockets.
@ -299,26 +275,17 @@ void WsClient::EventThread() {
wslay_event_context_ptr ctx;
wslay_event_context_server_init(&ctx, &callbacks, this);
// Event for waiting on input.
struct pollfd events[2];
xe_zero_struct(&events, sizeof(events));
events[0].fd = socket_id_;
events[0].events = POLLIN;
events[1].fd = notify_rd_id_;
events[1].events = POLLIN;
// Loop forever.
while(wslay_event_want_read(ctx) || wslay_event_want_write(ctx)) {
while (wslay_event_want_read(ctx) || wslay_event_want_write(ctx)) {
// Wait on the event.
while ((r = poll(events, XECOUNT(events), -1)) == -1 && errno == EINTR);
if (r == -1) {
if (xe_socket_loop_poll(loop_,
wslay_event_want_read(ctx),
wslay_event_want_write(ctx))) {
break;
}
// Handle any self-generated events to queue messages.
if (events[1].revents) {
uint8_t dummy;
XEIGNORE(recv(notify_rd_id_, &dummy, 1, 0));
if (xe_socket_loop_check_queued_write(loop_)) {
xe_mutex_lock(mutex_);
for (std::vector<struct wslay_event_msg>::iterator it =
pending_messages_.begin(); it != pending_messages_.end(); it++) {
@ -327,27 +294,15 @@ void WsClient::EventThread() {
}
pending_messages_.clear();
xe_mutex_unlock(mutex_);
events[1].revents = 0;
events[1].events = POLL_IN;
}
// Handle websocket messages.
struct pollfd* event = &events[0];
if (((event->revents & POLLIN) && wslay_event_recv(ctx)) ||
((event->revents & POLLOUT) && wslay_event_send(ctx)) ||
((event->revents & (POLLERR | POLLHUP | POLLNVAL)))) {
if ((xe_socket_loop_check_socket_recv(loop_) && wslay_event_recv(ctx)) ||
(xe_socket_loop_check_socket_send(loop_) && wslay_event_send(ctx))) {
// Error handling the event.
XELOGE(XT("Error handling WebSocket data"));
break;
}
event->revents = 0;
event->events = 0;
if (wslay_event_want_read(ctx)) {
event->events |= POLLIN;
}
if (wslay_event_want_write(ctx)) {
event->events |= POLLOUT;
}
}
wslay_event_context_free(ctx);
@ -394,7 +349,6 @@ void WsClient::Write(uint8_t** buffers, size_t* lengths, size_t count) {
if (needs_signal) {
// Notify the poll().
uint8_t b = 0xFF;
write(notify_wr_id_, &b, 1);
xe_socket_loop_set_queued_write(loop_);
}
}

View File

@ -30,7 +30,7 @@ public:
WsClient(Debugger* debugger, int socket_id);
virtual ~WsClient();
int socket_id();
socket_t socket_id();
virtual int Setup();
@ -44,12 +44,9 @@ private:
xe_thread_ref thread_;
int socket_id_;
int notify_rd_id_;
int notify_wr_id_;
xe_mutex_t* mutex_;
socket_t socket_id_;
xe_socket_loop_t* loop_;
xe_mutex_t* mutex_;
std::vector<struct wslay_event_msg> pending_messages_;
};

View File

@ -9,10 +9,6 @@
#include <xenia/dbg/ws_listener.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <xenia/dbg/ws_client.h>
@ -28,40 +24,29 @@ WsListener::WsListener(Debugger* debugger, xe_pal_ref pal, uint32_t port) :
WsListener::~WsListener() {
if (socket_id_) {
close(socket_id_);
xe_socket_close(socket_id_);
}
}
int WsListener::Setup() {
socket_id_ = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (socket_id_ < 1) {
xe_socket_init();
socket_id_ = xe_socket_create_tcp();
if (socket_id_ == XE_INVALID_SOCKET) {
return 1;
}
int opt_value;
opt_value = 1;
setsockopt(socket_id_, SOL_SOCKET, SO_KEEPALIVE,
&opt_value, sizeof(opt_value));
opt_value = 1;
setsockopt(socket_id_, SOL_SOCKET, SO_REUSEADDR,
&opt_value, sizeof(opt_value));
opt_value = 0;
setsockopt(socket_id_, IPPROTO_TCP, TCP_NODELAY,
&opt_value, sizeof(opt_value));
xe_socket_set_keepalive(socket_id_, true);
xe_socket_set_reuseaddr(socket_id_, true);
xe_socket_set_nodelay(socket_id_, true);
struct sockaddr_in socket_addr;
socket_addr.sin_family = AF_INET;
socket_addr.sin_addr.s_addr = htonl(INADDR_ANY);
socket_addr.sin_port = htons(port_);
int r = bind(socket_id_, (struct sockaddr*)&socket_addr,
sizeof(socket_addr));
if (r < 0) {
if (xe_socket_bind(socket_id_, port_)) {
XELOGE(XT("Could not bind listen socket: %d"), errno);
return 1;
}
if (listen(socket_id_, 5) < 0) {
close(socket_id_);
if (xe_socket_listen(socket_id_)) {
xe_socket_close(socket_id_);
return 1;
}
@ -70,22 +55,16 @@ int WsListener::Setup() {
int WsListener::WaitForClient() {
// Accept the first connection we get.
struct sockaddr_in client_addr;
socklen_t client_count = sizeof(client_addr);
int client_socket_id = accept(socket_id_, (struct sockaddr*)&client_addr,
&client_count);
if (client_socket_id < 0) {
xe_socket_connection_t client_info;
if (xe_socket_accept(socket_id_, &client_info)) {
return 1;
}
int client_ip = client_addr.sin_addr.s_addr;
char client_ip_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &client_ip, client_ip_str, XECOUNT(client_ip_str));
XELOGI(XT("Debugger connected from %s"), client_ip_str);
XELOGI(XT("Debugger connected from %s"), client_info.addr);
// Create the client object.
// Note that the client will delete itself when done.
WsClient* client = new WsClient(debugger_, client_socket_id);
WsClient* client = new WsClient(debugger_, client_info.socket);
if (client->Setup()) {
// Client failed to setup - abort.
return 1;

View File

@ -34,7 +34,7 @@ public:
protected:
uint32_t port_;
int socket_id_;
socket_t socket_id_;
};