From 59566eb363a08c51296068717fc1371a1321b101 Mon Sep 17 00:00:00 2001 From: Flyinghead Date: Mon, 18 Oct 2021 16:26:24 +0200 Subject: [PATCH] ggpo: support application messages --- core/deps/ggpo/include/ggponet.h | 18 ++++++++++++ core/deps/ggpo/lib/ggpo/backends/backend.h | 1 + core/deps/ggpo/lib/ggpo/backends/p2p.cpp | 27 +++++++++++++++++ core/deps/ggpo/lib/ggpo/backends/p2p.h | 1 + .../deps/ggpo/lib/ggpo/backends/spectator.cpp | 7 +++++ core/deps/ggpo/lib/ggpo/main.cpp | 10 +++++++ core/deps/ggpo/lib/ggpo/network/udp_msg.h | 19 ++++++++---- core/deps/ggpo/lib/ggpo/network/udp_proto.cpp | 29 +++++++++++++++++++ core/deps/ggpo/lib/ggpo/network/udp_proto.h | 10 ++++++- 9 files changed, 116 insertions(+), 6 deletions(-) diff --git a/core/deps/ggpo/include/ggponet.h b/core/deps/ggpo/include/ggponet.h index cfa7d4f0e..29eba49c1 100644 --- a/core/deps/ggpo/include/ggponet.h +++ b/core/deps/ggpo/include/ggponet.h @@ -259,6 +259,12 @@ typedef struct { * structure above for more information. */ bool (__cdecl *on_event)(GGPOEvent *info); + + /* + * on_message - Delivers an application message. Allows arbitrary data to be + * exchanged with peers. This callback is optional. + */ + void (__cdecl *on_message)(unsigned char *msg, int len); } GGPOSessionCallbacks; /* @@ -593,6 +599,18 @@ GGPO_API void __cdecl ggpo_logv(GGPOSession *, const char *fmt, va_list args); +/* + * ggpo_send_message -- + * + * Sends application-specific data to other peers. The message is + * delivered by the on_user_message callback. + * If spectators is true, the message is also sent to spectators. + */ +GGPO_API GGPOErrorCode __cdecl ggpo_send_message(GGPOSession *, + const void *msg, + int len, + bool spectators); + #ifdef __cplusplus }; #endif diff --git a/core/deps/ggpo/lib/ggpo/backends/backend.h b/core/deps/ggpo/lib/ggpo/backends/backend.h index 946d03997..0f1921b14 100644 --- a/core/deps/ggpo/lib/ggpo/backends/backend.h +++ b/core/deps/ggpo/lib/ggpo/backends/backend.h @@ -26,6 +26,7 @@ struct GGPOSession { virtual GGPOErrorCode SetFrameDelay(GGPOPlayerHandle player, int delay) { return GGPO_ERRORCODE_UNSUPPORTED; } virtual GGPOErrorCode SetDisconnectTimeout(int timeout) { return GGPO_ERRORCODE_UNSUPPORTED; } virtual GGPOErrorCode SetDisconnectNotifyStart(int timeout) { return GGPO_ERRORCODE_UNSUPPORTED; } + virtual GGPOErrorCode SendMessage(const void *msg, int len, bool spectators) { return GGPO_ERRORCODE_UNSUPPORTED; } }; typedef struct GGPOSession Quark, IQuarkBackend; /* XXX: nuke this */ diff --git a/core/deps/ggpo/lib/ggpo/backends/p2p.cpp b/core/deps/ggpo/lib/ggpo/backends/p2p.cpp index e7f17516e..57628da26 100644 --- a/core/deps/ggpo/lib/ggpo/backends/p2p.cpp +++ b/core/deps/ggpo/lib/ggpo/backends/p2p.cpp @@ -392,6 +392,16 @@ Peer2PeerBackend::OnUdpProtocolPeerEvent(UdpProtocol::Event &evt, int queue) DisconnectPlayer(QueueToPlayerHandle(queue)); break; + case UdpProtocol::Event::AppData: + if (evt.u.app_data.spectators) + for (int i = 0; i < _num_spectators; i++) + if (_spectators[i].IsInitialized()) + _spectators[i].SendAppData(evt.u.app_data.data, evt.u.app_data.size, true); + Log("calling on_message callback with %d bytes", evt.u.app_data.size); + if (_callbacks.on_message != nullptr) + _callbacks.on_message(evt.u.app_data.data, evt.u.app_data.size); + break; + default: break; } @@ -635,3 +645,20 @@ Peer2PeerBackend::CheckInitialSync() _synchronizing = false; } } + +GGPOErrorCode Peer2PeerBackend::SendMessage(const void *msg, int len, bool spectators) +{ + if (_synchronizing) + return GGPO_ERRORCODE_NOT_SYNCHRONIZED; + if (len > MAX_APPDATA_SIZE) + return GGPO_ERRORCODE_INVALID_REQUEST; + for (int i = 0; i < _num_players; i++) + if (_endpoints[i].IsInitialized()) + _endpoints[i].SendAppData(msg, len, spectators); + if (spectators) + for (int i = 0; i < _num_spectators; i++) + if (_spectators[i].IsInitialized()) + _spectators[i].SendAppData(msg, len, spectators); + + return GGPO_ERRORCODE_SUCCESS; +} diff --git a/core/deps/ggpo/lib/ggpo/backends/p2p.h b/core/deps/ggpo/lib/ggpo/backends/p2p.h index 83e4acf0c..4ddd77c8d 100644 --- a/core/deps/ggpo/lib/ggpo/backends/p2p.h +++ b/core/deps/ggpo/lib/ggpo/backends/p2p.h @@ -33,6 +33,7 @@ public: virtual GGPOErrorCode SetFrameDelay(GGPOPlayerHandle player, int delay); virtual GGPOErrorCode SetDisconnectTimeout(int timeout); virtual GGPOErrorCode SetDisconnectNotifyStart(int timeout); + GGPOErrorCode SendMessage(const void *msg, int len, bool spectators) override; public: virtual void OnMsg(sockaddr_in &from, UdpMsg *msg, int len); diff --git a/core/deps/ggpo/lib/ggpo/backends/spectator.cpp b/core/deps/ggpo/lib/ggpo/backends/spectator.cpp index 91ab3cb6f..df65dd410 100644 --- a/core/deps/ggpo/lib/ggpo/backends/spectator.cpp +++ b/core/deps/ggpo/lib/ggpo/backends/spectator.cpp @@ -167,6 +167,13 @@ SpectatorBackend::OnUdpProtocolEvent(UdpProtocol::Event &evt) } break; + case UdpProtocol::Event::AppData: + Log("calling on_message callback with %d bytes", evt.u.app_data.size); + if (_callbacks.on_message != nullptr) + _callbacks.on_message(evt.u.app_data.data, evt.u.app_data.size); + break; + + default: break; } diff --git a/core/deps/ggpo/lib/ggpo/main.cpp b/core/deps/ggpo/lib/ggpo/main.cpp index dc424c623..7a9e43b3e 100644 --- a/core/deps/ggpo/lib/ggpo/main.cpp +++ b/core/deps/ggpo/lib/ggpo/main.cpp @@ -277,3 +277,13 @@ GGPOErrorCode ggpo_start_spectating(GGPOSession **session, } } +GGPOErrorCode ggpo_send_message(GGPOSession *ggpo, + const void *msg, + int len, + bool spectators) +{ + if (ggpo == nullptr) + return GGPO_ERRORCODE_INVALID_SESSION; + return ggpo->SendMessage(msg, len, spectators); +} + diff --git a/core/deps/ggpo/lib/ggpo/network/udp_msg.h b/core/deps/ggpo/lib/ggpo/network/udp_msg.h index 70304188d..e113a371d 100644 --- a/core/deps/ggpo/lib/ggpo/network/udp_msg.h +++ b/core/deps/ggpo/lib/ggpo/network/udp_msg.h @@ -11,6 +11,7 @@ #define MAX_COMPRESSED_BITS 4096 #define UDP_MSG_MAX_PLAYERS 4 #define MAX_VERIFICATION_SIZE 256 +#define MAX_APPDATA_SIZE 512 #pragma pack(push, 1) @@ -25,6 +26,7 @@ struct UdpMsg QualityReply = 5, KeepAlive = 6, InputAck = 7, + AppData = 8 }; struct connect_status { @@ -76,6 +78,12 @@ struct UdpMsg int ack_frame:31; } input_ack; + struct { + uint16 size; + uint8 spectators; + uint8 data[MAX_APPDATA_SIZE]; + } app_data; + } u; int verification_size = 0; @@ -86,8 +94,6 @@ public: } int PayloadSize() { - int size; - switch (hdr.type) { case SyncRequest: return (int)(&u.sync_request.verification[0] - (uint8 *)&u) + verification_size; case SyncReply: return sizeof(u.sync_reply); @@ -96,9 +102,12 @@ public: case InputAck: return sizeof(u.input_ack); case KeepAlive: return 0; case Input: - size = (int)((char *)&u.input.bits - (char *)&u.input); - size += (u.input.num_bits + 7) / 8; - return size; + { + int size = (int)((char *)&u.input.bits - (char *)&u.input); + size += (u.input.num_bits + 7) / 8; + return size; + } + case AppData: return sizeof(u.app_data) - sizeof(u.app_data.data) + u.app_data.size; } ASSERT(false); return 0; diff --git a/core/deps/ggpo/lib/ggpo/network/udp_proto.cpp b/core/deps/ggpo/lib/ggpo/network/udp_proto.cpp index 2765ace94..d1e4d57f0 100644 --- a/core/deps/ggpo/lib/ggpo/network/udp_proto.cpp +++ b/core/deps/ggpo/lib/ggpo/network/udp_proto.cpp @@ -328,6 +328,7 @@ UdpProtocol::OnMsg(UdpMsg *msg, int len) &UdpProtocol::OnQualityReply, /* QualityReply */ &UdpProtocol::OnKeepAlive, /* KeepAlive */ &UdpProtocol::OnInputAck, /* InputAck */ + &UdpProtocol::OnAppData, /* AppData */ }; // filter out messages that don't match what we expect @@ -458,6 +459,9 @@ UdpProtocol::LogMsg(const char *prefix, UdpMsg *msg) case UdpMsg::InputAck: Log("%s input ack.\n", prefix); break; + case UdpMsg::AppData: + Log("%s app data (%d bytes).\n", prefix, msg->u.app_data.size); + break; default: ASSERT(false && "Unknown UdpMsg type."); } @@ -794,3 +798,28 @@ UdpProtocol::ClearSendQueue() _send_queue.pop(); } } + +void UdpProtocol::SendAppData(const void *data, int len, bool spectators) +{ + if (_udp == nullptr) + return; + if (_current_state != Synchronzied && _current_state != Running) + return; + + UdpMsg *msg = new UdpMsg(UdpMsg::AppData); + msg->u.app_data.spectators = spectators; + msg->u.app_data.size = len; + memcpy(msg->u.app_data.data, data, len); + + SendMsg(msg); +} + +bool UdpProtocol::OnAppData(UdpMsg *msg, int len) +{ + UdpProtocol::Event evt(UdpProtocol::Event::AppData); + evt.u.app_data.spectators = msg->u.app_data.spectators != 0; + evt.u.app_data.size = msg->u.app_data.size; + memcpy(evt.u.app_data.data, msg->u.app_data.data, msg->u.app_data.size); + QueueEvent(evt); + return true; +} diff --git a/core/deps/ggpo/lib/ggpo/network/udp_proto.h b/core/deps/ggpo/lib/ggpo/network/udp_proto.h index ad3bf8cff..054ba762d 100644 --- a/core/deps/ggpo/lib/ggpo/network/udp_proto.h +++ b/core/deps/ggpo/lib/ggpo/network/udp_proto.h @@ -38,6 +38,7 @@ public: Disconnected, NetworkInterrupted, NetworkResumed, + AppData, }; Type type; @@ -52,6 +53,11 @@ public: struct { int disconnect_timeout; } network_interrupted; + struct { + bool spectators; + int size; + uint8 data[MAX_APPDATA_SIZE]; + } app_data; } u; Event(Type t = Unknown) : type(t) { } @@ -76,6 +82,7 @@ public: bool HandlesMsg(sockaddr_in &from, UdpMsg *msg); void OnMsg(UdpMsg *msg, int len); void Disconnect(); + void SendAppData(const void *data, int len, bool spectators); void GetNetworkStats(struct GGPONetworkStats *stats); bool GetEvent(UdpProtocol::Event &e); @@ -117,7 +124,7 @@ protected: void SendSyncRequest(); void SendMsg(UdpMsg *msg); void PumpSendQueue(); - void DispatchMsg(uint8 *buffer, int len); +// void DispatchMsg(uint8 *buffer, int len); void SendPendingOutput(); bool OnInvalid(UdpMsg *msg, int len); bool OnSyncRequest(UdpMsg *msg, int len); @@ -127,6 +134,7 @@ protected: bool OnQualityReport(UdpMsg *msg, int len); bool OnQualityReply(UdpMsg *msg, int len); bool OnKeepAlive(UdpMsg *msg, int len); + bool OnAppData(UdpMsg *msg, int len); protected: /*