From 5650cf92ab0328a82179eeb6876782b6404b5e3b Mon Sep 17 00:00:00 2001 From: Ben Vanik Date: Sat, 2 Feb 2013 02:50:56 -0800 Subject: [PATCH] Initial the websocket implementation. Not yet running on other threads, but can read/write to a websocket from a browser. --- .gitmodules | 6 + common.gypi | 19 +- include/xenia/dbg/client.h | 7 +- src/dbg/client.cc | 6 +- src/dbg/ws_client.cc | 366 +++++++++++++++++++++++++++++++++++- src/dbg/ws_client.h | 25 ++- src/dbg/ws_listener.cc | 19 +- third_party/gflags.gypi | 2 +- third_party/openssl | 1 + third_party/sparsehash.gypi | 2 +- third_party/wslay | 1 + third_party/wslay.gypi | 52 +++++ xenia-build.py | 1 + xenia.gyp | 13 +- 14 files changed, 492 insertions(+), 28 deletions(-) create mode 160000 third_party/openssl create mode 160000 third_party/wslay create mode 100644 third_party/wslay.gypi diff --git a/.gitmodules b/.gitmodules index c2d9714c4..2bc3d0ba8 100644 --- a/.gitmodules +++ b/.gitmodules @@ -16,3 +16,9 @@ [submodule "third_party/sparsehash"] path = third_party/sparsehash url = https://github.com/benvanik/sparsehash.git +[submodule "third_party/wslay"] + path = third_party/wslay + url = https://github.com/benvanik/wslay.git +[submodule "third_party/openssl"] + path = third_party/openssl + url = https://github.com/benvanik/openssl.git diff --git a/common.gypi b/common.gypi index 3bec009e0..fec045922 100644 --- a/common.gypi +++ b/common.gypi @@ -2,6 +2,14 @@ { 'default_configuration': 'release', + 'conditions': [ + ['OS == "mac"', { + 'variables': { + 'is_clang': 1 + } + }], + ], + 'variables': { 'configurations': { 'debug': { @@ -10,6 +18,9 @@ }, }, + 'library%': 'static_library', + 'target_arch%': 'x64', + # LLVM paths. # TODO(benvanik): switch based on configuration. 'llvm_path': 'build/llvm/release/', @@ -64,7 +75,6 @@ ], }, - 'target_defaults': { 'include_dirs': [ 'include/', @@ -74,6 +84,9 @@ '__STDC_LIMIT_MACROS=1', '__STDC_CONSTANT_MACROS=1', '_ISOC99_SOURCE=1', + + 'OPENSSL_NO_INLINE_ASM', + 'OPENSSL_NO_NEXTPROTONEG', ], 'cflags': [ '-std=c99', @@ -134,9 +147,9 @@ 'ARCHS': ['x86_64'], #'CLANG_CXX_LANGUAGE_STANDARD': 'c++0x', 'COMBINE_HIDPI_IMAGES': 'YES', - 'GCC_C_LANGUAGE_STANDARD': 'c99', + 'GCC_C_LANGUAGE_STANDARD': 'gnu99', 'GCC_SYMBOLS_PRIVATE_EXTERN': 'YES', - 'GCC_TREAT_WARNINGS_AS_ERRORS': 'YES', + #'GCC_TREAT_WARNINGS_AS_ERRORS': 'YES', 'GCC_WARN_ABOUT_MISSING_NEWLINE': 'YES', 'GCC_VERSION': 'com.apple.compilers.llvm.clang.1_0', 'WARNING_CFLAGS': ['-Wall', '-Wendif-labels'], diff --git a/include/xenia/dbg/client.h b/include/xenia/dbg/client.h index 8853c3a20..e031c2bbc 100644 --- a/include/xenia/dbg/client.h +++ b/include/xenia/dbg/client.h @@ -23,9 +23,10 @@ public: Client(); virtual ~Client(); - void Write(const uint8_t* buffer, const size_t length); - virtual void Write(const uint8_t** buffers, const size_t* lengths, - size_t count) = 0; + virtual int Setup() = 0; + + void Write(uint8_t* buffer, size_t length); + virtual void Write(uint8_t** buffers, size_t* lengths, size_t count) = 0; protected: }; diff --git a/src/dbg/client.cc b/src/dbg/client.cc index 0ea6e0dd2..813dea104 100644 --- a/src/dbg/client.cc +++ b/src/dbg/client.cc @@ -20,8 +20,8 @@ Client::Client() { Client::~Client() { } -void Client::Write(const uint8_t* buffer, const size_t length) { - const uint8_t* buffers[] = {buffer}; - const size_t lengths[] = {length}; +void Client::Write(uint8_t* buffer, size_t length) { + uint8_t* buffers[] = {buffer}; + size_t lengths[] = {length}; Write(buffers, lengths, 1); } diff --git a/src/dbg/ws_client.cc b/src/dbg/ws_client.cc index a8b8e9b08..65f2399cc 100644 --- a/src/dbg/ws_client.cc +++ b/src/dbg/ws_client.cc @@ -9,6 +9,18 @@ #include "dbg/ws_client.h" +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + using namespace xe; using namespace xe::dbg; @@ -17,13 +29,361 @@ using namespace xe::dbg; WsClient::WsClient(int socket_id) : Client(), socket_id_(socket_id) { + mutex_ = xe_mutex_alloc(1000); + + int notify_ids[2]; + socketpair(PF_LOCAL, SOCK_STREAM, 0, notify_ids); + notify_rd_id_ = notify_ids[0]; + notify_wr_id_ = notify_ids[1]; } WsClient::~WsClient() { + xe_mutex_t* mutex = mutex_; + xe_mutex_lock(mutex); + mutex_ = NULL; + shutdown(socket_id_, SHUT_WR); close(socket_id_); + socket_id_ = 0; + xe_mutex_unlock(mutex); + xe_mutex_free(mutex); + + close(notify_rd_id_); + close(notify_wr_id_); } -void WsClient::Write(const uint8_t** buffers, const size_t* lengths, - size_t count) { - // +int WsClient::socket_id() { + return socket_id_; +} + +int WsClient::Setup() { + // Prep the socket. + int opt_value; + opt_value = 1; + setsockopt(socket_id_, SOL_SOCKET, SO_KEEPALIVE, + &opt_value, sizeof(opt_value)); + opt_value = 1; + setsockopt(socket_id_, IPPROTO_TCP, TCP_NODELAY, + &opt_value, sizeof(opt_value)); + + // launch thread/etc + EventThread(); + + return 0; +} + +namespace { + +ssize_t WsClientSendCallback(wslay_event_context_ptr ctx, + const uint8_t* data, size_t len, int flags, + void* user_data) { + WsClient* client = reinterpret_cast(user_data); + + int sflags = 0; +#ifdef MSG_MORE + if (flags & WSLAY_MSG_MORE) { + sflags |= MSG_MORE; + } +#endif // MSG_MORE + + ssize_t r; + while ((r = send(client->socket_id(), data, len, sflags)) == -1 && + errno == EINTR); + if (r == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); + } else { + wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); + } + } + return r; +} + +ssize_t WsClientRecvCallback(wslay_event_context_ptr ctx, + uint8_t* data, size_t len, int flags, + void* user_data) { + WsClient* client = reinterpret_cast(user_data); + ssize_t r; + while ((r = recv(client->socket_id(), data, len, 0)) == -1 && + errno == EINTR); + if (r == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); + } else { + wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); + } + } else if (r == 0) { + wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); + r = -1; + } + return r; +} + +void WsClientOnMsgCallback(wslay_event_context_ptr ctx, + const struct wslay_event_on_msg_recv_arg* arg, + void* user_data) { + if (wslay_is_ctrl_frame(arg->opcode)) { + // Ignore control frames. + return; + } + + switch (arg->opcode) { + case WSLAY_TEXT_FRAME: + // arg->msg, arg->msg_length + break; + case WSLAY_BINARY_FRAME: + // arg->msg, arg->msg_length + break; + default: + // Unknown opcode - some frame stuff? + break; + } + + // TODO(benvanik): dispatch + + // WsClient* client = reinterpret_cast(user_data); + // char hello[] = "HELLO"; + // size_t len = sizeof(hello); + // uint8_t* bs[] = {(uint8_t*)&hello}; + // size_t lens[] = {len}; + // client->Write((uint8_t**)bs, lens, 1); + // printf("on msg %d, %ld\n", arg->opcode, arg->msg_length); +} + +std::string EncodeBase64(const uint8_t* input, size_t length) { + // Good god what a horrible API. + BIO* b64 = BIO_new(BIO_f_base64()); + BIO* bmem = BIO_new(BIO_s_mem()); + b64 = BIO_push(b64, bmem); + BIO_write(b64, input, length); + XEIGNORE(BIO_flush(b64)); + BUF_MEM* bptr; + BIO_get_mem_ptr(b64, &bptr); + std::string result(bptr->data, bptr->length); + BIO_free_all(b64); + // Strip the last character, which is a \n. + result.erase(result.size() - 1); + return result; +} + +} + +int WsClient::PerformHandshake() { + std::string headers; + char buffer[4096]; + ssize_t r; + while (true) { + while ((r = read(socket_id_, buffer, sizeof(buffer))) == -1 && + errno == EINTR); + if (r == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + if (!headers.size()) { + // Nothing read yet - spin. + continue; + } + break; + } else { + XELOGE(XT("HTTP header read failure")); + return 1; + } + } else if (r == 0) { + // EOF. + XELOGE(XT("HTTP header EOF")); + return 2; + } else { + headers.append(buffer, buffer + r); + if (headers.size() > 8192) { + XELOGE(XT("HTTP headers exceeded max buffer size")); + return 3; + } + } + } + + if (headers.find("\r\n\r\n") == std::string::npos) { + XELOGE(XT("Incomplete HTTP headers: %s"), headers.c_str()); + return 1; + } + + // Parse the headers to verify its a websocket request. + std::string::size_type keyhdstart; + if (headers.find("Upgrade: websocket\r\n") == std::string::npos || + headers.find("Connection: Upgrade\r\n") == std::string::npos || + (keyhdstart = headers.find("Sec-WebSocket-Key: ")) == + std::string::npos) { + XELOGW(XT("HTTP connection does not contain websocket headers")); + return 2; + } + keyhdstart += 19; + std::string::size_type keyhdend = headers.find("\r\n", keyhdstart); + std::string client_key = headers.substr(keyhdstart, keyhdend - keyhdstart); + std::string accept_key = client_key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + uint8_t accept_sha[20]; + SHA1((uint8_t*)accept_key.c_str(), accept_key.size(), accept_sha); + accept_key = EncodeBase64(accept_sha, sizeof(accept_sha)); + + // Write the response to upgrade the connection. + std::string response = + "HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: " + accept_key + "\r\n" + "\r\n"; + size_t write_offset = 0; + size_t write_length = response.size(); + while (true) { + while ((r = write(socket_id_, response.c_str() + write_offset, + write_length)) == -1 && errno == EINTR); + if (r == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } else { + XELOGE(XT("HTTP response write failure")); + return 4; + } + } else { + write_offset += r; + write_length -= r; + if (!write_length) { + break; + } + } + } + + return 0; +} + +void WsClient::EventThread() { + int r; + + // Enable non-blocking IO on the socket. + int flags; + while ((flags = fcntl(socket_id_, F_GETFL, 0)) == -1 && + errno == EINTR); + if (flags == -1) { + return; + } + while ((r = fcntl(socket_id_, F_SETFL, flags | O_NONBLOCK)) == -1 && + errno == EINTR); + if (r == -1) { + return; + } + + // First run the HTTP handshake. + // This will fail if the connection is not for websockets. + if (PerformHandshake()) { + return; + } + + // Prep callbacks. + struct wslay_event_callbacks callbacks = { + WsClientRecvCallback, + WsClientSendCallback, + NULL, + NULL, + NULL, + NULL, + WsClientOnMsgCallback, + }; + + // Prep the websocket server context. + wslay_event_context_ptr ctx; + wslay_event_context_server_init(&ctx, &callbacks, this); + + // Event for waiting on input. + struct pollfd events[2]; + xe_zero_struct(&events, sizeof(events)); + events[0].fd = socket_id_; + events[0].events = POLLIN; + events[1].fd = notify_rd_id_; + events[1].events = POLLIN; + + // Loop forever. + while(wslay_event_want_read(ctx) || wslay_event_want_write(ctx)) { + // Wait on the event. + while ((r = poll(events, XECOUNT(events), -1)) == -1 && errno == EINTR); + if (r == -1) { + break; + } + + // Handle any self-generated events to queue messages. + if (events[1].revents) { + uint8_t dummy; + XEIGNORE(recv(notify_rd_id_, &dummy, 1, 0)); + xe_mutex_lock(mutex_); + for (std::vector::iterator it = + pending_messages_.begin(); it != pending_messages_.end(); it++) { + struct wslay_event_msg* msg = &*it; + wslay_event_queue_msg(ctx, msg); + } + pending_messages_.clear(); + xe_mutex_unlock(mutex_); + events[1].revents = 0; + events[1].events = POLL_IN; + } + + // Handle websocket messages. + struct pollfd* event = &events[0]; + if (((event->revents & POLLIN) && wslay_event_recv(ctx)) || + ((event->revents & POLLOUT) && wslay_event_send(ctx)) || + ((event->revents & (POLLERR | POLLHUP | POLLNVAL)))) { + // Error handling the event. + XELOGE(XT("Error handling WebSocket data")); + break; + } + event->revents = 0; + event->events = 0; + if (wslay_event_want_read(ctx)) { + event->events |= POLLIN; + } + if (wslay_event_want_write(ctx)) { + event->events |= POLLOUT; + } + } + + wslay_event_context_free(ctx); +} + +void WsClient::Write(uint8_t** buffers, size_t* lengths, size_t count) { + if (!count) { + return; + } + + size_t combined_length; + uint8_t* combined_buffer; + if (count == 1) { + // Single buffer, just copy. + combined_length = lengths[0]; + combined_buffer = (uint8_t*)xe_malloc(lengths[0]); + XEIGNORE(xe_copy_memory(combined_buffer, combined_length, + buffers[0], lengths[0])); + } else { + // Multiple buffers, merge. + combined_length = 0; + for (size_t n = 0; n < count; n++) { + combined_length += lengths[n]; + } + combined_buffer = (uint8_t*)xe_malloc(combined_length); + for (size_t n = 0, offset = 0; n < count; n++) { + XEIGNORE(xe_copy_memory( + combined_buffer + offset, combined_length - offset, + buffers[n], lengths[n])); + offset += lengths[n]; + } + } + + struct wslay_event_msg msg = { + WSLAY_BINARY_FRAME, + combined_buffer, + combined_length, + }; + + xe_mutex_lock(mutex_); + pending_messages_.push_back(msg); + bool needs_signal = pending_messages_.size() == 1; + xe_mutex_unlock(mutex_); + + if (needs_signal) { + // Notify the poll(). + uint8_t b = 0xFF; + write(notify_wr_id_, &b, 1); + } } diff --git a/src/dbg/ws_client.h b/src/dbg/ws_client.h index 76ed73508..c5730a77c 100644 --- a/src/dbg/ws_client.h +++ b/src/dbg/ws_client.h @@ -13,9 +13,14 @@ #include #include +#include + #include +struct wslay_event_msg; + + namespace xe { namespace dbg { @@ -25,11 +30,23 @@ public: WsClient(int socket_id); virtual ~WsClient(); - virtual void Write(const uint8_t** buffers, const size_t* lengths, - size_t count); + int socket_id(); -protected: - int socket_id_; + virtual int Setup(); + + virtual void Write(uint8_t** buffers, size_t* lengths, size_t count); + +private: + int PerformHandshake(); + void EventThread(); + + int socket_id_; + + int notify_rd_id_; + int notify_wr_id_; + xe_mutex_t* mutex_; + + std::vector pending_messages_; }; diff --git a/src/dbg/ws_listener.cc b/src/dbg/ws_listener.cc index 9e48812b0..1f200c4ed 100644 --- a/src/dbg/ws_listener.cc +++ b/src/dbg/ws_listener.cc @@ -42,6 +42,9 @@ int WsListener::Setup() { opt_value = 1; setsockopt(socket_id_, SOL_SOCKET, SO_KEEPALIVE, &opt_value, sizeof(opt_value)); + opt_value = 1; + setsockopt(socket_id_, SOL_SOCKET, SO_REUSEADDR, + &opt_value, sizeof(opt_value)); opt_value = 0; setsockopt(socket_id_, IPPROTO_TCP, TCP_NODELAY, &opt_value, sizeof(opt_value)); @@ -50,8 +53,10 @@ int WsListener::Setup() { socket_addr.sin_family = AF_INET; socket_addr.sin_addr.s_addr = htonl(INADDR_ANY); socket_addr.sin_port = htons(port_); - if (bind(socket_id_, (struct sockaddr*)&socket_addr, - sizeof(socket_addr)) < 0) { + int r = bind(socket_id_, (struct sockaddr*)&socket_addr, + sizeof(socket_addr)); + if (r < 0) { + XELOGE(XT("Could not bind listen socket: %d"), errno); return 1; } @@ -78,9 +83,13 @@ int WsListener::WaitForClient() { inet_ntop(AF_INET, &client_ip, client_ip_str, XECOUNT(client_ip_str)); XELOGI(XT("Debugger connected from %s"), client_ip_str); - //WsClient* client = new WsClient(client_socket_id); - - // TODO(benvanik): add to list for cleanup + // Create the client object. + // Note that the client will delete itself when done. + WsClient* client = new WsClient(client_socket_id); + if (client->Setup()) { + // Client failed to setup - abort. + return 1; + } return 0; } diff --git a/third_party/gflags.gypi b/third_party/gflags.gypi index 477b78b7c..a5984c2ff 100644 --- a/third_party/gflags.gypi +++ b/third_party/gflags.gypi @@ -3,7 +3,7 @@ 'targets': [ { 'target_name': 'gflags', - 'type': 'static_library', + 'type': '<(library)', 'direct_dependent_settings': { 'conditions': [ diff --git a/third_party/openssl b/third_party/openssl new file mode 160000 index 000000000..d1b1f821b --- /dev/null +++ b/third_party/openssl @@ -0,0 +1 @@ +Subproject commit d1b1f821ba7b0203e065c26a12b341eac382514d diff --git a/third_party/sparsehash.gypi b/third_party/sparsehash.gypi index b9f124813..ec611db8e 100644 --- a/third_party/sparsehash.gypi +++ b/third_party/sparsehash.gypi @@ -3,7 +3,7 @@ 'targets': [ { 'target_name': 'sparsehash', - 'type': 'static_library', + 'type': '<(library)', 'direct_dependent_settings': { 'include_dirs': [ diff --git a/third_party/wslay b/third_party/wslay new file mode 160000 index 000000000..d2caaf13d --- /dev/null +++ b/third_party/wslay @@ -0,0 +1 @@ +Subproject commit d2caaf13db958249d3878926238f1096a459a11b diff --git a/third_party/wslay.gypi b/third_party/wslay.gypi new file mode 100644 index 000000000..1a12f32f2 --- /dev/null +++ b/third_party/wslay.gypi @@ -0,0 +1,52 @@ +# Copyright 2013 Ben Vanik. All Rights Reserved. +{ + 'targets': [ + { + 'target_name': 'wslay', + 'type': '<(library)', + + 'direct_dependent_settings': { + 'include_dirs': [ + 'wslay/lib/includes/', + ], + + 'defines': [ + 'WSLAY_VERSION=1', + ], + + # libraries: ws2_32 on windows + }, + + 'defines': [ + 'WSLAY_VERSION="1"', + ], + + 'conditions': [ + ['OS != "win"', { + 'defines': [ + 'HAVE_ARPA_INET_H=1', + 'HAVE_NETINET_IN_H=1', + ], + }], + ['OS == "win"', { + 'defines': [ + 'HAVE_WINSOCK2_H=1', + ], + }], + ], + + 'include_dirs': [ + 'wslay/lib/', + 'wslay/lib/includes/', + ], + + 'sources': [ + 'wslay/lib/wslay_event.c', + 'wslay/lib/wslay_frame.c', + 'wslay/lib/wslay_net.c', + 'wslay/lib/wslay_queue.c', + 'wslay/lib/wslay_stack.c', + ], + } + ] +} diff --git a/xenia-build.py b/xenia-build.py index 7558914b2..ac6349985 100755 --- a/xenia-build.py +++ b/xenia-build.py @@ -348,6 +348,7 @@ def run_gyp(format): """ shell_call(' '.join([ 'gyp', + '--include=common.gypi', '-f %s' % (format), # Set the VS version. # TODO(benvanik): allow user to set? diff --git a/xenia.gyp b/xenia.gyp index 109177c92..0351f243e 100644 --- a/xenia.gyp +++ b/xenia.gyp @@ -1,23 +1,26 @@ # Copyright 2013 Ben Vanik. All Rights Reserved. { 'includes': [ - 'common.gypi', 'tools/tools.gypi', 'third_party/gflags.gypi', 'third_party/sparsehash.gypi', + 'third_party/wslay.gypi', ], 'targets': [ { 'target_name': 'xeniacore', 'product_name': 'xeniacore', - 'type': 'static_library', + 'type': '<(library)', 'dependencies': [ 'gflags', + 'wslay', + './third_party/openssl/openssl.gyp:openssl', ], 'export_dependent_settings': [ 'gflags', + 'wslay', ], 'direct_dependent_settings': { @@ -41,7 +44,7 @@ { 'target_name': 'xeniakernel', 'product_name': 'xeniakernel', - 'type': 'static_library', + 'type': '<(library)', 'dependencies': [ 'xeniacore', @@ -63,7 +66,7 @@ { 'target_name': 'xeniacpu', 'product_name': 'xeniacpu', - 'type': 'static_library', + 'type': '<(library)', 'dependencies': [ 'xeniacore', @@ -121,7 +124,7 @@ { 'target_name': 'xeniagpu', 'product_name': 'xeniagpu', - 'type': 'static_library', + 'type': '<(library)', 'dependencies': [ 'xeniacore',