Netplay: Use enet for connections

This commit is contained in:
Stenzek 2023-05-07 17:48:50 +10:00
parent d6512dc8bc
commit 68e7fe0209
23 changed files with 524 additions and 912 deletions

View File

@ -14,11 +14,9 @@
<ClInclude Include="src\game_input.h" />
<ClInclude Include="src\input_queue.h" />
<ClInclude Include="src\log.h" />
<ClInclude Include="src\network\udp.h" />
<ClInclude Include="src\network\udp_msg.h" />
<ClInclude Include="src\network\udp_proto.h" />
<ClInclude Include="src\platform_windows.h" />
<ClInclude Include="src\poll.h" />
<ClInclude Include="src\ring_buffer.h" />
<ClInclude Include="src\static_buffer.h" />
<ClInclude Include="src\sync.h" />
@ -34,10 +32,8 @@
<ClCompile Include="src\input_queue.cpp" />
<ClCompile Include="src\log.cpp" />
<ClCompile Include="src\main.cpp" />
<ClCompile Include="src\network\udp.cpp" />
<ClCompile Include="src\network\udp_proto.cpp" />
<ClCompile Include="src\platform_windows.cpp" />
<ClCompile Include="src\poll.cpp" />
<ClCompile Include="src\sync.cpp" />
<ClCompile Include="src\timesync.cpp" />
</ItemGroup>
@ -46,7 +42,8 @@
<ClCompile>
<WarningLevel>TurnOffAllWarnings</WarningLevel>
<PreprocessorDefinitions>_WINDOWS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>$(ProjectDir)src;$(ProjectDir)include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories>%(AdditionalIncludeDirectories);$(ProjectDir)src;$(ProjectDir)include</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories>%(AdditionalIncludeDirectories);$(SolutionDir)dep\enet\include</AdditionalIncludeDirectories>
</ClCompile>
</ItemDefinitionGroup>
<Import Project="..\msvc\vsprops\Targets.props" />

View File

@ -6,9 +6,7 @@
<ClInclude Include="src\game_input.h" />
<ClInclude Include="src\input_queue.h" />
<ClInclude Include="src\log.h" />
<ClInclude Include="src\platform_linux.h" />
<ClInclude Include="src\platform_windows.h" />
<ClInclude Include="src\poll.h" />
<ClInclude Include="src\ring_buffer.h" />
<ClInclude Include="src\static_buffer.h" />
<ClInclude Include="src\sync.h" />
@ -32,9 +30,6 @@
<ClInclude Include="src\network\udp_proto.h">
<Filter>network</Filter>
</ClInclude>
<ClInclude Include="src\network\udp.h">
<Filter>network</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="src\bitvector.cpp" />
@ -42,9 +37,7 @@
<ClCompile Include="src\input_queue.cpp" />
<ClCompile Include="src\log.cpp" />
<ClCompile Include="src\main.cpp" />
<ClCompile Include="src\platform_linux.cpp" />
<ClCompile Include="src\platform_windows.cpp" />
<ClCompile Include="src\poll.cpp" />
<ClCompile Include="src\sync.cpp" />
<ClCompile Include="src\timesync.cpp" />
<ClCompile Include="src\backends\spectator.cpp">
@ -59,9 +52,6 @@
<ClCompile Include="src\network\udp_proto.cpp">
<Filter>network</Filter>
</ClCompile>
<ClCompile Include="src\network\udp.cpp">
<Filter>network</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="backends">

View File

@ -8,6 +8,8 @@
#ifndef _GGPONET_H_
#define _GGPONET_H_
#include "enet/enet.h"
#ifdef __cplusplus
extern "C" {
#endif
@ -78,8 +80,7 @@ typedef struct GGPOPlayer {
struct {
} local;
struct {
char ip_address[32];
unsigned short port;
ENetPeer* peer;
} remote;
} u;
} GGPOPlayer;
@ -149,12 +150,8 @@ typedef enum {
GGPO_EVENTCODE_SYNCHRONIZING_WITH_PEER = 1001,
GGPO_EVENTCODE_SYNCHRONIZED_WITH_PEER = 1002,
GGPO_EVENTCODE_RUNNING = 1003,
GGPO_EVENTCODE_DISCONNECTED_FROM_PEER = 1004,
GGPO_EVENTCODE_TIMESYNC = 1005,
GGPO_EVENTCODE_CONNECTION_INTERRUPTED = 1006,
GGPO_EVENTCODE_CONNECTION_RESUMED = 1007,
GGPO_EVENTCODE_CHAT = 1008,
GGPO_EVENTCODE_DESYNC = 1009
GGPO_EVENTCODE_TIMESYNC = 1004,
GGPO_EVENTCODE_DESYNC = 1006
} GGPOEventCode;
/*
@ -190,10 +187,6 @@ typedef struct {
struct {
GGPOPlayerHandle player;
} connection_resumed;
struct {
int senderID;
const char* msg;
} chat;
struct {
int nFrameOfDesync;
uint32_t ourCheckSum;
@ -460,6 +453,7 @@ GGPO_API GGPOErrorCode __cdecl ggpo_set_frame_delay(GGPOSession *,
* in ggpo_idle.
*/
GGPO_API GGPOErrorCode __cdecl ggpo_idle(GGPOSession *);
GGPO_API GGPOErrorCode __cdecl ggpo_network_idle(GGPOSession *);
/*
* ggpo_add_local_input --
@ -480,25 +474,6 @@ GGPO_API GGPOErrorCode __cdecl ggpo_add_local_input(GGPOSession *,
GGPOPlayerHandle player,
void *values,
int size);
/*
* ggpo_add_local_input --
*
* Used to notify GGPO.net of inputs that should be trasmitted to remote
* players. ggpo_add_local_input must be called once every frame for
* all player of type GGPO_PLAYERTYPE_LOCAL.
*
* player - The player handle returned for this player when you called
* ggpo_add_local_player.
*
* values - The controller inputs for this player.
*
* size - The size of the controller inputs. This must be exactly equal to the
* size passed into ggpo_start_session.
*/
GGPO_API GGPOErrorCode __cdecl ggpo_client_chat(GGPOSession *,
const char* message);
/*
* ggpo_synchronize_input --
*
@ -575,15 +550,6 @@ GGPO_API GGPOErrorCode __cdecl ggpo_get_network_stats(GGPOSession *,
GGPO_API GGPOErrorCode __cdecl ggpo_set_disconnect_timeout(GGPOSession *,
int timeout);
/*
* ggpo_enable_manual_network_polling --
*
* disables polling done by ggpo and it's expected that ggpo_poll_network will be used instead.
*
*/
GGPO_API GGPOErrorCode __cdecl ggpo_set_manual_network_polling(GGPOSession*,
bool value);
/*
* ggpo_set_disconnect_notify_start --
*
* The time to wait before the first GGPO_EVENTCODE_NETWORK_INTERRUPTED timeout
@ -594,14 +560,12 @@ GGPO_API GGPOErrorCode __cdecl ggpo_set_manual_network_polling(GGPOSession*,
*/
GGPO_API GGPOErrorCode __cdecl ggpo_set_disconnect_notify_start(GGPOSession *,
int timeout);
/*
* ggpo_poll_network --
*
* polls the network socket for any messages to be sent and recieved.
*
*/
GGPO_API GGPOErrorCode __cdecl ggpo_poll_network(GGPOSession*);
/*
* ENet packet processing
*/
GGPO_API GGPOErrorCode __cdecl ggpo_handle_packet(GGPOSession*,
ENetPeer* peer, const ENetPacket* pkt);
/*
* ggpo_log --

View File

@ -15,21 +15,20 @@ class GGPOSession {
public:
virtual ~GGPOSession() { }
virtual GGPOErrorCode DoPoll() = 0;
virtual GGPOErrorCode NetworkIdle() = 0;
virtual GGPOErrorCode AddPlayer(GGPOPlayer *player, GGPOPlayerHandle *handle) = 0;
virtual GGPOErrorCode AddLocalInput(GGPOPlayerHandle player, void *values, int size) = 0;
virtual GGPOErrorCode SyncInput(void *values, int size, int *disconnect_flags) = 0;
virtual GGPOErrorCode IncrementFrame(uint16_t checksum) = 0;
virtual GGPOErrorCode CurrentFrame(int& current) =0;
virtual GGPOErrorCode Chat(const char* text) = 0;// { return GGPO_OK; }
virtual GGPOErrorCode DisconnectPlayer(GGPOPlayerHandle handle) = 0;// { return GGPO_OK; }
virtual GGPOErrorCode PollNetwork() = 0;
virtual GGPOErrorCode GetNetworkStats(GGPONetworkStats *stats, GGPOPlayerHandle handle) { return GGPO_OK; }
virtual GGPOErrorCode Logv(const char *fmt, va_list list) { ::Logv(fmt, list); return GGPO_OK; }
virtual GGPOErrorCode SetFrameDelay(GGPOPlayerHandle player, int delay) { return GGPO_ERRORCODE_UNSUPPORTED; }
virtual GGPOErrorCode SetDisconnectTimeout(int timeout) { return GGPO_ERRORCODE_UNSUPPORTED; }
virtual GGPOErrorCode SetDisconnectNotifyStart(int timeout) { return GGPO_ERRORCODE_UNSUPPORTED; }
virtual GGPOErrorCode SetManualNetworkPolling(bool value) = 0;
virtual GGPOErrorCode OnPacket(ENetPeer* peer, const ENetPacket* pkt) = 0;
};

View File

@ -8,8 +8,6 @@
#include "p2p.h"
static const int RECOMMENDATION_INTERVAL = 120;
static const int DEFAULT_DISCONNECT_TIMEOUT = 5000;
static const int DEFAULT_DISCONNECT_NOTIFY_START = 750;
Peer2PeerBackend::Peer2PeerBackend(GGPOSessionCallbacks *cb,
const char *gamename,
@ -19,15 +17,12 @@ Peer2PeerBackend::Peer2PeerBackend(GGPOSessionCallbacks *cb,
_num_players(num_players),
_input_size(input_size),
_sync(_local_connect_status, nframes),
_disconnect_timeout(DEFAULT_DISCONNECT_TIMEOUT),
_disconnect_notify_start(DEFAULT_DISCONNECT_NOTIFY_START),
_num_spectators(0),
_next_spectator_frame(0)
{
_callbacks = *cb;
_synchronizing = true;
_next_recommended_sleep = 0;
_manual_network_polling = false;
/*
* Initialize the synchronziation layer
@ -39,11 +34,6 @@ Peer2PeerBackend::Peer2PeerBackend(GGPOSessionCallbacks *cb,
config.num_prediction_frames = nframes;
_sync.Init(config);
/*
* Initialize the UDP port
*/
_udp.Init(localport, &_poll, this);
_endpoints.resize(_num_players);
memset(_local_connect_status, 0, sizeof(_local_connect_status));
for (int i = 0; i < ARRAY_SIZE(_local_connect_status); i++) {
@ -61,23 +51,18 @@ Peer2PeerBackend::~Peer2PeerBackend()
}
void
Peer2PeerBackend::AddRemotePlayer(char *ip,
uint16 port,
int queue)
Peer2PeerBackend::AddRemotePlayer(ENetPeer* peer, int queue)
{
/*
* Start the state machine (xxx: no)
*/
_synchronizing = true;
_endpoints[queue].Init(&_udp, _poll, queue, ip, port, _local_connect_status);
_endpoints[queue].SetDisconnectTimeout(_disconnect_timeout);
_endpoints[queue].SetDisconnectNotifyStart(_disconnect_notify_start);
_endpoints[queue].Init(peer, queue, _local_connect_status);
_endpoints[queue].Synchronize();
}
GGPOErrorCode Peer2PeerBackend::AddSpectator(char *ip,
uint16 port)
GGPOErrorCode Peer2PeerBackend::AddSpectator(ENetPeer* peer)
{
if (_num_spectators == GGPO_MAX_SPECTATORS) {
return GGPO_ERRORCODE_TOO_MANY_SPECTATORS;
@ -90,9 +75,7 @@ GGPOErrorCode Peer2PeerBackend::AddSpectator(char *ip,
}
int queue = _num_spectators++;
_spectators[queue].Init(&_udp, _poll, queue + 1000, ip, port, _local_connect_status);
_spectators[queue].SetDisconnectTimeout(_disconnect_timeout);
_spectators[queue].SetDisconnectNotifyStart(_disconnect_notify_start);
_spectators[queue].Init(peer, queue + 1000, _local_connect_status);
_spectators[queue].Synchronize();
return GGPO_OK;
@ -155,21 +138,8 @@ void Peer2PeerBackend::CheckDesync()
GGPOErrorCode
Peer2PeerBackend::DoPoll()
{
// Pass on chat
for (int i = 0; i < _num_players; i++) {
_endpoints[i].ConsumeChat([&](const char* msg) {
GGPOEvent info;
info.u.chat.senderID = i;
info.code = GGPO_EVENTCODE_CHAT;
info.u.chat.msg = msg;
_callbacks.on_event(_callbacks.context, &info);
});
}
if (!_sync.InRollback()) {
if (!_manual_network_polling)
_poll.Pump(0);
if (!_sync.InRollback())
{
PollUdpProtocolEvents();
CheckDesync();
if (!_synchronizing) {
@ -232,6 +202,14 @@ Peer2PeerBackend::DoPoll()
return GGPO_OK;
}
GGPOErrorCode Peer2PeerBackend::NetworkIdle()
{
for (UdpProtocol& udp : _endpoints)
udp.NetworkIdle();
return GGPO_OK;
}
int Peer2PeerBackend::Poll2Players(int current_frame)
{
int i;
@ -309,7 +287,7 @@ Peer2PeerBackend::AddPlayer(GGPOPlayer *player,
GGPOPlayerHandle *handle)
{
if (player->type == GGPO_PLAYERTYPE_SPECTATOR) {
return AddSpectator(player->u.remote.ip_address, player->u.remote.port);
return AddSpectator(player->u.remote.peer);
}
int queue = player->player_num - 1;
@ -319,7 +297,7 @@ Peer2PeerBackend::AddPlayer(GGPOPlayer *player,
*handle = QueueToPlayerHandle(queue);
if (player->type == GGPO_PLAYERTYPE_REMOTE) {
AddRemotePlayer(player->u.remote.ip_address, player->u.remote.port, queue);
AddRemotePlayer(player->u.remote.peer, queue);
}
return GGPO_OK;
}
@ -413,19 +391,6 @@ Peer2PeerBackend::CurrentFrame(int& current)
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::PollNetwork()
{
_poll.Pump(0);
return GGPO_OK;
}
GGPOErrorCode Peer2PeerBackend::SetManualNetworkPolling(bool value)
{
_manual_network_polling = value;
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::IncrementFrame(uint16_t checksum1)
{
@ -570,10 +535,6 @@ Peer2PeerBackend::OnUdpProtocolPeerEvent(UdpProtocol::Event &evt, int queue)
}
}
break;
case UdpProtocol::Event::Disconnected:
DisconnectPlayer(QueueToPlayerHandle(queue));
break;
}
}
@ -583,19 +544,6 @@ Peer2PeerBackend::OnUdpProtocolSpectatorEvent(UdpProtocol::Event &evt, int queue
{
GGPOPlayerHandle handle = QueueToSpectatorHandle(queue);
OnUdpProtocolEvent(evt, handle);
GGPOEvent info;
switch (evt.type) {
case UdpProtocol::Event::Disconnected:
_spectators[queue].Disconnect();
info.code = GGPO_EVENTCODE_DISCONNECTED_FROM_PEER;
info.u.disconnected.player = handle;
_callbacks.on_event(_callbacks.context, &info);
break;
}
}
void
@ -623,19 +571,6 @@ Peer2PeerBackend::OnUdpProtocolEvent(UdpProtocol::Event &evt, GGPOPlayerHandle h
CheckInitialSync();
break;
case UdpProtocol::Event::NetworkInterrupted:
info.code = GGPO_EVENTCODE_CONNECTION_INTERRUPTED;
info.u.connection_interrupted.player = handle;
info.u.connection_interrupted.disconnect_timeout = evt.u.network_interrupted.disconnect_timeout;
_callbacks.on_event(_callbacks.context, &info);
break;
case UdpProtocol::Event::NetworkResumed:
info.code = GGPO_EVENTCODE_CONNECTION_RESUMED;
info.u.connection_resumed.player = handle;
_callbacks.on_event(_callbacks.context, &info);
break;
}
}
@ -682,6 +617,7 @@ Peer2PeerBackend::DisconnectPlayerQueue(int queue, int syncto)
GGPOEvent info;
int framecount = _sync.GetFrameCount();
// TODO: Where does the endpoint actually get removed? I can't see it anywhere...
_endpoints[queue].Disconnect();
Log("Changing queue %d local connect status for last frame from %d to %d on disconnect request (current: %d).\n",
@ -696,10 +632,6 @@ Peer2PeerBackend::DisconnectPlayerQueue(int queue, int syncto)
Log("finished adjusting simulation.\n");
}*/
info.code = GGPO_EVENTCODE_DISCONNECTED_FROM_PEER;
info.u.disconnected.player = QueueToPlayerHandle(queue);
_callbacks.on_event(_callbacks.context, &info);
CheckInitialSync();
}
@ -742,45 +674,6 @@ Peer2PeerBackend::SetFrameDelay(GGPOPlayerHandle player, int delay)
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::SetDisconnectTimeout(int timeout)
{
_disconnect_timeout = timeout;
for (int i = 0; i < _num_players; i++) {
if (_endpoints[i].IsInitialized()) {
_endpoints[i].SetDisconnectTimeout(_disconnect_timeout);
}
}
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::SetDisconnectNotifyStart(int timeout)
{
_disconnect_notify_start = timeout;
for (int i = 0; i < _num_players; i++) {
if (_endpoints[i].IsInitialized()) {
_endpoints[i].SetDisconnectNotifyStart(_disconnect_notify_start);
}
}
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::Chat(const char* text)
{
if (strlen(text) >= MAX_CHAT_LENGTH)
return GGPO_CHAT_MESSAGE_TOO_LONG;
// Send the input to all the remote players.
for (int i = 0; i < _num_players; i++) {
if (_endpoints[i].IsInitialized()) {
_endpoints[i].SendChat(text);
}
}
return GGPO_OK;
}
GGPOErrorCode
Peer2PeerBackend::PlayerHandleToQueue(GGPOPlayerHandle player, int *queue)
{
@ -793,21 +686,27 @@ Peer2PeerBackend::PlayerHandleToQueue(GGPOPlayerHandle player, int *queue)
}
void
Peer2PeerBackend::OnMsg(sockaddr_in &from, UdpMsg *msg, int len)
GGPOErrorCode
Peer2PeerBackend::OnPacket(ENetPeer* peer, const ENetPacket* pkt)
{
// ugh why is const so hard for some people...
UdpMsg* msg = const_cast<UdpMsg*>(reinterpret_cast<const UdpMsg*>(pkt->data));
const int len = static_cast<int>(pkt->dataLength);
for (int i = 0; i < _num_players; i++) {
if (_endpoints[i].HandlesMsg(from, msg)) {
if (_endpoints[i].GetENetPeer() == peer) {
_endpoints[i].OnMsg(msg, len);
return;
return GGPO_OK;
}
}
for (int i = 0; i < _num_spectators; i++) {
if (_spectators[i].HandlesMsg(from, msg)) {
if (_spectators[i].GetENetPeer() == peer) {
_spectators[i].OnMsg(msg, len);
return;
return GGPO_OK;
}
}
return GGPO_ERRORCODE_INVALID_PLAYER_HANDLE;
}
void

View File

@ -9,13 +9,13 @@
#define _P2P_H
#include "types.h"
#include "poll.h"
#include "sync.h"
#include "backend.h"
#include "timesync.h"
#include "network/udp_proto.h"
#include <map>
class Peer2PeerBackend : public GGPOSession, Udp::Callbacks {
class Peer2PeerBackend final : public GGPOSession
{
public:
Peer2PeerBackend(GGPOSessionCallbacks *cb, const char *gamename, uint16 localport, int num_players, int input_size, int nframes);
virtual ~Peer2PeerBackend();
@ -23,6 +23,7 @@ public:
public:
virtual GGPOErrorCode DoPoll() override;
virtual GGPOErrorCode NetworkIdle() override;
virtual GGPOErrorCode AddPlayer(GGPOPlayer *player, GGPOPlayerHandle *handle) override;
virtual GGPOErrorCode AddLocalInput(GGPOPlayerHandle player, void *values, int size) override;
virtual GGPOErrorCode SyncInput(void *values, int size, int *disconnect_flags) override;
@ -30,15 +31,8 @@ public:
virtual GGPOErrorCode DisconnectPlayer(GGPOPlayerHandle handle) override;
virtual GGPOErrorCode GetNetworkStats(GGPONetworkStats *stats, GGPOPlayerHandle handle) override;
virtual GGPOErrorCode SetFrameDelay(GGPOPlayerHandle player, int delay) override;
virtual GGPOErrorCode SetDisconnectTimeout(int timeout) override;
virtual GGPOErrorCode SetDisconnectNotifyStart(int timeout) override;
virtual GGPOErrorCode Chat(const char* text) override;
virtual GGPOErrorCode CurrentFrame(int& current) override;
virtual GGPOErrorCode PollNetwork() override;
virtual GGPOErrorCode SetManualNetworkPolling(bool value) override;
public:
virtual void OnMsg(sockaddr_in &from, UdpMsg *msg, int len);
virtual GGPOErrorCode OnPacket(ENetPeer* peer, const ENetPacket* pkt) override;
protected:
GGPOErrorCode PlayerHandleToQueue(GGPOPlayerHandle player, int *queue);
@ -50,8 +44,8 @@ protected:
void CheckInitialSync(void);
int Poll2Players(int current_frame);
int PollNPlayers(int current_frame);
void AddRemotePlayer(char *remoteip, uint16 reportport, int queue);
GGPOErrorCode AddSpectator(char *remoteip, uint16 reportport);
void AddRemotePlayer(ENetPeer* peer, int queue);
GGPOErrorCode AddSpectator(ENetPeer* peer);
virtual void OnSyncEvent(Sync::Event &e) { }
virtual void OnUdpProtocolEvent(UdpProtocol::Event &e, GGPOPlayerHandle handle);
virtual void OnUdpProtocolPeerEvent(UdpProtocol::Event &e, int queue);
@ -59,9 +53,7 @@ protected:
protected:
GGPOSessionCallbacks _callbacks;
Poll _poll;
Sync _sync;
Udp _udp;
std::vector<UdpProtocol> _endpoints;
UdpProtocol _spectators[GGPO_MAX_SPECTATORS];
int _num_spectators;
@ -72,10 +64,6 @@ protected:
int _next_recommended_sleep;
int _next_spectator_frame;
int _disconnect_timeout;
int _disconnect_notify_start;
bool _manual_network_polling;
UdpMsg::connect_status _local_connect_status[UDP_MSG_MAX_PLAYERS];
struct ChecksumEntry {

View File

@ -20,7 +20,6 @@ SpectatorBackend::SpectatorBackend(GGPOSessionCallbacks *cb,
{
_callbacks = *cb;
_synchronizing = true;
_manual_network_polling = false;
for (int i = 0; i < ARRAY_SIZE(_inputs); i++) {
_inputs[i].frame = -1;
@ -29,12 +28,14 @@ SpectatorBackend::SpectatorBackend(GGPOSessionCallbacks *cb,
/*
* Initialize the UDP port
*/
_udp.Init(localport, &_poll, this);
// FIXME
abort();
//_udp.Init(localport, &_poll, this);
/*
* Init the host endpoint
*/
_host.Init(&_udp, _poll, 0, hostip, hostport, NULL);
//_host.Init(&_udp, _poll, 0, hostip, hostport, NULL);
_host.Synchronize();
/*
@ -50,13 +51,16 @@ SpectatorBackend::~SpectatorBackend()
GGPOErrorCode
SpectatorBackend::DoPoll()
{
if (!_manual_network_polling)
_poll.Pump(0);
PollUdpProtocolEvents();
return GGPO_OK;
}
GGPOErrorCode SpectatorBackend::NetworkIdle()
{
_host.NetworkIdle();
return GGPO_OK;
}
GGPOErrorCode
SpectatorBackend::SyncInput(void *values,
int size,
@ -95,20 +99,6 @@ SpectatorBackend::CurrentFrame(int& current)
return GGPO_OK;
}
GGPOErrorCode
SpectatorBackend::PollNetwork()
{
_poll.Pump(0);
return GGPO_OK;
}
GGPOErrorCode
SpectatorBackend::SetManualNetworkPolling(bool value)
{
_manual_network_polling = value;
return GGPO_OK;
}
GGPOErrorCode
SpectatorBackend::IncrementFrame(uint16_t checksum)
{
@ -159,25 +149,6 @@ SpectatorBackend::OnUdpProtocolEvent(UdpProtocol::Event &evt)
}
break;
case UdpProtocol::Event::NetworkInterrupted:
info.code = GGPO_EVENTCODE_CONNECTION_INTERRUPTED;
info.u.connection_interrupted.player = 0;
info.u.connection_interrupted.disconnect_timeout = evt.u.network_interrupted.disconnect_timeout;
_callbacks.on_event(_callbacks.context, &info);
break;
case UdpProtocol::Event::NetworkResumed:
info.code = GGPO_EVENTCODE_CONNECTION_RESUMED;
info.u.connection_resumed.player = 0;
_callbacks.on_event(_callbacks.context, &info);
break;
case UdpProtocol::Event::Disconnected:
info.code = GGPO_EVENTCODE_DISCONNECTED_FROM_PEER;
info.u.disconnected.player = 0;
_callbacks.on_event(_callbacks.context, &info);
break;
case UdpProtocol::Event::Input:
GameInput& input = evt.u.input.input;
@ -188,11 +159,14 @@ SpectatorBackend::OnUdpProtocolEvent(UdpProtocol::Event &evt)
}
}
void
SpectatorBackend::OnMsg(sockaddr_in &from, UdpMsg *msg, int len)
GGPOErrorCode SpectatorBackend::OnPacket(ENetPeer* peer, const ENetPacket* pkt)
{
if (_host.HandlesMsg(from, msg)) {
_host.OnMsg(msg, len);
}
if (_host.GetENetPeer() != peer)
return GGPO_ERRORCODE_INVALID_PLAYER_HANDLE;
UdpMsg* msg = const_cast<UdpMsg*>(reinterpret_cast<const UdpMsg*>(pkt->data));
const int len = static_cast<int>(pkt->dataLength);
_host.OnMsg(msg, len);
return GGPO_OK;
}

View File

@ -9,7 +9,6 @@
#define _SPECTATOR_H
#include "types.h"
#include "poll.h"
#include "sync.h"
#include "backend.h"
#include "timesync.h"
@ -17,7 +16,7 @@
#define SPECTATOR_FRAME_BUFFER_SIZE 64
class SpectatorBackend : public GGPOSession, Udp::Callbacks {
class SpectatorBackend final : public GGPOSession {
public:
SpectatorBackend(GGPOSessionCallbacks *cb, const char *gamename, uint16 localport, int num_players, int input_size, char *hostip, u_short hostport);
virtual ~SpectatorBackend();
@ -25,6 +24,7 @@ public:
public:
virtual GGPOErrorCode DoPoll();
virtual GGPOErrorCode NetworkIdle();
virtual GGPOErrorCode AddPlayer(GGPOPlayer *player, GGPOPlayerHandle *handle) { return GGPO_ERRORCODE_UNSUPPORTED; }
virtual GGPOErrorCode AddLocalInput(GGPOPlayerHandle player, void *values, int size) { return GGPO_OK; }
virtual GGPOErrorCode SyncInput(void *values, int size, int *disconnect_flags);
@ -34,13 +34,8 @@ public:
virtual GGPOErrorCode SetFrameDelay(GGPOPlayerHandle player, int delay) { return GGPO_ERRORCODE_UNSUPPORTED; }
virtual GGPOErrorCode SetDisconnectTimeout(int timeout) { return GGPO_ERRORCODE_UNSUPPORTED; }
virtual GGPOErrorCode SetDisconnectNotifyStart(int timeout) { return GGPO_ERRORCODE_UNSUPPORTED; }
virtual GGPOErrorCode Chat(const char* text) override { return GGPO_ERRORCODE_UNSUPPORTED; }
virtual GGPOErrorCode CurrentFrame(int& current) override;
virtual GGPOErrorCode PollNetwork() override;
virtual GGPOErrorCode SetManualNetworkPolling(bool value) override;
public:
virtual void OnMsg(sockaddr_in &from, UdpMsg *msg, int len);
virtual GGPOErrorCode OnPacket(ENetPeer* peer, const ENetPacket* pkt) override;
protected:
void PollUdpProtocolEvents(void);
@ -50,15 +45,12 @@ protected:
protected:
GGPOSessionCallbacks _callbacks;
Poll _poll;
Udp _udp;
UdpProtocol _host;
bool _synchronizing;
int _input_size;
int _num_players;
int _next_input_to_send;
GameInput _inputs[SPECTATOR_FRAME_BUFFER_SIZE];
bool _manual_network_polling;
};
#endif

View File

@ -54,6 +54,12 @@ SyncTestBackend::DoPoll()
return GGPO_OK;
}
GGPOErrorCode
SyncTestBackend::NetworkIdle()
{
return GGPO_OK;
}
GGPOErrorCode
SyncTestBackend::AddPlayer(GGPOPlayer *player, GGPOPlayerHandle *handle)
{

View File

@ -13,22 +13,21 @@
#include "sync.h"
#include "ring_buffer.h"
class SyncTestBackend : public GGPOSession {
class SyncTestBackend final : public GGPOSession {
public:
SyncTestBackend(GGPOSessionCallbacks *cb, char *gamename, int frames, int num_players);
virtual ~SyncTestBackend();
virtual GGPOErrorCode DoPoll();
virtual GGPOErrorCode NetworkIdle();
virtual GGPOErrorCode AddPlayer(GGPOPlayer *player, GGPOPlayerHandle *handle);
virtual GGPOErrorCode AddLocalInput(GGPOPlayerHandle player, void *values, int size);
virtual GGPOErrorCode SyncInput(void *values, int size, int *disconnect_flags);
virtual GGPOErrorCode IncrementFrame(uint16_t checksum);
virtual GGPOErrorCode Logv(char *fmt, va_list list);
virtual GGPOErrorCode DisconnectPlayer(GGPOPlayerHandle handle) { return GGPO_OK; }
virtual GGPOErrorCode Chat(const char* text) override { return GGPO_ERRORCODE_UNSUPPORTED; }
virtual GGPOErrorCode CurrentFrame(int& current) override;
virtual GGPOErrorCode PollNetwork() override { return GGPO_OK; };
virtual GGPOErrorCode SetManualNetworkPolling(bool value) override { return GGPO_OK; };
virtual GGPOErrorCode OnPacket(ENetPeer* peer, const ENetPacket* pkt) override { return GGPO_ERRORCODE_UNSUPPORTED; }
protected:
struct SavedInfo {

View File

@ -100,6 +100,15 @@ ggpo_idle(GGPOSession *ggpo)
return ggpo->DoPoll();
}
GGPOErrorCode
ggpo_network_idle(GGPOSession* ggpo)
{
if (!ggpo) {
return GGPO_ERRORCODE_INVALID_SESSION;
}
return ggpo->NetworkIdle();
}
GGPOErrorCode
ggpo_add_local_input(GGPOSession *ggpo,
GGPOPlayerHandle player,
@ -151,15 +160,6 @@ ggpo_get_current_frame(GGPOSession *ggpo, int& nFrame)
return ggpo->CurrentFrame(nFrame);
}
GGPOErrorCode
ggpo_client_chat(GGPOSession *ggpo, const char *text)
{
if (!ggpo) {
return GGPO_ERRORCODE_INVALID_SESSION;
}
return ggpo->Chat(text);
}
GGPOErrorCode
ggpo_get_network_stats(GGPOSession *ggpo,
GGPOPlayerHandle player,
@ -172,6 +172,13 @@ ggpo_get_network_stats(GGPOSession *ggpo,
}
GGPOErrorCode ggpo_handle_packet(GGPOSession* ggpo, ENetPeer* peer, const ENetPacket* pkt)
{
if (!ggpo)
return GGPO_ERRORCODE_INVALID_SESSION;
return ggpo->OnPacket(peer, pkt);
}
GGPOErrorCode
ggpo_close_session(GGPOSession *ggpo)
{
@ -191,16 +198,6 @@ ggpo_set_disconnect_timeout(GGPOSession *ggpo, int timeout)
return ggpo->SetDisconnectTimeout(timeout);
}
GGPOErrorCode
ggpo_set_manual_network_polling(GGPOSession *ggpo, bool value)
{
if (!ggpo)
{
return GGPO_ERRORCODE_INVALID_SESSION;
}
return ggpo->SetManualNetworkPolling(value);
}
GGPOErrorCode
ggpo_set_disconnect_notify_start(GGPOSession *ggpo, int timeout)
{
@ -210,16 +207,6 @@ ggpo_set_disconnect_notify_start(GGPOSession *ggpo, int timeout)
return ggpo->SetDisconnectNotifyStart(timeout);
}
GGPOErrorCode
ggpo_poll_network(GGPOSession* ggpo)
{
if (!ggpo)
{
return GGPO_ERRORCODE_INVALID_SESSION;
}
return ggpo->PollNetwork();
}
GGPOErrorCode ggpo_start_spectating(GGPOSession **session,
GGPOSessionCallbacks *cb,
const char *game,
@ -238,4 +225,3 @@ GGPOErrorCode ggpo_start_spectating(GGPOSession **session,
host_port);
return GGPO_OK;
}

View File

@ -1,125 +0,0 @@
/* -----------------------------------------------------------------------
* GGPO.net (http://ggpo.net) - Copyright 2009 GroundStorm Studios, LLC.
*
* Use of this software is governed by the MIT license that can be found
* in the LICENSE file.
*/
#include "types.h"
#include "udp.h"
SOCKET
CreateSocket(uint16 bind_port, int retries)
{
SOCKET s;
sockaddr_in sin;
uint16 port;
int optval = 1;
s = socket(AF_INET, SOCK_DGRAM, 0);
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&optval, sizeof optval);
setsockopt(s, SOL_SOCKET, SO_DONTLINGER, (const char *)&optval, sizeof optval);
// non-blocking...
u_long iMode = 1;
ioctlsocket(s, FIONBIO, &iMode);
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_ANY);
for (port = bind_port; port <= bind_port + retries; port++) {
sin.sin_port = htons(port);
if (bind(s, (sockaddr *)&sin, sizeof sin) != SOCKET_ERROR) {
Log("Udp bound to port: %d.\n", port);
return s;
}
}
closesocket(s);
return INVALID_SOCKET;
}
Udp::Udp() :
_socket(INVALID_SOCKET),
_callbacks(NULL)
{
}
Udp::~Udp(void)
{
if (_socket != INVALID_SOCKET) {
closesocket(_socket);
_socket = INVALID_SOCKET;
}
}
void
Udp::Init(uint16 port, Poll *poll, Callbacks *callbacks)
{
_callbacks = callbacks;
_poll = poll;
_poll->RegisterLoop(this);
Log("binding udp socket to port %d.\n", port);
_socket = CreateSocket(port, 0);
}
void
Udp::SendTo(char *buffer, int len, int flags, struct sockaddr *dst, int destlen)
{
struct sockaddr_in *to = (struct sockaddr_in *)dst;
int res = sendto(_socket, buffer, len, flags, dst, destlen);
if (res == SOCKET_ERROR) {
DWORD err = WSAGetLastError();
Log("unknown error in sendto (erro: %d wsaerr: %d).\n", res, err);
ASSERT(FALSE && "Unknown error in sendto");
}
char dst_ip[1024];
Log("sent packet length %d to %s:%d (ret:%d).\n", len, inet_ntop(AF_INET, (void *)&to->sin_addr, dst_ip, ARRAY_SIZE(dst_ip)), ntohs(to->sin_port), res);
}
bool
Udp::OnLoopPoll(void *cookie)
{
uint8 recv_buf[MAX_UDP_PACKET_SIZE];
sockaddr_in recv_addr;
int recv_addr_len;
for (;;) {
recv_addr_len = sizeof(recv_addr);
int len = recvfrom(_socket, (char *)recv_buf, MAX_UDP_PACKET_SIZE, 0, (struct sockaddr *)&recv_addr, &recv_addr_len);
// TODO: handle len == 0... indicates a disconnect.
if (len == -1) {
int error = WSAGetLastError();
if (error != WSAEWOULDBLOCK) {
Log("recvfrom WSAGetLastError returned %d (%x).\n", error, error);
}
break;
} else if (len > 0) {
char src_ip[1024];
Log("recvfrom returned (len:%d from:%s:%d).\n", len, inet_ntop(AF_INET, (void*)&recv_addr.sin_addr, src_ip, ARRAY_SIZE(src_ip)), ntohs(recv_addr.sin_port) );
UdpMsg *msg = (UdpMsg *)recv_buf;
_callbacks->OnMsg(recv_addr, msg, len);
}
}
return true;
}
void
Udp::Log(const char *fmt, ...)
{
char buf[1024];
size_t offset;
va_list args;
strcpy_s(buf, "udp | ");
offset = strlen(buf);
va_start(args, fmt);
vsnprintf(buf + offset, ARRAY_SIZE(buf) - offset - 1, fmt, args);
buf[ARRAY_SIZE(buf)-1] = '\0';
::Log(buf);
va_end(args);
}

View File

@ -1,61 +0,0 @@
/* -----------------------------------------------------------------------
* GGPO.net (http://ggpo.net) - Copyright 2009 GroundStorm Studios, LLC.
*
* Use of this software is governed by the MIT license that can be found
* in the LICENSE file.
*/
#ifndef _UDP_H
#define _UDP_H
#include "poll.h"
#include "udp_msg.h"
#include "ggponet.h"
#include "ring_buffer.h"
#define MAX_UDP_ENDPOINTS 16
static const int MAX_UDP_PACKET_SIZE = 4096;
class Udp : public IPollSink
{
public:
struct Stats {
int bytes_sent;
int packets_sent;
float kbps_sent;
};
struct Callbacks {
virtual ~Callbacks() { }
virtual void OnMsg(sockaddr_in &from, UdpMsg *msg, int len) = 0;
};
protected:
void Log(const char *fmt, ...);
public:
Udp();
void Init(uint16 port, Poll *p, Callbacks *callbacks);
void SendTo(char *buffer, int len, int flags, struct sockaddr *dst, int destlen);
bool OnLoopPoll(void *cookie) override;
public:
~Udp(void);
protected:
// Network transmission information
SOCKET _socket;
// state management
Callbacks *_callbacks;
Poll *_poll;
};
#endif

View File

@ -10,7 +10,6 @@
#define MAX_COMPRESSED_BITS 4096
#define UDP_MSG_MAX_PLAYERS 4
#define MAX_CHAT_LENGTH 120
#pragma pack(push, 1)
struct UdpMsg
@ -22,9 +21,7 @@ struct UdpMsg
Input = 3,
QualityReport = 4,
QualityReply = 5,
KeepAlive = 6,
InputAck = 7,
Chat = 8,
InputAck = 6,
};
struct connect_status {
@ -63,8 +60,7 @@ struct UdpMsg
uint32 start_frame;
int disconnect_requested:1;
int ack_frame:31;
int ack_frame;
uint16 num_bits;
uint32 checksum32;
@ -73,11 +69,8 @@ struct UdpMsg
} input;
struct {
int ack_frame:31;
int ack_frame;
} input_ack;
struct {
char msg[MAX_CHAT_LENGTH];
} chat;
} u;
public:
@ -94,8 +87,6 @@ public:
case QualityReport: return sizeof(u.quality_report);
case QualityReply: return sizeof(u.quality_reply);
case InputAck: return sizeof(u.input_ack);
case Chat: return MAX_CHAT_LENGTH;
case KeepAlive: return 0;
case Input:
size = (int)((char *)&u.input.bits - (char *)&u.input);
size += (u.input.num_bits + 7) / 8;

View File

@ -15,12 +15,12 @@ static const int NUM_SYNC_PACKETS = 5;
static const int SYNC_RETRY_INTERVAL = 2000;
static const int SYNC_FIRST_RETRY_INTERVAL = 500;
static const int RUNNING_RETRY_INTERVAL = 200;
static const int KEEP_ALIVE_INTERVAL = 200;
static const int QUALITY_REPORT_INTERVAL = 333;
static const int NETWORK_STATS_INTERVAL = 500;
static const int UDP_SHUTDOWN_TIMER = 5000;
static const int MAX_SEQ_DISTANCE = (1 << 15);
static const uint8_t ENET_CHANNEL_ID = 1;
UdpProtocol::UdpProtocol() :
_local_frame_advantage(0),
_remote_frame_advantage(0),
@ -31,15 +31,10 @@ UdpProtocol::UdpProtocol() :
_bytes_sent(0),
_stats_start_time(0),
_last_send_time(0),
_shutdown_timeout(0),
_disconnect_timeout(0),
_disconnect_notify_start(0),
_disconnect_notify_sent(false),
_disconnect_event_sent(false),
_connected(false),
_next_send_seq(0),
_next_recv_seq(0),
_udp(NULL)
_peer(nullptr)
{
_last_sent_input.init(-1, NULL, 1);
_last_received_input.init(-1, NULL, 1);
@ -50,17 +45,12 @@ UdpProtocol::UdpProtocol() :
for (int i = 0; i < ARRAY_SIZE(_peer_connect_status); i++) {
_peer_connect_status[i].last_frame = -1;
}
memset(&_peer_addr, 0, sizeof _peer_addr);
_oo_packet.msg = NULL;
_send_latency = Platform::GetConfigInt("ggpo.network.delay");
_oop_percent = Platform::GetConfigInt("ggpo.oop.percent");
}
UdpProtocol::~UdpProtocol()
{
ClearSendQueue();
}
void UdpProtocol::SetFrameDelay(int delay)
{
_timesync.SetFrameDelay(delay);
@ -70,32 +60,22 @@ int UdpProtocol::RemoteFrameDelay()const
{
return _timesync._remoteFrameDelay;
}
void
UdpProtocol::Init(Udp *udp,
Poll &poll,
int queue,
char *ip,
u_short port,
UdpMsg::connect_status *status)
void UdpProtocol::Init(ENetPeer* peer, int queue, UdpMsg::connect_status *status)
{
_udp = udp;
_peer = peer;
_queue = queue;
_local_connect_status = status;
_peer_addr.sin_family = AF_INET;
_peer_addr.sin_port = htons(port);
inet_pton(AF_INET, ip, &_peer_addr.sin_addr.s_addr);
do {
_magic_number = (uint16)rand();
} while (_magic_number == 0);
poll.RegisterLoop(this);
}
void
UdpProtocol::SendInput(GameInput &input)
{
if (_udp) {
if (_peer) {
if (_current_state == Running) {
/*
* Check to see if this is a good time to adjust for the rift...
@ -156,7 +136,6 @@ UdpProtocol::SendPendingOutput()
msg->u.input.ack_frame = _last_received_input.frame;
msg->u.input.num_bits = (uint16)offset;
msg->u.input.disconnect_requested = _current_state == Disconnected;
if (_local_connect_status) {
memcpy(msg->u.input.peer_connect_status, _local_connect_status, sizeof(UdpMsg::connect_status) * UDP_MSG_MAX_PLAYERS);
} else {
@ -205,24 +184,17 @@ void UdpProtocol::EndPollLoop()
if (_remoteCheckSumsThisFrame.size())
_remoteCheckSums.emplace(*_remoteCheckSumsThisFrame.rbegin());
}
void UdpProtocol::SendChat(const char* message)
{
UdpMsg* msg = new UdpMsg(UdpMsg::Chat);
strcpy_s<MAX_CHAT_LENGTH>(msg->u.chat.msg, message);
SendMsg(msg);
}
bool
UdpProtocol::OnLoopPoll(void *cookie)
UdpProtocol::NetworkIdle()
{
if (!_udp) {
if (!_peer) {
return true;
}
unsigned int now = Platform::GetCurrentTimeMS();
unsigned int next_interval;
PumpSendQueue();
switch (_current_state) {
case Syncing:
next_interval = (_state.sync.roundtrips_remaining == NUM_SYNC_PACKETS) ? SYNC_FIRST_RETRY_INTERVAL : SYNC_RETRY_INTERVAL;
@ -255,35 +227,10 @@ UdpProtocol::OnLoopPoll(void *cookie)
_state.running.last_network_stats_interval = now;
}
if (_last_send_time && _last_send_time + KEEP_ALIVE_INTERVAL < now) {
Log("Sending keep alive packet\n");
SendMsg(new UdpMsg(UdpMsg::KeepAlive));
}
if (_disconnect_timeout && _disconnect_notify_start &&
!_disconnect_notify_sent && (_last_recv_time + _disconnect_notify_start < now)) {
Log("Endpoint has stopped receiving packets for %d ms. Sending notification.\n", _disconnect_notify_start);
Event e(Event::NetworkInterrupted);
e.u.network_interrupted.disconnect_timeout = _disconnect_timeout - _disconnect_notify_start;
QueueEvent(e);
_disconnect_notify_sent = true;
}
if (_disconnect_timeout && (_last_recv_time + _disconnect_timeout < now)) {
if (!_disconnect_event_sent) {
Log("Endpoint has stopped receiving packets for %d ms. Disconnecting.\n", _disconnect_timeout);
QueueEvent(Event(Event::Disconnected));
_disconnect_event_sent = true;
}
}
break;
case Disconnected:
if (_shutdown_timeout < now) {
Log("Shutting down udp connection.\n");
_udp = NULL;
_shutdown_timeout = 0;
}
break;
}
@ -295,7 +242,7 @@ void
UdpProtocol::Disconnect()
{
_current_state = Disconnected;
_shutdown_timeout = Platform::GetCurrentTimeMS() + UDP_SHUTDOWN_TIMER;
_peer = nullptr;
}
void
@ -311,28 +258,21 @@ UdpProtocol::SendSyncRequest()
void
UdpProtocol::SendMsg(UdpMsg *msg)
{
const int size = msg->PacketSize();
LogMsg("send", msg);
_packets_sent++;
_last_send_time = Platform::GetCurrentTimeMS();
_bytes_sent += msg->PacketSize();
_bytes_sent += size;
msg->hdr.magic = _magic_number;
msg->hdr.sequence_number = _next_send_seq++;
_send_queue.push(QueueEntry(Platform::GetCurrentTimeMS(), _peer_addr, msg));
PumpSendQueue();
}
bool
UdpProtocol::HandlesMsg(sockaddr_in &from,
UdpMsg *msg)
{
if (!_udp) {
return false;
}
return _peer_addr.sin_addr.S_un.S_addr == from.sin_addr.S_un.S_addr &&
_peer_addr.sin_port == from.sin_port;
ENetPacket* pkt = enet_packet_create(msg, size, 0);
enet_peer_send(_peer, ENET_CHANNEL_ID, pkt);
// TODO: flush packets?
// TODO: get rid of the extra heap allocation here...
delete msg;
}
void
@ -347,9 +287,7 @@ UdpProtocol::OnMsg(UdpMsg *msg, int len)
&UdpProtocol::OnInput, /* Input */
&UdpProtocol::OnQualityReport, /* QualityReport */
&UdpProtocol::OnQualityReply, /* QualityReply */
&UdpProtocol::OnKeepAlive, /* KeepAlive */
&UdpProtocol::OnInputAck, /* InputAck */
&UdpProtocol::OnChat, /* InputAck */
};
// filter out messages that don't match what we expect
@ -379,10 +317,6 @@ UdpProtocol::OnMsg(UdpMsg *msg, int len)
}
if (handled) {
_last_recv_time = Platform::GetCurrentTimeMS();
if (_disconnect_notify_sent && _current_state == Running) {
QueueEvent(Event(Event::NetworkResumed));
_disconnect_notify_sent = false;
}
}
}
@ -422,7 +356,7 @@ UdpProtocol::QueueEvent(const UdpProtocol::Event &evt)
void
UdpProtocol::Synchronize()
{
if (_udp) {
if (_peer) {
_current_state = Syncing;
_state.sync.roundtrips_remaining = NUM_SYNC_PACKETS;
SendSyncRequest();
@ -470,18 +404,12 @@ UdpProtocol::LogMsg(const char *prefix, UdpMsg *msg)
case UdpMsg::QualityReply:
Log("%s quality reply.\n", prefix);
break;
case UdpMsg::KeepAlive:
Log("%s keep alive.\n", prefix);
break;
case UdpMsg::Input:
Log("%s game-compressed-input %d (+ %d bits).\n", prefix, msg->u.input.start_frame, msg->u.input.num_bits);
break;
case UdpMsg::InputAck:
Log("%s input ack.\n", prefix);
break;
case UdpMsg::Chat:
Log("%s chat.\n", prefix);
break;
default:
Log("Unknown UdpMsg type.");
}
@ -559,28 +487,16 @@ UdpProtocol::OnSyncReply(UdpMsg *msg, int len)
bool
UdpProtocol::OnInput(UdpMsg *msg, int len)
{
/*
* If a disconnect is requested, go ahead and disconnect now.
*/
bool disconnect_requested = msg->u.input.disconnect_requested;
if (disconnect_requested) {
if (_current_state != Disconnected && !_disconnect_event_sent) {
Log("Disconnecting endpoint on remote request.\n");
QueueEvent(Event(Event::Disconnected));
_disconnect_event_sent = true;
}
} else {
/*
* Update the peer connection status if this peer is still considered to be part
* of the network.
*/
UdpMsg::connect_status* remote_status = msg->u.input.peer_connect_status;
for (int i = 0; i < ARRAY_SIZE(_peer_connect_status); i++) {
ASSERT(remote_status[i].last_frame >= _peer_connect_status[i].last_frame);
_peer_connect_status[i].disconnected = _peer_connect_status[i].disconnected || remote_status[i].disconnected;
_peer_connect_status[i].last_frame = MAX(_peer_connect_status[i].last_frame, remote_status[i].last_frame);
}
}
/*
* Update the peer connection status if this peer is still considered to be part
* of the network.
*/
UdpMsg::connect_status* remote_status = msg->u.input.peer_connect_status;
for (int i = 0; i < ARRAY_SIZE(_peer_connect_status); i++) {
ASSERT(remote_status[i].last_frame >= _peer_connect_status[i].last_frame);
_peer_connect_status[i].disconnected = _peer_connect_status[i].disconnected || remote_status[i].disconnected;
_peer_connect_status[i].last_frame = MAX(_peer_connect_status[i].last_frame, remote_status[i].last_frame);
}
/*
* Decompress the input.
@ -701,23 +617,6 @@ UdpProtocol::OnQualityReply(UdpMsg *msg, int len)
return true;
}
bool
UdpProtocol::OnKeepAlive(UdpMsg *msg, int len)
{
return true;
}
void UdpProtocol::ConsumeChat(std::function<void(const char*)> onChat)
{
for (const auto& msg : _chatMessages)
onChat(msg.c_str());
_chatMessages.clear();
}
bool UdpProtocol::OnChat(UdpMsg* msg, int len)
{
_chatMessages.push_back(msg->u.chat.msg);
return true;
}
void
UdpProtocol::GetNetworkStats(struct GGPONetworkStats *s)
{
@ -755,65 +654,3 @@ UdpProtocol::RecommendFrameDelay()
// XXX: require idle input should be a configuration parameter
return _timesync.recommend_frame_wait_duration(false);
}
void
UdpProtocol::SetDisconnectTimeout(int timeout)
{
_disconnect_timeout = timeout;
}
void
UdpProtocol::SetDisconnectNotifyStart(int timeout)
{
_disconnect_notify_start = timeout;
}
void
UdpProtocol::PumpSendQueue()
{
while (!_send_queue.empty()) {
QueueEntry &entry = _send_queue.front();
if (_send_latency) {
// should really come up with a gaussian distributation based on the configured
// value, but this will do for now.
int jitter = (_send_latency * 2 / 3) + ((rand() % _send_latency) / 3);
if (Platform::GetCurrentTimeMS() < _send_queue.front().queue_time + jitter) {
break;
}
}
if (_oop_percent && !_oo_packet.msg && ((rand() % 100) < _oop_percent)) {
int delay = rand() % (_send_latency * 10 + 1000);
Log("creating rogue oop (seq: %d delay: %d)\n", entry.msg->hdr.sequence_number, delay);
_oo_packet.send_time = Platform::GetCurrentTimeMS() + delay;
_oo_packet.msg = entry.msg;
_oo_packet.dest_addr = entry.dest_addr;
} else {
ASSERT(entry.dest_addr.sin_addr.s_addr);
_udp->SendTo((char *)entry.msg, entry.msg->PacketSize(), 0,
(struct sockaddr *)&entry.dest_addr, sizeof entry.dest_addr);
delete entry.msg;
}
_send_queue.pop();
}
if (_oo_packet.msg && _oo_packet.send_time < Platform::GetCurrentTimeMS()) {
Log("sending rogue oop!");
_udp->SendTo((char *)_oo_packet.msg, _oo_packet.msg->PacketSize(), 0,
(struct sockaddr *)&_oo_packet.dest_addr, sizeof _oo_packet.dest_addr);
delete _oo_packet.msg;
_oo_packet.msg = NULL;
}
}
void
UdpProtocol::ClearSendQueue()
{
while (!_send_queue.empty()) {
delete _send_queue.front().msg;
_send_queue.pop();
}
}

View File

@ -8,8 +8,7 @@
#ifndef _UDP_PROTO_H_
#define _UDP_PROTO_H_
#include "poll.h"
#include "udp.h"
#include "enet/enet.h"
#include "udp_msg.h"
#include "game_input.h"
#include "timesync.h"
@ -19,7 +18,7 @@
#include <vector>
#include <string>
#include <map>
class UdpProtocol : public IPollSink
class UdpProtocol
{
public:
struct Stats {
@ -29,7 +28,6 @@ public:
float av_remote_frame_advantage;
float av_local_frame_advantage;
int send_queue_len;
Udp::Stats udp;
};
struct Event {
@ -39,9 +37,6 @@ public:
Synchronizing,
Synchronzied,
Input,
Disconnected,
NetworkInterrupted,
NetworkResumed,
};
Type type;
@ -62,23 +57,23 @@ public:
};
public:
virtual bool OnLoopPoll(void *cookie);
bool NetworkIdle();
public:
UdpProtocol();
virtual ~UdpProtocol();
~UdpProtocol();
void Init(Udp *udp, Poll &p, int queue, char *ip, u_short port, UdpMsg::connect_status *status);
ENetPeer* GetENetPeer() const { return _peer; }
void Init(ENetPeer* peer, int queue, UdpMsg::connect_status *status);
void Synchronize();
bool GetPeerConnectStatus(int id, int *frame);
bool IsInitialized() { return _udp != NULL; }
bool IsInitialized() { return _peer != nullptr; }
bool IsSynchronized() { return _current_state == Running; }
bool IsRunning() { return _current_state == Running; }
void SendInput(GameInput &input);
void SendChat(const char* message);
void SendInputAck();
bool HandlesMsg(sockaddr_in &from, UdpMsg *msg);
void OnMsg(UdpMsg *msg, int len);
void Disconnect();
@ -87,10 +82,7 @@ public:
void SetLocalFrameNumber(int num);
float RecommendFrameDelay();
int RemoteFrameDelay()const;
void SetDisconnectTimeout(int timeout);
void SetDisconnectNotifyStart(int timeout);
void SetFrameDelay(int delay);
void ConsumeChat(std::function<void(const char*)> onChat);
void ApplyToEvents(std::function<void(UdpProtocol::Event&)> cb);
void StartPollLoop();
void EndPollLoop();
@ -114,13 +106,11 @@ protected:
void UpdateNetworkStats(void);
void QueueEvent(const UdpProtocol::Event &evt);
void ClearSendQueue(void);
void Log(const char *fmt, ...);
void LogMsg(const char *prefix, UdpMsg *msg);
void LogEvent(const char *prefix, const UdpProtocol::Event &evt);
void SendSyncRequest();
void SendMsg(UdpMsg *msg);
void PumpSendQueue();
void SendPendingOutput();
bool OnInvalid(UdpMsg *msg, int len);
bool OnSyncRequest(UdpMsg *msg, int len);
@ -129,27 +119,16 @@ protected:
bool OnInputAck(UdpMsg *msg, int len);
bool OnQualityReport(UdpMsg *msg, int len);
bool OnQualityReply(UdpMsg *msg, int len);
bool OnKeepAlive(UdpMsg *msg, int len);
bool OnChat(UdpMsg *msg, int len);
protected:
/*
* Network transmission information
*/
Udp *_udp;
sockaddr_in _peer_addr;
ENetPeer *_peer;
uint16 _magic_number;
int _queue;
uint16 _remote_magic_number;
bool _connected;
int _send_latency;
int _oop_percent;
struct {
int send_time;
sockaddr_in dest_addr;
UdpMsg* msg;
} _oo_packet;
RingBuffer<QueueEntry, 64> _send_queue;
/*
* Stats
@ -194,11 +173,6 @@ protected:
GameInput _last_acked_input;
unsigned int _last_send_time;
unsigned int _last_recv_time;
unsigned int _shutdown_timeout;
unsigned int _disconnect_event_sent;
unsigned int _disconnect_timeout;
unsigned int _disconnect_notify_start;
bool _disconnect_notify_sent;
uint16 _next_send_seq;
uint16 _next_recv_seq;
@ -212,7 +186,6 @@ protected:
* Event queue
*/
RingBuffer<UdpProtocol::Event, 64> _event_queue;
std::vector<std::string> _chatMessages;
};
#endif

View File

@ -1,72 +0,0 @@
/* -----------------------------------------------------------------------
* GGPO.net (http://ggpo.net) - Copyright 2009 GroundStorm Studios, LLC.
*
* Use of this software is governed by the MIT license that can be found
* in the LICENSE file.
*/
#include "types.h"
#include "poll.h"
Poll::Poll(void) :
_start_time(0)
{
}
//
//void
//Poll::RegisterMsgLoop(IPollSink *sink, void *cookie)
//{
// _msg_sinks.push_back(PollSinkCb(sink, cookie));
//}
void
Poll::RegisterLoop(IPollSink *sink, void *cookie)
{
_loop_sinks.push_back(PollSinkCb(sink, cookie));
}
//void
//Poll::RegisterPeriodic(IPollSink *sink, int interval, void *cookie)
//{
// _periodic_sinks.push_back(PollPeriodicSinkCb(sink, cookie, interval));
//}
void
Poll::Run()
{
while (Pump(100)) {
continue;
}
}
bool
Poll::Pump(int timeout)
{
int i;
bool finished = false;
for (i = 0; i < _loop_sinks.size(); i++) {
PollSinkCb &cb = _loop_sinks[i];
finished = !cb.sink->OnLoopPoll(cb.cookie) || finished;
}
return finished;
}
//
//int
//Poll::ComputeWaitTime(int elapsed)
//{
// int waitTime = INFINITE;
// size_t count = _periodic_sinks.size();
//
// if (count > 0) {
// for (int i = 0; i < count; i++) {
// PollPeriodicSinkCb &cb = _periodic_sinks[i];
// int timeout = (cb.interval + cb.last_fired) - elapsed;
// if (waitTime == INFINITE || (timeout < waitTime)) {
// waitTime = MAX(timeout, 0);
// }
// }
// }
// return waitTime;
//}

View File

@ -1,56 +0,0 @@
/* -----------------------------------------------------------------------
* GGPO.net (http://ggpo.net) - Copyright 2009 GroundStorm Studios, LLC.
*
* Use of this software is governed by the MIT license that can be found
* in the LICENSE file.
*/
#ifndef _POLL_H
#define _POLL_H
#include "static_buffer.h"
#define MAX_POLLABLE_HANDLES 64
class IPollSink {
public:
virtual ~IPollSink() { }
//virtual bool OnMsgPoll(void*) = 0;//{ return true; }
// virtual bool OnPeriodicPoll(void*, int) = 0;// { return true; }
virtual bool OnLoopPoll(void*) = 0;// { return true; }
};
class Poll {
public:
Poll(void);
void RegisterLoop(IPollSink *sink, void *cookie = NULL);
void Run();
bool Pump(int timeout);
protected:
//int ComputeWaitTime(int elapsed);
struct PollSinkCb {
IPollSink *sink;
void *cookie;
PollSinkCb() : sink(NULL), cookie(NULL) { }
PollSinkCb(IPollSink *s, void *c) : sink(s), cookie(c) { }
};
struct PollPeriodicSinkCb : public PollSinkCb {
int interval;
int last_fired;
PollPeriodicSinkCb() : PollSinkCb(NULL, NULL), interval(0), last_fired(0) { }
PollPeriodicSinkCb(IPollSink *s, void *c, int i) :
PollSinkCb(s, c), interval(i), last_fired(0) { }
};
int _start_time;
// StaticBuffer<PollSinkCb, 16> _msg_sinks;
StaticBuffer<PollSinkCb, 16> _loop_sinks;
// StaticBuffer<PollPeriodicSinkCb, 16> _periodic_sinks;
};
#endif

View File

@ -9,7 +9,7 @@
<PreprocessorDefinitions Condition="('$(Platform)'=='x64' Or '$(Platform)'=='ARM' Or '$(Platform)'=='ARM64')">WITH_RECOMPILER=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions Condition="('$(Platform)'=='x64' Or '$(Platform)'=='ARM64')">WITH_MMAP_FASTMEM=1;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>$(SolutionDir)dep\ggpo-x\include;$(SolutionDir)dep\tinyxml2\include;$(SolutionDir)dep\glad\include;$(SolutionDir)dep\stb\include;$(SolutionDir)dep\imgui\include;$(SolutionDir)dep\xxhash\include;$(SolutionDir)dep\zlib\include;$(SolutionDir)dep\rcheevos\include;$(SolutionDir)dep\rapidjson\include;$(SolutionDir)src;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories>$(SolutionDir)dep\enet\include;$(SolutionDir)dep\ggpo-x\include;$(SolutionDir)dep\tinyxml2\include;$(SolutionDir)dep\glad\include;$(SolutionDir)dep\stb\include;$(SolutionDir)dep\imgui\include;$(SolutionDir)dep\xxhash\include;$(SolutionDir)dep\zlib\include;$(SolutionDir)dep\rcheevos\include;$(SolutionDir)dep\rapidjson\include;$(SolutionDir)src;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories Condition="'$(Platform)'!='ARM64'">$(SolutionDir)dep\rainterface;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories Condition="'$(Platform)'=='x64'">$(SolutionDir)dep\xbyak\xbyak;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
@ -19,7 +19,7 @@
<ItemDefinitionGroup>
<Lib>
<AdditionalDependencies>$(RootBuildDir)ggpo-x\ggpo-x.lib;$(RootBuildDir)tinyxml2\tinyxml2.lib;$(RootBuildDir)rcheevos\rcheevos.lib;$(RootBuildDir)imgui\imgui.lib;$(RootBuildDir)stb\stb.lib;$(RootBuildDir)xxhash\xxhash.lib;$(RootBuildDir)zlib\zlib.lib;$(RootBuildDir)util\util.lib;$(RootBuildDir)common\common.lib;%(AdditionalDependencies)</AdditionalDependencies>
<AdditionalDependencies>$(RootBuildDir)ggpo-x\ggpo-x.lib;$(RootBuildDir)enet\enet.lib;$(RootBuildDir)tinyxml2\tinyxml2.lib;$(RootBuildDir)rcheevos\rcheevos.lib;$(RootBuildDir)imgui\imgui.lib;$(RootBuildDir)stb\stb.lib;$(RootBuildDir)xxhash\xxhash.lib;$(RootBuildDir)zlib\zlib.lib;$(RootBuildDir)util\util.lib;$(RootBuildDir)common\common.lib;%(AdditionalDependencies)</AdditionalDependencies>
<AdditionalDependencies Condition="'$(Platform)'!='ARM64'">$(RootBuildDir)rainterface\rainterface.lib;%(AdditionalDependencies)</AdditionalDependencies>
<AdditionalDependencies Condition="'$(Platform)'=='ARM64'">$(RootBuildDir)vixl\vixl.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Lib>

View File

@ -7,12 +7,14 @@
#include "common/timer.h"
#include "digital_controller.h"
#include "ggponet.h"
#include "enet/enet.h"
#include "host.h"
#include "host_settings.h"
#include "pad.h"
#include "spu.h"
#include "system.h"
#include <bitset>
#include <gsl/span>
#include <deque>
#include <xxhash.h>
Log_SetChannel(Netplay);
@ -30,6 +32,8 @@ struct Input
u32 button_data;
};
static bool InitializeEnet();
static bool NpAdvFrameCb(void* ctx, int flags);
static bool NpSaveFrameCb(void* ctx, unsigned char** buffer, int* len, int* checksum, int frame);
static bool NpLoadFrameCb(void* ctx, unsigned char* buffer, int len, int rb_frames, int frame_to_load);
@ -37,6 +41,7 @@ static bool NpBeginGameCb(void* ctx, const char* game_name);
static void NpFreeBuffCb(void* ctx, void* buffer, int frame);
static bool NpOnEventCb(void* ctx, GGPOEvent* ev);
static GGPOPlayerHandle PlayerIdToGGPOHandle(s32 player_id);
static Input ReadLocalInput();
static GGPOErrorCode AddLocalInput(Netplay::Input input);
static GGPOErrorCode SyncInput(Input inputs[2], int* disconnect_flags);
@ -44,8 +49,20 @@ static void SetInputs(Input inputs[2]);
static void SetSettings();
static bool CreateSystem(std::string game_path);
// ENet
static void ShutdownEnetHost();
static s32 GetFreePlayerId();
static s32 GetPlayerIdForPeer(const ENetPeer* peer);
static bool ConnectToLowerPeers(gsl::span<const ENetAddress> peer_addresses);
static bool WaitForPeerConnections();
static void HandleEnetEvent(const ENetEvent* event);
static void PollEnet(Common::Timer::Value until_time);
static void HandleControlPacket(s32 player_id, const ENetPacket* pkt);
// l = local, r = remote
static s32 Start(s32 lhandle, u16 lport, std::string& raddr, u16 rport, s32 ldelay, u32 pred);
static s32 Start(s32 lhandle, u16 lport, const std::string& raddr, u16 rport, s32 ldelay, u32 pred, std::string game_path);
static void AdvanceFrame();
static void RunFrame();
@ -62,13 +79,18 @@ static void Throttle();
// Desync Detection
static void GenerateChecksumForFrame(int* checksum, int frame, unsigned char* buffer, int buffer_size);
static void GenerateDesyncReport(s32 desync_frame);
//////////////////////////////////////////////////////////////////////////
// Variables
//////////////////////////////////////////////////////////////////////////
static MemorySettingsInterface s_settings_overlay;
static std::string s_game_path;
/// Enet
static ENetHost* s_enet_host;
static std::array<ENetPeer*, MAX_PLAYERS> s_enet_peers;
static s32 s_player_id;
static GGPOPlayerHandle s_local_handle = GGPO_INVALID_HANDLE;
static GGPONetworkStats s_last_net_stats{};
@ -88,11 +110,301 @@ static s32 s_next_timesync_recovery_frame = -1;
// Netplay Impl
s32 Netplay::Start(s32 lhandle, u16 lport, std::string& raddr, u16 rport, s32 ldelay, u32 pred)
bool Netplay::CreateSystem(std::string game_path)
{
// close system if its already running
if (System::IsValid())
System::ShutdownSystem(false);
// fast boot the selected game and wait for the other player
auto param = SystemBootParameters(std::move(game_path));
param.override_fast_boot = true;
return System::BootSystem(param);
}
bool Netplay::InitializeEnet()
{
static bool enet_initialized = false;
int rc;
if (!enet_initialized && (rc = enet_initialize()) != 0)
{
Log_ErrorPrintf("enet_initialize() returned %d", rc);
return false;
}
std::atexit(enet_deinitialize);
enet_initialized = true;
return true;
}
void Netplay::ShutdownEnetHost()
{
Log_DevPrint("Disconnecting all peers");
// forcefully disconnect all peers
// TODO: do we want to send disconnect requests and wait a bit?
for (u32 i = 0; i < MAX_PLAYERS; i++)
{
if (s_enet_peers[i])
{
enet_peer_reset(s_enet_peers[i]);
s_enet_peers[i] = nullptr;
}
}
enet_host_destroy(s_enet_host);
s_enet_host = nullptr;
}
GGPOPlayerHandle PlayerIdToGGPOHandle(s32 player_id)
{
return player_id + 1;
}
s32 Netplay::GetPlayerIdForPeer(const ENetPeer* peer)
{
for (s32 i = 0; i < MAX_PLAYERS; i++)
{
if (s_enet_peers[i] == peer)
return i;
}
return -1;
}
s32 Netplay::GetFreePlayerId()
{
for (s32 i = 0; i < MAX_PLAYERS; i++)
{
if (i != s_player_id && !s_enet_peers[i])
return i;
}
return -1;
}
void Netplay::HandleEnetEvent(const ENetEvent* event)
{
switch (event->type)
{
case ENET_EVENT_TYPE_CONNECT:
{
// skip when it's one we set up ourselves, we're handling it in ConnectToLowerPeers().
if (GetPlayerIdForPeer(event->peer) >= 0)
return;
// TODO: the player ID should either come from the packet (for the non-first player),
// or be auto-assigned as below, for the connection to the first host
const s32 new_player_id = GetFreePlayerId();
Log_DevPrintf("Enet connect event: New client with id %d", new_player_id);
if (new_player_id < 0)
{
Log_ErrorPrintf("No free slots, disconnecting client");
enet_peer_disconnect(event->peer, 1);
return;
}
s_enet_peers[new_player_id] = event->peer;
}
break;
case ENET_EVENT_TYPE_DISCONNECT:
{
const s32 player_id = GetPlayerIdForPeer(event->peer);
if (player_id < 0)
return;
// TODO: This one's gonna get kinda tricky... who do we orphan when they disconnect?
Log_WarningPrintf("ENet player %d disconnected", player_id);
Host::OnNetplayMessage(fmt::format("*** DISCONNECTED PLAYER {} ***", player_id));
ggpo_disconnect_player(s_ggpo, PlayerIdToGGPOHandle(player_id));
s_enet_peers[player_id] = nullptr;
}
break;
case ENET_EVENT_TYPE_RECEIVE:
{
const s32 player_id = GetPlayerIdForPeer(event->peer);
if (player_id < 0)
{
Log_WarningPrintf("Received packet from unknown player");
return;
}
if (event->channelID == ENET_CHANNEL_CONTROL)
{
HandleControlPacket(player_id, event->packet);
}
else if (event->channelID == ENET_CHANNEL_GGPO)
{
Log_TracePrintf("Received %zu ggpo bytes from player %d", event->packet->dataLength, player_id);
const int rc = ggpo_handle_packet(s_ggpo, event->peer, event->packet);
if (rc != GGPO_OK)
Log_ErrorPrintf("Failed to process GGPO packet!");
}
else
{
Log_ErrorPrintf("Unexpected packet channel %u", event->channelID);
}
}
break;
default:
{
Log_WarningPrintf("Unhandled enet event %d", event->type);
}
break;
}
}
void Netplay::PollEnet(Common::Timer::Value until_time)
{
ENetEvent event;
u64 current_time = Common::Timer::GetCurrentValue();
for (;;)
{
const u32 enet_timeout = (current_time >= until_time) ?
0 :
static_cast<u32>(Common::Timer::ConvertValueToMilliseconds(until_time - current_time));
const int res = enet_host_service(s_enet_host, &event, enet_timeout);
if (res > 0)
{
HandleEnetEvent(&event);
// make sure we get all events
current_time = Common::Timer::GetCurrentValue();
continue;
}
// exit once we're nonblocking
current_time = Common::Timer::GetCurrentValue();
if (enet_timeout == 0 || current_time >= until_time)
break;
}
}
bool Netplay::ConnectToLowerPeers(gsl::span<const ENetAddress> peer_addresses)
{
for (size_t i = 0; i < peer_addresses.size(); i++)
{
char ipstr[32];
if (enet_address_get_host_ip(&peer_addresses[i], ipstr, std::size(ipstr)) != 0)
ipstr[0] = 0;
Log_DevPrintf("Starting connection to peer %u at %s:%u", i, ipstr, peer_addresses[i].port);
DebugAssert(i != s_player_id);
s_enet_peers[i] = enet_host_connect(s_enet_host, &peer_addresses[i], NUM_ENET_CHANNELS, 0);
if (!s_enet_peers[i])
{
Log_ErrorPrintf("enet_host_connect() for peer %u failed", i);
return false;
}
}
return true;
}
bool Netplay::WaitForPeerConnections()
{
static constexpr float MAX_CONNECT_TIME = 30.0f;
Common::Timer timeout;
const u32 clients_to_connect = MAX_PLAYERS - 1;
for (;;)
{
// TODO: Handle early shutdown/cancel request.
u32 num_connected_peers = 0;
for (s32 i = 0; i < MAX_PLAYERS; i++)
{
if (i != s_player_id && s_enet_peers[i] && s_enet_peers[i]->state == ENET_PEER_STATE_CONNECTED)
num_connected_peers++;
}
if (num_connected_peers == clients_to_connect)
break;
if (timeout.GetTimeSeconds() >= MAX_CONNECT_TIME)
{
Log_ErrorPrintf("Peer connection timeout");
return false;
}
Host::PumpMessagesOnCPUThread();
Host::DisplayLoadingScreen("Connected to netplay peers", 0, clients_to_connect, num_connected_peers);
const Common::Timer::Value poll_end_time =
Common::Timer::GetCurrentValue() + Common::Timer::ConvertMillisecondsToValue(16);
PollEnet(poll_end_time);
}
Log_InfoPrint("Peer connection complete.");
return true;
}
void Netplay::HandleControlPacket(s32 player_id, const ENetPacket* pkt)
{
// TODO
Log_ErrorPrintf("Unhandled control packet from player %d of size %zu", player_id, pkt->dataLength);
}
s32 Netplay::Start(s32 lhandle, u16 lport, const std::string& raddr, u16 rport, s32 ldelay, u32 pred, std::string game_path)
{
SetSettings();
if (!InitializeEnet())
return -1;
ENetAddress host_address;
host_address.host = ENET_HOST_ANY;
host_address.port = lport;
s_enet_host = enet_host_create(&host_address, MAX_PLAYERS - 1, NUM_ENET_CHANNELS, 0, 0);
if (!s_enet_host)
{
Log_ErrorPrintf("Failed to create enet host.");
return -1;
}
// Connect to any lower-ID'ed hosts.
// Eventually we'll assign these IDs as players connect, and everyone not starting it will get their ID sent back
s_player_id = lhandle - 1;
std::array<ENetAddress, MAX_PLAYERS> peer_addresses;
const u32 num_peer_addresses = s_player_id;
DebugAssert(num_peer_addresses == 0 || num_peer_addresses == 1);
if (num_peer_addresses == 1)
{
// TODO: rewrite this when we support more players
const u32 other_player_id = (lhandle == 1) ? 1 : 0;
if (enet_address_set_host_ip(&peer_addresses[other_player_id], raddr.c_str()) != 0)
{
Log_ErrorPrintf("Failed to parse host: '%s'", raddr.c_str());
ShutdownEnetHost();
return -1;
}
peer_addresses[other_player_id].port = rport;
}
// Create system.
if (!CreateSystem(std::move(game_path)))
{
Log_ErrorPrintf("Failed to create system.");
ShutdownEnetHost();
return -1;
}
InitializeFramePacing();
// Connect to all peers.
if ((num_peer_addresses > 0 &&
!ConnectToLowerPeers(gsl::span<const ENetAddress>(peer_addresses).subspan(0, num_peer_addresses))) ||
!WaitForPeerConnections())
{
// System shutdown cleans up enet.
return -1;
}
/*
TODO: since saving every frame during rollback loses us time to do actual gamestate iterations it might be better to
hijack the update / save / load cycle to only save every confirmed frame only saving when actually needed.
@ -108,37 +420,51 @@ s32 Netplay::Start(s32 lhandle, u16 lport, std::string& raddr, u16 rport, s32 ld
GGPOErrorCode result;
result =
ggpo_start_session(&s_ggpo, &cb, "Duckstation-Netplay", 2, sizeof(Netplay::Input), lport, MAX_ROLLBACK_FRAMES);
// result = ggpo_start_synctest(&s_ggpo, &cb, (char*)"asdf", 2, sizeof(Netplay::Input), 1);
result = ggpo_start_session(&s_ggpo, &cb, "Duckstation-Netplay", MAX_PLAYERS, sizeof(Netplay::Input), lport, MAX_ROLLBACK_FRAMES);
if (!GGPO_SUCCEEDED(result))
{
Log_ErrorPrintf("ggpo_start_session() failed: %d", result);
return -1;
}
ggpo_set_disconnect_timeout(s_ggpo, 2000);
ggpo_set_disconnect_notify_start(s_ggpo, 1000);
for (int i = 1; i <= 2; i++)
for (s32 i = 0; i < MAX_PLAYERS; i++)
{
GGPOPlayer player = {};
GGPOPlayerHandle handle = 0;
if (!s_enet_peers[i] && i != s_player_id)
continue;
player.size = sizeof(GGPOPlayer);
player.player_num = i;
// This is *awful*. Need to merge the player IDs, enough of this indices-start-at-1 rubbish.
const GGPOPlayerHandle expected_handle = PlayerIdToGGPOHandle(i);
if (lhandle == i)
GGPOPlayer player = { sizeof(GGPOPlayer) };
GGPOPlayerHandle got_handle;
player.player_num = expected_handle;
if (i == s_player_id)
{
player.type = GGPOPlayerType::GGPO_PLAYERTYPE_LOCAL;
result = ggpo_add_player(s_ggpo, &player, &handle);
s_local_handle = handle;
player.type = GGPO_PLAYERTYPE_LOCAL;
result = ggpo_add_player(s_ggpo, &player, &got_handle);
if (GGPO_SUCCEEDED(result))
s_local_handle = got_handle;
}
else
{
player.type = GGPOPlayerType::GGPO_PLAYERTYPE_REMOTE;
StringUtil::Strlcpy(player.u.remote.ip_address, raddr.c_str(), std::size(player.u.remote.ip_address));
player.u.remote.port = rport;
result = ggpo_add_player(s_ggpo, &player, &handle);
player.type = GGPO_PLAYERTYPE_REMOTE;
player.u.remote.peer = s_enet_peers[i];
result = ggpo_add_player(s_ggpo, &player, &got_handle);
}
if (!GGPO_SUCCEEDED(result))
{
Log_ErrorPrintf("Failed to add player %d", i);
return -1;
}
Assert(expected_handle == got_handle);
}
ggpo_set_frame_delay(s_ggpo, s_local_handle, ldelay);
ggpo_set_manual_network_polling(s_ggpo, true);
return result;
}
@ -152,6 +478,8 @@ void Netplay::CloseSession()
s_save_buffer_pool.clear();
s_local_handle = GGPO_INVALID_HANDLE;
ShutdownEnetHost();
// Restore original settings.
Host::Internal::SetNetplaySettingsLayer(nullptr);
System::ApplySettings(false);
@ -258,8 +586,10 @@ void Netplay::Throttle()
const Common::Timer::Value sleep_period = Common::Timer::ConvertMillisecondsToValue(1);
for (;;)
{
// Poll network.
ggpo_poll_network(s_ggpo);
// TODO: make better, we can tell this function to stall until the next frame
PollEnet(0);
ggpo_network_idle(s_ggpo);
PollEnet(0);
current_time = Common::Timer::GetCurrentValue();
if (current_time >= s_next_frame_time)
@ -322,7 +652,12 @@ void Netplay::AdvanceFrame()
void Netplay::RunFrame()
{
// housekeeping
// TODO: get rid of double polling
PollEnet(0);
ggpo_network_idle(s_ggpo);
PollEnet(0);
ggpo_idle(s_ggpo);
// run game
auto result = GGPO_OK;
int disconnect_flags = 0;
@ -358,6 +693,11 @@ void Netplay::CollectInput(u32 slot, u32 bind, float value)
s_net_input[slot][bind] = value;
}
GGPOPlayerHandle Netplay::PlayerIdToGGPOHandle(s32 player_id)
{
return player_id + 1;
}
Netplay::Input Netplay::ReadLocalInput()
{
// get controller data of the first controller (0 internally)
@ -372,7 +712,6 @@ Netplay::Input Netplay::ReadLocalInput()
void Netplay::SendMsg(const char* msg)
{
ggpo_client_chat(s_ggpo, msg);
}
GGPOErrorCode Netplay::SyncInput(Netplay::Input inputs[2], int* disconnect_flags)
@ -414,13 +753,15 @@ void Netplay::StartNetplaySession(s32 local_handle, u16 local_port, std::string&
// dont want to start a session when theres already one going on.
if (IsActive())
return;
// set game path for later loading during the begin game callback
s_game_path = std::move(game_path);
// create session
int result = Netplay::Start(local_handle, local_port, remote_addr, remote_port, input_delay, MAX_ROLLBACK_FRAMES);
int result = Netplay::Start(local_handle, local_port, remote_addr, remote_port, input_delay, MAX_ROLLBACK_FRAMES, std::move(game_path));
// notify that the session failed
if (result != GGPO_OK)
{
Log_ErrorPrintf("Failed to Create Netplay Session! Error: %d", result);
System::ShutdownSystem(false);
}
else
{
// Load savestate if available
@ -465,17 +806,6 @@ void Netplay::ExecuteNetplay()
bool Netplay::NpBeginGameCb(void* ctx, const char* game_name)
{
// close system if its already running
if (System::IsValid())
System::ShutdownSystem(false);
// fast boot the selected game and wait for the other player
auto param = SystemBootParameters(s_game_path);
param.override_fast_boot = true;
if (!System::BootSystem(param))
{
StopNetplaySession();
return false;
}
SPU::SetAudioOutputMuted(true);
// Fast Forward to Game Start if needed.
while (System::GetInternalFrameNumber() < 2)
@ -554,22 +884,9 @@ bool Netplay::NpOnEventCb(void* ctx, GGPOEvent* ev)
case GGPOEventCode::GGPO_EVENTCODE_SYNCHRONIZED_WITH_PEER:
Host::OnNetplayMessage(fmt::format("Netplay Synchronized With Player: {}", ev->u.synchronized.player));
break;
case GGPOEventCode::GGPO_EVENTCODE_DISCONNECTED_FROM_PEER:
Host::OnNetplayMessage(fmt::format("Netplay Player: %d Disconnected", ev->u.disconnected.player));
break;
case GGPOEventCode::GGPO_EVENTCODE_RUNNING:
Host::OnNetplayMessage("Netplay Is Running");
break;
case GGPOEventCode::GGPO_EVENTCODE_CONNECTION_INTERRUPTED:
Host::OnNetplayMessage(fmt::format("Netplay Player: {} Connection Interupted, Timeout: {}", ev->u.connection_interrupted.player,
ev->u.connection_interrupted.disconnect_timeout));
break;
case GGPOEventCode::GGPO_EVENTCODE_CONNECTION_RESUMED:
Host::OnNetplayMessage(fmt::format("Netplay Player: {} Connection Resumed", ev->u.connection_resumed.player));
break;
case GGPOEventCode::GGPO_EVENTCODE_CHAT:
Host::OnNetplayMessage(ev->u.chat.msg);
break;
case GGPOEventCode::GGPO_EVENTCODE_TIMESYNC:
HandleTimeSyncEvent(ev->u.timesync.frames_ahead, ev->u.timesync.timeSyncPeriodInFrames);
break;

View File

@ -5,7 +5,7 @@
namespace Netplay {
enum : u32
enum : s32
{
// Maximum number of emulated controllers.
MAX_PLAYERS = 2,
@ -13,6 +13,14 @@ enum : u32
MAX_ROLLBACK_FRAMES = 8,
};
enum : u8
{
ENET_CHANNEL_CONTROL = 0,
ENET_CHANNEL_GGPO = 1,
NUM_ENET_CHANNELS,
};
void StartNetplaySession(s32 local_handle, u16 local_port, std::string& remote_addr, u16 remote_port, s32 input_delay,
std::string game_path);
void StopNetplaySession();

View File

@ -1089,6 +1089,9 @@ void EmuThread::startNetplaySession(int local_handle, quint16 local_port, const
auto remAddr = remote_addr.trimmed().toStdString();
auto gamePath = game_path.trimmed().toStdString();
Netplay::StartNetplaySession(local_handle, local_port, remAddr, remote_port, input_delay, gamePath);
// TODO: Fix this junk.. for some reason, it stays sleeping...
g_emu_thread->wakeThread();
}
void EmuThread::sendNetplayMessage(const QString& message)
@ -2243,6 +2246,9 @@ int main(int argc, char* argv[])
std::string remote = "127.0.0.1";
std::string game = "D:\\PSX\\chd\\padtest.chd";
Netplay::StartNetplaySession(h, port_base + h, remote, port_base + nh, 1, game);
// TODO: Fix this junk.. for some reason, it stays sleeping...
g_emu_thread->wakeThread();
});
}

View File

@ -466,14 +466,14 @@ void Host::DisplayLoadingScreen(const char* message, int progress_min /*= -1*/,
ImGui::Text("%s: %d/%d", message, progress_value, progress_max);
ImGui::ProgressBar(static_cast<float>(progress_value) / static_cast<float>(progress_max - progress_min),
ImVec2(-1.0f, 0.0f), "");
Log_InfoPrintf("%s: %d/%d", message, progress_value, progress_max);
Log_DebugPrintf("%s: %d/%d", message, progress_value, progress_max);
}
else
{
const ImVec2 text_size(ImGui::CalcTextSize(message));
ImGui::SetCursorPosX((width - text_size.x) / 2.0f);
ImGui::TextUnformatted(message);
Log_InfoPrintf("%s", message);
Log_DebugPrintf("%s", message);
}
}
ImGui::End();