diff --git a/dep/ggpo-x/include/ggponet.h b/dep/ggpo-x/include/ggponet.h index cfce0b07e..f0e9b3f3a 100644 --- a/dep/ggpo-x/include/ggponet.h +++ b/dep/ggpo-x/include/ggponet.h @@ -201,12 +201,6 @@ typedef struct { * functions during the game. All callback functions must be implemented. */ typedef struct { - /* - * begin_game callback - This callback has been deprecated. You must - * implement it, but should ignore the 'game' parameter. - */ - bool (__cdecl *begin_game)(void* context, const char *game); - /* * save_game_state - The client should allocate a buffer, copy the * entire contents of the current game state into it, and copy the @@ -340,10 +334,8 @@ typedef struct GGPONetworkStats { */ GGPO_API GGPOErrorCode __cdecl ggpo_start_session(GGPOSession **session, GGPOSessionCallbacks *cb, - const char *game, int num_players, int input_size, - unsigned short localport, int maxPrediction); @@ -389,7 +381,6 @@ GGPO_API GGPOErrorCode __cdecl ggpo_add_player(GGPOSession *session, */ GGPO_API GGPOErrorCode __cdecl ggpo_start_synctest(GGPOSession **session, GGPOSessionCallbacks *cb, - char *game, int num_players, int input_size, int frames); @@ -421,12 +412,10 @@ GGPO_API GGPOErrorCode __cdecl ggpo_start_synctest(GGPOSession **session, */ GGPO_API GGPOErrorCode __cdecl ggpo_start_spectating(GGPOSession **session, GGPOSessionCallbacks *cb, - const char *game, int num_players, int input_size, - unsigned short local_port, - char *host_ip, - unsigned short host_port); + ENetPeer* host); + /* * ggpo_close_session -- diff --git a/dep/ggpo-x/src/backends/p2p.cpp b/dep/ggpo-x/src/backends/p2p.cpp index cfd1a3e1e..b02fc3ad3 100644 --- a/dep/ggpo-x/src/backends/p2p.cpp +++ b/dep/ggpo-x/src/backends/p2p.cpp @@ -10,8 +10,6 @@ static const int RECOMMENDATION_INTERVAL = 120; Peer2PeerBackend::Peer2PeerBackend(GGPOSessionCallbacks *cb, - const char *gamename, - uint16 localport, int num_players, int input_size, int nframes) : _num_players(num_players), @@ -39,11 +37,6 @@ Peer2PeerBackend::Peer2PeerBackend(GGPOSessionCallbacks *cb, for (int i = 0; i < ARRAY_SIZE(_local_connect_status); i++) { _local_connect_status[i].last_frame = -1; } - - /* - * Preload the ROM - */ - _callbacks.begin_game(_callbacks.context, gamename); } Peer2PeerBackend::~Peer2PeerBackend() @@ -299,6 +292,11 @@ Peer2PeerBackend::AddPlayer(GGPOPlayer *player, if (player->type == GGPO_PLAYERTYPE_REMOTE) { AddRemotePlayer(player->u.remote.peer, queue); } + + // no other players in this session? + if (player->type == GGPO_PLAYERTYPE_LOCAL && _num_players == 1) + _synchronizing = false; + return GGPO_OK; } diff --git a/dep/ggpo-x/src/backends/p2p.h b/dep/ggpo-x/src/backends/p2p.h index d11ad7094..fef75010d 100644 --- a/dep/ggpo-x/src/backends/p2p.h +++ b/dep/ggpo-x/src/backends/p2p.h @@ -17,7 +17,7 @@ class Peer2PeerBackend final : public GGPOSession { public: - Peer2PeerBackend(GGPOSessionCallbacks *cb, const char *gamename, uint16 localport, int num_players, int input_size, int nframes); + Peer2PeerBackend(GGPOSessionCallbacks *cb, int num_players, int input_size, int nframes); virtual ~Peer2PeerBackend(); diff --git a/dep/ggpo-x/src/backends/spectator.cpp b/dep/ggpo-x/src/backends/spectator.cpp index ebf7c6955..e69661cd2 100644 --- a/dep/ggpo-x/src/backends/spectator.cpp +++ b/dep/ggpo-x/src/backends/spectator.cpp @@ -7,16 +7,8 @@ #include "spectator.h" -SpectatorBackend::SpectatorBackend(GGPOSessionCallbacks *cb, - const char* gamename, - uint16 localport, - int num_players, - int input_size, - char *hostip, - u_short hostport) : - _num_players(num_players), - _input_size(input_size), - _next_input_to_send(0) +SpectatorBackend::SpectatorBackend(GGPOSessionCallbacks* cb, int num_players, int input_size, ENetPeer* peer) + : _num_players(num_players), _input_size(input_size), _next_input_to_send(0) { _callbacks = *cb; _synchronizing = true; @@ -37,11 +29,6 @@ SpectatorBackend::SpectatorBackend(GGPOSessionCallbacks *cb, */ //_host.Init(&_udp, _poll, 0, hostip, hostport, NULL); _host.Synchronize(); - - /* - * Preload the ROM - */ - _callbacks.begin_game(_callbacks.context, gamename); } SpectatorBackend::~SpectatorBackend() diff --git a/dep/ggpo-x/src/backends/spectator.h b/dep/ggpo-x/src/backends/spectator.h index a0e39c2ef..d9324421c 100644 --- a/dep/ggpo-x/src/backends/spectator.h +++ b/dep/ggpo-x/src/backends/spectator.h @@ -18,7 +18,7 @@ class SpectatorBackend final : public GGPOSession { public: - SpectatorBackend(GGPOSessionCallbacks *cb, const char *gamename, uint16 localport, int num_players, int input_size, char *hostip, u_short hostport); + SpectatorBackend(GGPOSessionCallbacks *cb, int num_players, int input_size, ENetPeer* peer); virtual ~SpectatorBackend(); diff --git a/dep/ggpo-x/src/backends/synctest.cpp b/dep/ggpo-x/src/backends/synctest.cpp index 39c858aef..cdb511c8e 100644 --- a/dep/ggpo-x/src/backends/synctest.cpp +++ b/dep/ggpo-x/src/backends/synctest.cpp @@ -8,7 +8,6 @@ #include "synctest.h" SyncTestBackend::SyncTestBackend(GGPOSessionCallbacks *cb, - char *gamename, int frames, int num_players) : _sync(NULL, MAX_PREDICTION_FRAMES) @@ -21,7 +20,6 @@ SyncTestBackend::SyncTestBackend(GGPOSessionCallbacks *cb, _running = false; _logfp = NULL; _current_input.erase(); - strcpy_s(_game, gamename); /* * Initialize the synchronziation layer @@ -30,11 +28,6 @@ SyncTestBackend::SyncTestBackend(GGPOSessionCallbacks *cb, config.callbacks = _callbacks; config.num_prediction_frames = MAX_PREDICTION_FRAMES; _sync.Init(config); - - /* - * Preload the ROM - */ - _callbacks.begin_game(_callbacks.context, gamename); } SyncTestBackend::~SyncTestBackend() diff --git a/dep/ggpo-x/src/backends/synctest.h b/dep/ggpo-x/src/backends/synctest.h index e425d9c5f..a9e4f64ee 100644 --- a/dep/ggpo-x/src/backends/synctest.h +++ b/dep/ggpo-x/src/backends/synctest.h @@ -15,7 +15,7 @@ class SyncTestBackend final : public GGPOSession { public: - SyncTestBackend(GGPOSessionCallbacks *cb, char *gamename, int frames, int num_players); + SyncTestBackend(GGPOSessionCallbacks *cb, int frames, int num_players); virtual ~SyncTestBackend(); virtual GGPOErrorCode DoPoll(); @@ -52,7 +52,6 @@ protected: bool _rollingback; bool _running; FILE *_logfp; - char _game[128]; GameInput _current_input; GameInput _last_input; diff --git a/dep/ggpo-x/src/log.cpp b/dep/ggpo-x/src/log.cpp index 2f84cf98f..ea8acc796 100644 --- a/dep/ggpo-x/src/log.cpp +++ b/dep/ggpo-x/src/log.cpp @@ -28,6 +28,7 @@ void Log(const char *fmt, ...) void Logv(const char *fmt, va_list args) { +#if 1 if (!Platform::GetConfigBool("ggpo.log") || Platform::GetConfigBool("ggpo.log.ignore")) { return; } @@ -36,6 +37,9 @@ void Logv(const char *fmt, va_list args) fopen_s(&logfile, logbuf, "w"); } Logv(logfile, fmt, args); +#else + vfprintf(stderr, fmt, args); +#endif } void Logv(FILE *fp, const char *fmt, va_list args) diff --git a/dep/ggpo-x/src/main.cpp b/dep/ggpo-x/src/main.cpp index e1ac2ce47..5f5f36124 100644 --- a/dep/ggpo-x/src/main.cpp +++ b/dep/ggpo-x/src/main.cpp @@ -40,15 +40,11 @@ ggpo_logv(GGPOSession *ggpo, const char *fmt, va_list args) GGPOErrorCode ggpo_start_session(GGPOSession **session, GGPOSessionCallbacks *cb, - const char *game, int num_players, int input_size, - unsigned short localport, int maxPrediction) { *session= new Peer2PeerBackend(cb, - game, - localport, num_players, input_size, maxPrediction); @@ -71,12 +67,11 @@ ggpo_add_player(GGPOSession *ggpo, GGPOErrorCode ggpo_start_synctest(GGPOSession **ggpo, GGPOSessionCallbacks *cb, - char *game, int num_players, int input_size, int frames) { - *ggpo = new SyncTestBackend(cb, game, frames, num_players); + *ggpo = new SyncTestBackend(cb, frames, num_players); return GGPO_OK; } @@ -209,19 +204,10 @@ ggpo_set_disconnect_notify_start(GGPOSession *ggpo, int timeout) GGPOErrorCode ggpo_start_spectating(GGPOSession **session, GGPOSessionCallbacks *cb, - const char *game, int num_players, int input_size, - unsigned short local_port, - char *host_ip, - unsigned short host_port) + ENetPeer* host) { - *session= new SpectatorBackend(cb, - game, - local_port, - num_players, - input_size, - host_ip, - host_port); - return GGPO_OK; + *session = new SpectatorBackend(cb, num_players, input_size, host); + return GGPO_OK; } diff --git a/src/core/netplay.cpp b/src/core/netplay.cpp index 09667027e..74b0aa48a 100644 --- a/src/core/netplay.cpp +++ b/src/core/netplay.cpp @@ -1,30 +1,101 @@ #include "netplay.h" #include "common/byte_stream.h" +#include "common/file_system.h" #include "common/gpu_texture.h" #include "common/log.h" #include "common/memory_settings_interface.h" +#include "common/path.h" #include "common/string_util.h" #include "common/timer.h" #include "digital_controller.h" -#include "ggponet.h" -#include "enet/enet.h" #include "host.h" #include "host_settings.h" #include "pad.h" +#include "save_state_version.h" #include "spu.h" #include "system.h" #include -#include #include +#include #include Log_SetChannel(Netplay); #ifdef _WIN32 +#include "common/windows_headers.h" #pragma comment(lib, "ws2_32.lib") #endif +// TODO: windows.h getting picked up somewhere here... +#include "enet/enet.h" +#include "ggponet.h" + namespace Netplay { +// TODO: Put this in a header.. +enum class SessionState +{ + Inactive, + Initializing, + Connecting, + Synchronizing, + Running, +}; + +enum class ControlMessage : u32 +{ + ConnectResponse, + SynchronizeSession, + SynchronizeComplete, +}; + +#pragma pack(push, 1) +struct ControlMessageHeader +{ + ControlMessage type; + u32 size; +}; + +struct ControlConnectResponseMessage +{ + enum class Result : u32 + { + Success = 0, + ServerFull, + PlayerIDInUse, + }; + + Result result; + s32 player_id; + + static ControlMessage MessageType() { return ControlMessage::ConnectResponse; } +}; + +struct ControlSynchronizeSessionMessage +{ + struct PlayerAddress + { + u32 host; + u16 port; + s16 controller_port; // -1 if not present + }; + + ControlMessageHeader header; + s32 num_players; + PlayerAddress players[MAX_PLAYERS]; + u32 state_data_size; + // state_data_size bytes of state data follows + + static ControlMessage MessageType() { return ControlMessage::SynchronizeSession; } +}; + +struct ControlSynchronizeCompleteMessage +{ + ControlMessageHeader header; + + static ControlMessage MessageType() { return ControlMessage::SynchronizeComplete; } +}; +#pragma pack(pop) + using SaveStateBuffer = std::unique_ptr; struct Input @@ -32,12 +103,9 @@ struct Input u32 button_data; }; -static bool InitializeEnet(); - static bool NpAdvFrameCb(void* ctx, int flags); static bool NpSaveFrameCb(void* ctx, unsigned char** buffer, int* len, int* checksum, int frame); static bool NpLoadFrameCb(void* ctx, unsigned char* buffer, int len, int rb_frames, int frame_to_load); -static bool NpBeginGameCb(void* ctx, const char* game_name); static void NpFreeBuffCb(void* ctx, void* buffer, int frame); static bool NpOnEventCb(void* ctx, GGPOEvent* ev); @@ -51,18 +119,42 @@ static void SetSettings(); static bool CreateSystem(std::string game_path); +// TODO: Fatal error and shutdown helper + // ENet +static bool InitializeEnet(); static void ShutdownEnetHost(); -static s32 GetFreePlayerId(); -static s32 GetPlayerIdForPeer(const ENetPeer* peer); -static bool ConnectToLowerPeers(gsl::span peer_addresses); -static bool WaitForPeerConnections(); +static std::string PeerAddressString(const ENetPeer* peer); static void HandleEnetEvent(const ENetEvent* event); static void PollEnet(Common::Timer::Value until_time); -static void HandleControlPacket(s32 player_id, const ENetPacket* pkt); + +// Player management +static bool IsHost(); +static bool IsValidPlayerId(s32 player_id); +static s32 GetFreePlayerId(); +static ENetPeer* GetPeerForPlayer(s32 player_id); +static s32 GetPlayerIdForPeer(const ENetPeer* peer); +static bool WaitForPeerConnections(); + +// Controlpackets +static void HandleControlMessage(s32 player_id, const ENetPacket* pkt); +static void HandleConnectResponseMessage(s32 player_id, const ENetPacket* pkt); +static void HandleSynchronizeSessionMessage(s32 player_id, const ENetPacket* pkt); +static void HandleSynchronizeCompleteMessage(s32 player_id, const ENetPacket* pkt); // l = local, r = remote -static s32 Start(s32 lhandle, u16 lport, const std::string& raddr, u16 rport, s32 ldelay, u32 pred, std::string game_path); +static bool CreateGGPOSession(); +static void DestroyGGPOSession(); +static bool Start(s32 lhandle, u16 lport, const std::string& raddr, u16 rport, s32 ldelay, u32 pred, + std::string game_path); + +// Host functions. +static void HandlePeerConnectionAsHost(ENetPeer* peer, s32 claimed_player_id); +static void HandlePeerConnectionAsNonHost(ENetPeer* peer, s32 claimed_player_id); +static void HandlePeerDisconnectionAsHost(s32 player_id); +static void HandlePeerDisconnectionAsNonHost(s32 player_id); +static void Resynchronize(); +static void CheckForCompleteResynchronize(); static void AdvanceFrame(); static void RunFrame(); @@ -85,14 +177,23 @@ static void GenerateDesyncReport(s32 desync_frame); ////////////////////////////////////////////////////////////////////////// static MemorySettingsInterface s_settings_overlay; - +static SessionState s_state; /// Enet -static ENetHost* s_enet_host; -static std::array s_enet_peers; -static s32 s_player_id; +struct Peer +{ + ENetPeer* peer; + GGPOPlayerHandle ggpo_handle; +}; +static ENetHost* s_enet_host = nullptr; +static std::array s_peers; +static s32 s_host_player_id = 0; +static s32 s_player_id = 0; +static s32 s_num_players = 0; +static s32 s_synchronized_players = 0; // only valid on host static GGPOPlayerHandle s_local_handle = GGPO_INVALID_HANDLE; +static s32 s_local_delay = 0; static GGPONetworkStats s_last_net_stats{}; static GGPOSession* s_ggpo = nullptr; @@ -106,10 +207,150 @@ static Common::Timer::Value s_frame_period = 0; static Common::Timer::Value s_next_frame_time = 0; static s32 s_next_timesync_recovery_frame = -1; +////////////////////////////////////////////////////////////////////////// +// Packet helpers +////////////////////////////////////////////////////////////////////////// + +template +struct PacketWrapper +{ + ENetPacket* pkt; + + ALWAYS_INLINE const T* operator->() const { return reinterpret_cast(pkt->data); } + ALWAYS_INLINE T* operator->() { return reinterpret_cast(pkt->data); } +}; +template +static PacketWrapper NewWrappedPacket(u32 size = sizeof(T), u32 flags = 0) +{ + return PacketWrapper{enet_packet_create(nullptr, size, flags)}; +} +template +static PacketWrapper NewControlPacket(u32 size = sizeof(T), u32 flags = ENET_PACKET_FLAG_RELIABLE) +{ + PacketWrapper ret = NewWrappedPacket(size, flags); + ControlMessageHeader* hdr = reinterpret_cast(ret.pkt->data); + hdr->type = T::MessageType(); + hdr->size = size; + return ret; +} +template +static bool SendControlPacket(ENetPeer* peer, const PacketWrapper& pkt) +{ + const int rc = enet_peer_send(peer, ENET_CHANNEL_CONTROL, pkt.pkt); + if (rc != 0) + { + Log_ErrorPrintf("enet_peer_send() failed: %d", rc); + enet_packet_destroy(pkt.pkt); + return false; + } + + return true; +} +template +static bool SendControlPacket(s32 player_id, const PacketWrapper& pkt) +{ + DebugAssert(player_id >= 0 && player_id < MAX_PLAYERS && s_peers[player_id].peer); + return SendControlPacket(s_peers[player_id].peer, pkt); +} + } // namespace Netplay // Netplay Impl +bool Netplay::Start(s32 lhandle, u16 lport, const std::string& raddr, u16 rport, s32 ldelay, u32 pred, + std::string game_path) +{ + s_state = SessionState::Initializing; + + SetSettings(); + + if (!CreateSystem(std::move(game_path))) + { + Log_ErrorPrintf("Failed to create system."); + return false; + } + + if (!InitializeEnet()) + { + Log_ErrorPrintf("Failed to initialize Enet."); + return false; + } + + // Create our "host" (which is basically just our port). + ENetAddress server_address; + server_address.host = ENET_HOST_ANY; + server_address.port = lport; + s_enet_host = enet_host_create(&server_address, MAX_PLAYERS - 1, NUM_ENET_CHANNELS, 0, 0); + if (!s_enet_host) + { + Log_ErrorPrintf("Failed to create enet host."); + return false; + } + + s_host_player_id = 0; + s_local_delay = ldelay; + + // If we're the host, we can just continue on our merry way, the others will join later. + if (lhandle == 1) + { + // Starting session with a single player. + s_player_id = 0; + s_num_players = 1; + s_synchronized_players = 1; + + if (!CreateGGPOSession()) + { + Log_ErrorPrintf("Failed to create GGPO session for host."); + return false; + } + + Log_InfoPrintf("Netplay session started as host."); + s_state = SessionState::Running; + return true; + } + + // for non-hosts, we don't know our player id yet until after we connect... + s_player_id = -1; + + // Connect to host. + ENetAddress host_address; + host_address.port = rport; + if (enet_address_set_host_ip(&host_address, raddr.c_str()) != 0) + { + Log_ErrorPrintf("Failed to parse host: '%s'", raddr.c_str()); + return false; + } + + s_peers[s_host_player_id].peer = + enet_host_connect(s_enet_host, &host_address, NUM_ENET_CHANNELS, static_cast(s_player_id)); + if (!s_peers[s_host_player_id].peer) + { + Log_ErrorPrintf("Failed to start connection to host."); + return false; + } + + // Wait until we're connected to the main host. They'll send us back state to load and a full player list. + s_state = SessionState::Connecting; + return true; +} + +void Netplay::CloseSession() +{ + Assert(IsActive()); + + DestroyGGPOSession(); + ShutdownEnetHost(); + + // Restore original settings. + Host::Internal::SetNetplaySettingsLayer(nullptr); + System::ApplySettings(false); +} + +bool Netplay::IsActive() +{ + return (s_state != SessionState::Inactive); +} + bool Netplay::CreateSystem(std::string game_path) { // close system if its already running @@ -119,7 +360,18 @@ bool Netplay::CreateSystem(std::string game_path) // fast boot the selected game and wait for the other player auto param = SystemBootParameters(std::move(game_path)); param.override_fast_boot = true; - return System::BootSystem(param); + if (!System::BootSystem(param)) + return false; + +#if 1 + // Fast Forward to Game Start if needed. + SPU::SetAudioOutputMuted(true); + while (System::GetInternalFrameNumber() < 2) + System::RunFrame(); + SPU::SetAudioOutputMuted(false); +#endif + + return true; } bool Netplay::InitializeEnet() @@ -145,10 +397,10 @@ void Netplay::ShutdownEnetHost() // TODO: do we want to send disconnect requests and wait a bit? for (u32 i = 0; i < MAX_PLAYERS; i++) { - if (s_enet_peers[i]) + if (s_peers[i].peer) { - enet_peer_reset(s_enet_peers[i]); - s_enet_peers[i] = nullptr; + enet_peer_reset(s_peers[i].peer); + s_peers[i].peer = nullptr; } } @@ -156,31 +408,13 @@ void Netplay::ShutdownEnetHost() s_enet_host = nullptr; } -GGPOPlayerHandle PlayerIdToGGPOHandle(s32 player_id) +std::string Netplay::PeerAddressString(const ENetPeer* peer) { - return player_id + 1; -} + char buf[128]; + if (enet_address_get_host_ip(&peer->address, buf, std::size(buf))) + buf[0] = 0; -s32 Netplay::GetPlayerIdForPeer(const ENetPeer* peer) -{ - for (s32 i = 0; i < MAX_PLAYERS; i++) - { - if (s_enet_peers[i] == peer) - return i; - } - - return -1; -} - -s32 Netplay::GetFreePlayerId() -{ - for (s32 i = 0; i < MAX_PLAYERS; i++) - { - if (i != s_player_id && !s_enet_peers[i]) - return i; - } - - return -1; + return fmt::format("{}:{}", buf, peer->address.port); } void Netplay::HandleEnetEvent(const ENetEvent* event) @@ -189,66 +423,68 @@ void Netplay::HandleEnetEvent(const ENetEvent* event) { case ENET_EVENT_TYPE_CONNECT: { - // skip when it's one we set up ourselves, we're handling it in ConnectToLowerPeers(). - if (GetPlayerIdForPeer(event->peer) >= 0) - return; + if (IsHost()) + HandlePeerConnectionAsHost(event->peer, static_cast(event->data)); + else + HandlePeerConnectionAsNonHost(event->peer, static_cast(event->data)); - // TODO: the player ID should either come from the packet (for the non-first player), - // or be auto-assigned as below, for the connection to the first host - const s32 new_player_id = GetFreePlayerId(); - Log_DevPrintf("Enet connect event: New client with id %d", new_player_id); - - if (new_player_id < 0) - { - Log_ErrorPrintf("No free slots, disconnecting client"); - enet_peer_disconnect(event->peer, 1); - return; - } - - s_enet_peers[new_player_id] = event->peer; + return; } break; case ENET_EVENT_TYPE_DISCONNECT: { const s32 player_id = GetPlayerIdForPeer(event->peer); - if (player_id < 0) + if (s_state == SessionState::Connecting) + { + Assert(player_id == s_host_player_id); + Panic("Failed to connect to host"); return; + } + else if (s_state == SessionState::Synchronizing) + { + // let the timeout deal with it + Log_DevPrintf("Ignoring disconnection from %d while synchronizing", player_id); + return; + } - // TODO: This one's gonna get kinda tricky... who do we orphan when they disconnect? Log_WarningPrintf("ENet player %d disconnected", player_id); Host::OnNetplayMessage(fmt::format("*** DISCONNECTED PLAYER {} ***", player_id)); - ggpo_disconnect_player(s_ggpo, PlayerIdToGGPOHandle(player_id)); - s_enet_peers[player_id] = nullptr; + + Assert(IsValidPlayerId(player_id)); + if (IsHost()) + HandlePeerDisconnectionAsHost(player_id); + else + HandlePeerDisconnectionAsNonHost(player_id); } break; case ENET_EVENT_TYPE_RECEIVE: + { + const s32 player_id = GetPlayerIdForPeer(event->peer); + if (player_id < 0) { - const s32 player_id = GetPlayerIdForPeer(event->peer); - if (player_id < 0) - { - Log_WarningPrintf("Received packet from unknown player"); - return; - } - - if (event->channelID == ENET_CHANNEL_CONTROL) - { - HandleControlPacket(player_id, event->packet); - } - else if (event->channelID == ENET_CHANNEL_GGPO) - { - Log_TracePrintf("Received %zu ggpo bytes from player %d", event->packet->dataLength, player_id); - const int rc = ggpo_handle_packet(s_ggpo, event->peer, event->packet); - if (rc != GGPO_OK) - Log_ErrorPrintf("Failed to process GGPO packet!"); - } - else - { - Log_ErrorPrintf("Unexpected packet channel %u", event->channelID); - } + Log_WarningPrintf("Received packet from unknown player"); + return; } - break; + + if (event->channelID == ENET_CHANNEL_CONTROL) + { + HandleControlMessage(player_id, event->packet); + } + else if (event->channelID == ENET_CHANNEL_GGPO) + { + Log_TracePrintf("Received %zu ggpo bytes from player %d", event->packet->dataLength, player_id); + const int rc = ggpo_handle_packet(s_ggpo, event->peer, event->packet); + if (rc != GGPO_OK) + Log_ErrorPrintf("Failed to process GGPO packet!"); + } + else + { + Log_ErrorPrintf("Unexpected packet channel %u", event->channelID); + } + } + break; default: { @@ -286,25 +522,48 @@ void Netplay::PollEnet(Common::Timer::Value until_time) } } -bool Netplay::ConnectToLowerPeers(gsl::span peer_addresses) +bool Netplay::IsHost() { - for (size_t i = 0; i < peer_addresses.size(); i++) - { - char ipstr[32]; - if (enet_address_get_host_ip(&peer_addresses[i], ipstr, std::size(ipstr)) != 0) - ipstr[0] = 0; - Log_DevPrintf("Starting connection to peer %u at %s:%u", i, ipstr, peer_addresses[i].port); + return s_player_id == s_host_player_id; +} - DebugAssert(i != s_player_id); - s_enet_peers[i] = enet_host_connect(s_enet_host, &peer_addresses[i], NUM_ENET_CHANNELS, 0); - if (!s_enet_peers[i]) - { - Log_ErrorPrintf("enet_host_connect() for peer %u failed", i); - return false; - } +GGPOPlayerHandle Netplay::PlayerIdToGGPOHandle(s32 player_id) +{ + DebugAssert(player_id >= 0 && player_id < MAX_PLAYERS); + return s_peers[player_id].ggpo_handle; +} + +ENetPeer* Netplay::GetPeerForPlayer(s32 player_id) +{ + DebugAssert(player_id >= 0 && player_id < MAX_PLAYERS); + return s_peers[player_id].peer; +} + +s32 Netplay::GetPlayerIdForPeer(const ENetPeer* peer) +{ + for (s32 i = 0; i < MAX_PLAYERS; i++) + { + if (s_peers[i].peer == peer) + return i; } - return true; + return -1; +} + +bool Netplay::IsValidPlayerId(s32 player_id) +{ + return s_player_id == player_id || (player_id >= 0 && player_id < MAX_PLAYERS && s_peers[player_id].peer); +} + +s32 Netplay::GetFreePlayerId() +{ + for (s32 i = 0; i < MAX_PLAYERS; i++) + { + if (i != s_player_id && !s_peers[i].peer) + return i; + } + + return -1; } bool Netplay::WaitForPeerConnections() @@ -312,15 +571,16 @@ bool Netplay::WaitForPeerConnections() static constexpr float MAX_CONNECT_TIME = 30.0f; Common::Timer timeout; - const u32 clients_to_connect = MAX_PLAYERS - 1; + const s32 clients_to_connect = s_num_players - 1; + Log_VerbosePrintf("Waiting for connection to %d peers", clients_to_connect); for (;;) { // TODO: Handle early shutdown/cancel request. - u32 num_connected_peers = 0; + s32 num_connected_peers = 0; for (s32 i = 0; i < MAX_PLAYERS; i++) { - if (i != s_player_id && s_enet_peers[i] && s_enet_peers[i]->state == ENET_PEER_STATE_CONNECTED) + if (i != s_player_id && s_peers[i].peer && s_peers[i].peer->state == ENET_PEER_STATE_CONNECTED) num_connected_peers++; } if (num_connected_peers == clients_to_connect) @@ -344,150 +604,415 @@ bool Netplay::WaitForPeerConnections() return true; } -void Netplay::HandleControlPacket(s32 player_id, const ENetPacket* pkt) +bool Netplay::CreateGGPOSession() { - // TODO - Log_ErrorPrintf("Unhandled control packet from player %d of size %zu", player_id, pkt->dataLength); -} - -s32 Netplay::Start(s32 lhandle, u16 lport, const std::string& raddr, u16 rport, s32 ldelay, u32 pred, std::string game_path) -{ - SetSettings(); - if (!InitializeEnet()) - return -1; - - ENetAddress host_address; - host_address.host = ENET_HOST_ANY; - host_address.port = lport; - s_enet_host = enet_host_create(&host_address, MAX_PLAYERS - 1, NUM_ENET_CHANNELS, 0, 0); - if (!s_enet_host) - { - Log_ErrorPrintf("Failed to create enet host."); - return -1; - } - - // Connect to any lower-ID'ed hosts. - // Eventually we'll assign these IDs as players connect, and everyone not starting it will get their ID sent back - s_player_id = lhandle - 1; - - std::array peer_addresses; - const u32 num_peer_addresses = s_player_id; - DebugAssert(num_peer_addresses == 0 || num_peer_addresses == 1); - if (num_peer_addresses == 1) - { - // TODO: rewrite this when we support more players - const u32 other_player_id = (lhandle == 1) ? 1 : 0; - if (enet_address_set_host_ip(&peer_addresses[other_player_id], raddr.c_str()) != 0) - { - Log_ErrorPrintf("Failed to parse host: '%s'", raddr.c_str()); - ShutdownEnetHost(); - return -1; - } - - peer_addresses[other_player_id].port = rport; - } - - // Create system. - if (!CreateSystem(std::move(game_path))) - { - Log_ErrorPrintf("Failed to create system."); - ShutdownEnetHost(); - return -1; - } - InitializeFramePacing(); - - // Connect to all peers. - if ((num_peer_addresses > 0 && - !ConnectToLowerPeers(gsl::span(peer_addresses).subspan(0, num_peer_addresses))) || - !WaitForPeerConnections()) - { - // System shutdown cleans up enet. - return -1; - } - /* TODO: since saving every frame during rollback loses us time to do actual gamestate iterations it might be better to hijack the update / save / load cycle to only save every confirmed frame only saving when actually needed. */ - GGPOSessionCallbacks cb{}; - + GGPOSessionCallbacks cb = {}; cb.advance_frame = NpAdvFrameCb; cb.save_game_state = NpSaveFrameCb; cb.load_game_state = NpLoadFrameCb; - cb.begin_game = NpBeginGameCb; cb.free_buffer = NpFreeBuffCb; cb.on_event = NpOnEventCb; - GGPOErrorCode result; - - result = ggpo_start_session(&s_ggpo, &cb, "Duckstation-Netplay", MAX_PLAYERS, sizeof(Netplay::Input), lport, MAX_ROLLBACK_FRAMES); - if (!GGPO_SUCCEEDED(result)) - { - Log_ErrorPrintf("ggpo_start_session() failed: %d", result); - return -1; - } - - ggpo_set_disconnect_timeout(s_ggpo, 2000); - ggpo_set_disconnect_notify_start(s_ggpo, 1000); + ggpo_start_session(&s_ggpo, &cb, s_num_players, sizeof(Netplay::Input), MAX_ROLLBACK_FRAMES); + int player_num = 1; for (s32 i = 0; i < MAX_PLAYERS; i++) { - if (!s_enet_peers[i] && i != s_player_id) + // slot filled? + if (!s_peers[i].peer && i != s_player_id) continue; - // This is *awful*. Need to merge the player IDs, enough of this indices-start-at-1 rubbish. - const GGPOPlayerHandle expected_handle = PlayerIdToGGPOHandle(i); - - GGPOPlayer player = { sizeof(GGPOPlayer) }; - GGPOPlayerHandle got_handle; - player.player_num = expected_handle; + GGPOPlayer player = {sizeof(GGPOPlayer)}; + GGPOErrorCode result; + player.player_num = player_num++; if (i == s_player_id) { player.type = GGPO_PLAYERTYPE_LOCAL; - result = ggpo_add_player(s_ggpo, &player, &got_handle); + result = ggpo_add_player(s_ggpo, &player, &s_peers[i].ggpo_handle); if (GGPO_SUCCEEDED(result)) - s_local_handle = got_handle; + s_local_handle = s_peers[i].ggpo_handle; } else { player.type = GGPO_PLAYERTYPE_REMOTE; - player.u.remote.peer = s_enet_peers[i]; - result = ggpo_add_player(s_ggpo, &player, &got_handle); + player.u.remote.peer = s_peers[i].peer; + result = ggpo_add_player(s_ggpo, &player, &s_peers[i].ggpo_handle); } if (!GGPO_SUCCEEDED(result)) { Log_ErrorPrintf("Failed to add player %d", i); - return -1; + return false; } - - Assert(expected_handle == got_handle); } - ggpo_set_frame_delay(s_ggpo, s_local_handle, ldelay); - - return result; + ggpo_set_frame_delay(s_ggpo, s_local_handle, s_local_delay); + InitializeFramePacing(); + return true; } -void Netplay::CloseSession() +void Netplay::DestroyGGPOSession() { - Assert(IsActive()); + if (!s_ggpo) + return; + Log_DevPrintf("Destroying GGPO session..."); ggpo_close_session(s_ggpo); s_ggpo = nullptr; s_save_buffer_pool.clear(); s_local_handle = GGPO_INVALID_HANDLE; - ShutdownEnetHost(); - - // Restore original settings. - Host::Internal::SetNetplaySettingsLayer(nullptr); - System::ApplySettings(false); + for (Peer& p : s_peers) + p.ggpo_handle = GGPO_INVALID_HANDLE; } -bool Netplay::IsActive() +void Netplay::HandleControlMessage(s32 player_id, const ENetPacket* pkt) { - return s_ggpo != nullptr; + if (pkt->dataLength < sizeof(ControlMessageHeader)) + { + Log_ErrorPrintf("Invalid control packet from player %d of size %zu", player_id, pkt->dataLength); + return; + } + + const ControlMessageHeader* hdr = reinterpret_cast(pkt->data); + switch (hdr->type) + { + case ControlMessage::ConnectResponse: + HandleConnectResponseMessage(player_id, pkt); + break; + + case ControlMessage::SynchronizeSession: + HandleSynchronizeSessionMessage(player_id, pkt); + break; + + case ControlMessage::SynchronizeComplete: + HandleSynchronizeCompleteMessage(player_id, pkt); + break; + + default: + Log_ErrorPrintf("Unhandled control packet %u from player %d of size %zu", hdr->type, player_id, pkt->dataLength); + break; + } +} + +void Netplay::HandlePeerConnectionAsHost(ENetPeer* peer, s32 claimed_player_id) +{ + Log_VerbosePrint( + fmt::format("New host peer connection from {} claiming player ID {}", PeerAddressString(peer), claimed_player_id) + .c_str()); + + PacketWrapper response = NewControlPacket(); + response->player_id = -1; + + // Player is attempting to reconnect. + // Hopefully both sides have disconnected completely before they reconnect. + // If not, too bad. + if (claimed_player_id >= 0 && IsValidPlayerId(claimed_player_id)) + { + Log_ErrorPrintf("Player ID %d is already in use, rejecting connection.", claimed_player_id); + response->result = ControlConnectResponseMessage::Result::PlayerIDInUse; + SendControlPacket(peer, response); + return; + } + + // Any free slots? + const s32 new_player_id = (claimed_player_id >= 0) ? claimed_player_id : GetFreePlayerId(); + if (new_player_id < 0) + { + Log_ErrorPrintf("Server full, rejecting connection."); + response->result = ControlConnectResponseMessage::Result::ServerFull; + SendControlPacket(peer, response); + return; + } + + Log_VerbosePrint( + fmt::format("New connection from {} assigned to player ID {}", PeerAddressString(peer), new_player_id).c_str()); + response->result = ControlConnectResponseMessage::Result::Success; + response->player_id = new_player_id; + SendControlPacket(peer, response); + + // Set up their player slot. + Assert(s_num_players < MAX_PLAYERS); + s_peers[new_player_id].peer = peer; + s_num_players++; + + // Force everyone to resynchronize with the new player. + Resynchronize(); +} + +void Netplay::HandlePeerConnectionAsNonHost(ENetPeer* peer, s32 claimed_player_id) +{ + Log_VerbosePrint( + fmt::format("New peer connection from {} claiming player ID {}", PeerAddressString(peer), claimed_player_id) + .c_str()); + + // shouldn't ever get a non-host connection without a valid ID + if (claimed_player_id < 0 || claimed_player_id >= MAX_PLAYERS) + { + Log_ErrorPrintf("Invalid claimed_player_id %d", claimed_player_id); + enet_peer_disconnect_now(peer, 0); + return; + } + + // the peer should match up, unless we're the one receiving the connection + Assert(s_peers[claimed_player_id].peer == peer || claimed_player_id > s_player_id); + if (s_peers[claimed_player_id].peer == peer) + { + // WaitForPeerConnections handles this case. + // If this was the host, we still need to wait for the synchronization. + Log_DevPrintf("Connection complete with player %d%s", claimed_player_id, + (claimed_player_id == s_host_player_id) ? "[HOST]" : ""); + return; + } + + Log_DevPrintf("Connection received from peer %d", claimed_player_id); + s_peers[claimed_player_id].peer = peer; +} + +void Netplay::HandleConnectResponseMessage(s32 player_id, const ENetPacket* pkt) +{ + // TODO: error handling + Assert(player_id == s_host_player_id && pkt->dataLength >= sizeof(ControlConnectResponseMessage)); + + const ControlConnectResponseMessage* msg = reinterpret_cast(pkt->data); + if (msg->result != ControlConnectResponseMessage::Result::Success) + { + Log_ErrorPrintf("Connection failed %u", msg->result); + Panic("Connection failed"); + return; + } + + // Still need to wait for synchronization + Log_InfoPrintf("Connected to host, we were assigned player ID %d", msg->player_id); + s_player_id = msg->player_id; +} + +void Netplay::HandlePeerDisconnectionAsHost(s32 player_id) +{ + Log_InfoPrintf("Player %d disconnected from host, reclaiming their slot", player_id); + + Assert(s_peers[player_id].peer); + enet_peer_disconnect_now(s_peers[player_id].peer, 0); + s_peers[player_id].peer = nullptr; + s_num_players--; + + // TODO: We *could* avoid the resync here and just tell all players to drop old mate + Resynchronize(); +} + +void Netplay::HandlePeerDisconnectionAsNonHost(s32 player_id) +{ + Panic("Disconnected non server peer, FIXME"); +} + +void Netplay::Resynchronize() +{ + Assert(IsHost()); + + Log_VerbosePrintf("Resynchronizing..."); + + // Use the current system state, whatever that is. + // TODO: This save state has the bloody path to the disc in it. We need a new save state format. + GrowableMemoryByteStream state(nullptr, System::MAX_SAVE_STATE_SIZE); + if (!System::SaveStateToStream(&state, 0, SAVE_STATE_HEADER::COMPRESSION_TYPE_ZSTD)) + Panic("Failed to save state..."); + + const u32 state_data_size = static_cast(state.GetPosition()); + ControlSynchronizeSessionMessage header = {}; + header.header.type = ControlMessage::SynchronizeSession; + header.header.size = sizeof(ControlSynchronizeSessionMessage) + state_data_size; + header.state_data_size = state_data_size; + + // Fill in player info. + header.num_players = s_num_players; + for (s32 i = 0; i < MAX_PLAYERS; i++) + { + if (!IsValidPlayerId(i)) + { + header.players[i].controller_port = -1; + continue; + } + + // TODO: This is wrong, so wrong.... + header.players[i].controller_port = static_cast(i); + + if (i == s_player_id) + { + // TODO: not valid... listening on any address. + header.players[i].host = s_enet_host->address.host; + header.players[i].port = s_enet_host->address.port; + } + else + { + header.players[i].host = s_peers[i].peer->address.host; + header.players[i].port = s_peers[i].peer->address.port; + } + } + + // Distribute sync request to all peers, then wait for them to reload back. + // Any GGPO packets will get dropped, since the session's gone temporarily. + DestroyGGPOSession(); + s_state = SessionState::Synchronizing; + s_synchronized_players = 1; + + for (s32 i = 0; i < MAX_PLAYERS; i++) + { + if (!s_peers[i].peer) + continue; + + ENetPacket* pkt = enet_packet_create(nullptr, sizeof(header) + state_data_size, ENET_PACKET_FLAG_RELIABLE); + std::memcpy(pkt->data, &header, sizeof(header)); + std::memcpy(pkt->data + sizeof(header), state.GetMemoryPointer(), state_data_size); + const int rc = enet_peer_send(s_peers[i].peer, ENET_CHANNEL_CONTROL, pkt); + if (rc != 0) + { + // TODO: probably just drop them.. but the others already know their address :/ + Log_ErrorPrintf("Failed to send synchronization request to player %d", i); + Panic("Failed to send sync packet"); + enet_packet_destroy(pkt); + } + } + + // Do a full state reload on the host as well, that way everything (including the GPU) + // has a clean slate, reducing the chance of desyncs. Looking at you, mister rec, somehow + // having a different number of cycles. + // CPU::CodeCache::Flush(); + state.SeekAbsolute(0); + if (!System::LoadStateFromStream(&state, true)) + Panic("Failed to reload host state"); + + // Might be done already if there's only one player. + CheckForCompleteResynchronize(); +} + +void Netplay::HandleSynchronizeSessionMessage(s32 player_id, const ENetPacket* pkt) +{ + static bool is_synchronizing = false; + if (is_synchronizing) + { + // TODO: this might happen if someone drops during sync... + Panic("Double sync"); + } + is_synchronizing = true; + + // TODO: This should check that the message only comes from the host/player 1. + Assert(s_host_player_id == player_id); + + const ControlSynchronizeSessionMessage* msg = reinterpret_cast(pkt->data); + if (pkt->dataLength < sizeof(ControlSynchronizeSessionMessage) || + pkt->dataLength < (sizeof(ControlSynchronizeSessionMessage) + msg->state_data_size)) + { + // TODO: Shut down the session + Panic("Invalid synchronize session packet"); + return; + } + + DestroyGGPOSession(); + + // Make sure we're connected to all players. + Assert(msg->num_players > 1); + Log_DevPrintf("Checking connections"); + s_num_players = msg->num_players; + s_state = SessionState::Synchronizing; + for (s32 i = 0; i < MAX_PLAYERS; i++) + { + Peer& p = s_peers[i]; + if (msg->players[i].controller_port < 0) + { + // If we had a client here, it must've dropped or something.. + if (p.peer) + { + Log_WarningPrintf("Dropping connection to player %d", i); + enet_peer_reset(p.peer); + p.peer = nullptr; + } + + continue; + } + + // Can't connect to ourselves! Or the host, which may not contain a correct address, since it's from itself. + if (i == s_player_id || i == s_host_player_id) + continue; + + // The host should fall into the category where we can reuse. + if (p.peer && p.peer->address.host == msg->players[i].host && p.peer->address.port == msg->players[i].port) + { + Log_DevPrintf("Preserving connection to player %d", i); + continue; + } + + Assert(i != s_host_player_id); + if (p.peer) + { + enet_peer_reset(p.peer); + p.peer = nullptr; + } + + // If this player has a higher ID than us, then they're responsible for connecting to us, not the other way around. + // i.e. player 2 is responsible for connecting to players 0 and 1, player 1 is responsible for connecting to player + // 0. + if (i > s_player_id) + { + Log_DevPrintf("Not connecting to player %d, as they have a higher ID than us (%d)", i); + continue; + } + + const ENetAddress address = {msg->players[i].host, msg->players[i].port}; + p.peer = enet_host_connect(s_enet_host, &address, NUM_ENET_CHANNELS, static_cast(s_player_id)); + if (!p.peer) + Panic("Failed to connect to peer on resynchronize"); + } + + if (!WaitForPeerConnections()) + { + // TODO: proper error handling here + Panic("Failed to reconnect to all peers"); + } + + if (!CreateGGPOSession()) + Panic("Failed to create GGPO session"); + + // Load state from packet. + ReadOnlyMemoryByteStream stream(pkt->data + sizeof(ControlSynchronizeSessionMessage), msg->state_data_size); + if (!System::LoadStateFromStream(&stream, true)) + Panic("Failed to load state from host"); + + // We're done here. + if (!SendControlPacket(player_id, NewControlPacket())) + Panic("Failed to send sync complete control packet"); + + s_state = SessionState::Running; + is_synchronizing = false; +} + +void Netplay::HandleSynchronizeCompleteMessage(s32 player_id, const ENetPacket* pkt) +{ + if (s_state != SessionState::Synchronizing || player_id == s_host_player_id) + { + Log_ErrorPrintf("Received unexpected synchronization complete from player %d", player_id); + return; + } + + // TODO: we should check that one player isn't sending multiple sync done packets... + Log_DevPrintf("Player %d completed synchronization", player_id); + s_synchronized_players++; + CheckForCompleteResynchronize(); +} + +void Netplay::CheckForCompleteResynchronize() +{ + if (s_synchronized_players == s_num_players) + { + Log_VerbosePrintf("All players synchronized, resuming!"); + if (!CreateGGPOSession()) + Panic("Failed to create GGPO session after sync"); + + s_state = SessionState::Running; + } } ////////////////////////////////////////////////////////////////////////// @@ -584,12 +1109,15 @@ void Netplay::Throttle() // Poll at 2ms throughout the sleep. // This way the network traffic comes through as soon as possible. const Common::Timer::Value sleep_period = Common::Timer::ConvertMillisecondsToValue(1); - for (;;) + while (s_state == SessionState::Running) { // TODO: make better, we can tell this function to stall until the next frame PollEnet(0); - ggpo_network_idle(s_ggpo); - PollEnet(0); + if (s_ggpo) + { + ggpo_network_idle(s_ggpo); + PollEnet(0); + } current_time = Common::Timer::GetCurrentValue(); if (current_time >= s_next_frame_time) @@ -612,7 +1140,7 @@ void Netplay::GenerateChecksumForFrame(int* checksum, int frame, unsigned char* // Log_VerbosePrintf("Netplay Checksum: f:%d wf:%d c:%u", frame, frame % num_group_of_pages, *checksum); } -void Netplay::GenerateDesyncReport(s32 desync_frame) +void Netplay::GenerateDesyncReport(s32 desync_frame) { std::string path = "\\netplaylogs\\desync_frame_" + std::to_string(desync_frame) + "_p" + std::to_string(s_local_handle) + "_" + System::GetRunningSerial() + "_.txt"; @@ -635,15 +1163,14 @@ void Netplay::GenerateDesyncReport(s32 desync_frame) stream->Discard(); return; } - /* stream->Write(s_save_buffer_pool.back().get()->state_stream.get()->GetMemoryPointer(), - s_save_buffer_pool.back().get()->state_stream.get()->GetMemorySize());*/ + /* stream->Write(s_save_buffer_pool.back().get()->state_stream.get()->GetMemoryPointer(), + s_save_buffer_pool.back().get()->state_stream.get()->GetMemorySize());*/ stream->Commit(); Log_VerbosePrintf("desync log created for frame %d", desync_frame); } - void Netplay::AdvanceFrame() { ggpo_advance_frame(s_ggpo, 0); @@ -654,8 +1181,14 @@ void Netplay::RunFrame() // housekeeping // TODO: get rid of double polling PollEnet(0); + if (!s_ggpo) + return; + ggpo_network_idle(s_ggpo); PollEnet(0); + if (!s_ggpo) + return; + ggpo_idle(s_ggpo); // run game @@ -693,11 +1226,6 @@ void Netplay::CollectInput(u32 slot, u32 bind, float value) s_net_input[slot][bind] = value; } -GGPOPlayerHandle Netplay::PlayerIdToGGPOHandle(s32 player_id) -{ - return player_id + 1; -} - Netplay::Input Netplay::ReadLocalInput() { // get controller data of the first controller (0 internally) @@ -710,9 +1238,7 @@ Netplay::Input Netplay::ReadLocalInput() return inp; } -void Netplay::SendMsg(const char* msg) -{ -} +void Netplay::SendMsg(const char* msg) {} GGPOErrorCode Netplay::SyncInput(Netplay::Input inputs[2], int* disconnect_flags) { @@ -755,11 +1281,11 @@ void Netplay::StartNetplaySession(s32 local_handle, u16 local_port, std::string& return; // create session - int result = Netplay::Start(local_handle, local_port, remote_addr, remote_port, input_delay, MAX_ROLLBACK_FRAMES, std::move(game_path)); - // notify that the session failed - if (result != GGPO_OK) + if (!Netplay::Start(local_handle, local_port, remote_addr, remote_port, input_delay, MAX_ROLLBACK_FRAMES, + std::move(game_path))) { - Log_ErrorPrintf("Failed to Create Netplay Session! Error: %d", result); + // this'll call back to us to shut everything netplay-related down + Log_ErrorPrint("Failed to Create Netplay Session!"); System::ShutdownSystem(false); } else @@ -788,34 +1314,57 @@ void Netplay::NetplayAdvanceFrame(Netplay::Input inputs[], int disconnect_flags) void Netplay::ExecuteNetplay() { - while (System::IsRunning()) + while (s_state != SessionState::Inactive) { - Netplay::RunFrame(); + switch (s_state) + { + case SessionState::Connecting: + { + // still waiting for connection to host.. + // TODO: add a timeout here. + PollEnet(Common::Timer::GetCurrentValue() + Common::Timer::ConvertMillisecondsToValue(16)); + Host::DisplayLoadingScreen("Connecting to host..."); + Host::PumpMessagesOnCPUThread(); + continue; + } - // this can shut us down - Host::PumpMessagesOnCPUThread(); - if (!System::IsValid()) - break; + case SessionState::Synchronizing: + { + // only happens on host, when waiting for clients to report back + PollEnet(Common::Timer::GetCurrentValue() + Common::Timer::ConvertMillisecondsToValue(16)); + Host::DisplayLoadingScreen("Netplay synchronizing", 0, s_synchronized_players, s_num_players); + Host::PumpMessagesOnCPUThread(); + continue; + } - System::PresentFrame(); - System::UpdatePerformanceCounters(); + case SessionState::Running: + { + Netplay::RunFrame(); - Throttle(); + // this can shut us down + Host::PumpMessagesOnCPUThread(); + if (!System::IsValid()) + break; + + System::PresentFrame(); + System::UpdatePerformanceCounters(); + + Throttle(); + continue; + } + + default: + case SessionState::Initializing: + case SessionState::Inactive: + { + // shouldn't be here + Panic("Invalid state"); + continue; + } + } } } -bool Netplay::NpBeginGameCb(void* ctx, const char* game_name) -{ - SPU::SetAudioOutputMuted(true); - // Fast Forward to Game Start if needed. - while (System::GetInternalFrameNumber() < 2) - System::RunFrame(); - SPU::SetAudioOutputMuted(false); - // Set Initial Frame Pacing - InitializeFramePacing(); - return true; -} - bool Netplay::NpAdvFrameCb(void* ctx, int flags) { Netplay::Input inputs[2] = {}; @@ -846,10 +1395,21 @@ bool Netplay::NpSaveFrameCb(void* ctx, unsigned char** buffer, int* len, int* ch } // desync detection - const u32 state_size = our_buffer.get()->state_stream.get()->GetMemorySize(); + const u32 state_size = static_cast(our_buffer.get()->state_stream.get()->GetPosition()); unsigned char* state = reinterpret_cast(our_buffer.get()->state_stream.get()->GetMemoryPointer()); GenerateChecksumForFrame(checksum, frame, state, state_size); +#if 0 + if (frame > 100 && frame < 150 && s_num_players > 1) + { + std::string filename = + Path::Combine(EmuFolders::Dumps, fmt::format("f{}_c{}_p{}.bin", frame, (u32)*checksum, s_local_handle)); + FileSystem::WriteBinaryFile(filename.c_str(), state, state_size); + } + + Log_VerbosePrintf("Saved real frame %u as GGPO frame %d CS %u", System::GetFrameNumber(), frame, *checksum); +#endif + *len = sizeof(System::MemorySaveState); *buffer = reinterpret_cast(our_buffer.release()); @@ -861,7 +1421,16 @@ bool Netplay::NpLoadFrameCb(void* ctx, unsigned char* buffer, int len, int rb_fr // Disable Audio For upcoming rollback SPU::SetAudioOutputMuted(true); - return System::LoadMemoryState(*reinterpret_cast(buffer)); + const u32 prev_frame = System::GetFrameNumber(); + if (!System::LoadMemoryState(*reinterpret_cast(buffer))) + return false; + +#if 0 + Log_VerbosePrintf("Loaded real frame %u from GGPO frame %d [prev %u]", System::GetFrameNumber(), frame_to_load, + prev_frame); +#endif + + return true; } void Netplay::NpFreeBuffCb(void* ctx, void* buffer, int frame) @@ -879,7 +1448,8 @@ bool Netplay::NpOnEventCb(void* ctx, GGPOEvent* ev) Host::OnNetplayMessage(fmt::format("Netplay Connected To Player: {}", ev->u.connected.player)); break; case GGPOEventCode::GGPO_EVENTCODE_SYNCHRONIZING_WITH_PEER: - Host::OnNetplayMessage(fmt::format("Netplay Synchronzing: {}/{}", ev->u.synchronizing.count, ev->u.synchronizing.total)); + Host::OnNetplayMessage( + fmt::format("Netplay Synchronzing: {}/{}", ev->u.synchronizing.count, ev->u.synchronizing.total)); break; case GGPOEventCode::GGPO_EVENTCODE_SYNCHRONIZED_WITH_PEER: Host::OnNetplayMessage(fmt::format("Netplay Synchronized With Player: {}", ev->u.synchronized.player)); @@ -891,9 +1461,10 @@ bool Netplay::NpOnEventCb(void* ctx, GGPOEvent* ev) HandleTimeSyncEvent(ev->u.timesync.frames_ahead, ev->u.timesync.timeSyncPeriodInFrames); break; case GGPOEventCode::GGPO_EVENTCODE_DESYNC: - Host::OnNetplayMessage(fmt::format("Desync Detected: Current Frame: {}, Desync Frame: {}, Diff: {}, L:{}, R:{}", CurrentFrame(), - ev->u.desync.nFrameOfDesync, CurrentFrame() - ev->u.desync.nFrameOfDesync, ev->u.desync.ourCheckSum, - ev->u.desync.remoteChecksum)); + Host::OnNetplayMessage(fmt::format("Desync Detected: Current Frame: {}, Desync Frame: {}, Diff: {}, L:{}, R:{}", + CurrentFrame(), ev->u.desync.nFrameOfDesync, + CurrentFrame() - ev->u.desync.nFrameOfDesync, ev->u.desync.ourCheckSum, + ev->u.desync.remoteChecksum)); GenerateDesyncReport(ev->u.desync.nFrameOfDesync); break; default: diff --git a/src/core/system.cpp b/src/core/system.cpp index 3704ba65f..3b1dd2e7c 100644 --- a/src/core/system.cpp +++ b/src/core/system.cpp @@ -78,8 +78,6 @@ SystemBootParameters::~SystemBootParameters() = default; namespace System { static std::optional InternalGetExtendedSaveStateInfo(ByteStream* stream); -static bool InternalSaveState(ByteStream* state, u32 screenshot_size = 256, - u32 compression_method = SAVE_STATE_HEADER::COMPRESSION_TYPE_NONE); static bool LoadEXE(const char* filename); @@ -95,7 +93,6 @@ static void InternalReset(); static void ClearRunningGame(); static void DestroySystem(); static std::string GetMediaPathFromSaveState(const char* path); -static bool DoLoadState(ByteStream* stream, bool force_software_renderer, bool update_display); static bool DoState(StateWrapper& sw, GPUTexture** host_texture, bool update_display, bool is_memory_state); static void WrappedRunFrame(); static void RunFramesToNow(); @@ -1019,7 +1016,7 @@ bool System::LoadState(const char* filename) SaveUndoLoadState(); - if (!DoLoadState(stream.get(), false, true)) + if (!LoadStateFromStream(stream.get(), true)) { Host::ReportFormattedErrorAsync( "Load State Error", Host::TranslateString("OSDMessage", "Loading state from '%s' failed. Resetting."), filename); @@ -1057,7 +1054,7 @@ bool System::SaveState(const char* filename, bool backup_existing_save) Log_InfoPrintf("Saving state to '%s'...", filename); const u32 screenshot_size = 256; - const bool result = InternalSaveState(stream.get(), screenshot_size, + const bool result = SaveStateToStream(stream.get(), screenshot_size, g_settings.compress_save_states ? SAVE_STATE_HEADER::COMPRESSION_TYPE_ZSTD : SAVE_STATE_HEADER::COMPRESSION_TYPE_NONE); if (!result) @@ -1312,7 +1309,7 @@ bool System::BootSystem(SystemBootParameters parameters) return false; } - if (!DoLoadState(stream.get(), false, true)) + if (!LoadStateFromStream(stream.get(), true)) { DestroySystem(); return false; @@ -1573,7 +1570,7 @@ void System::RecreateSystem() const bool was_paused = System::IsPaused(); std::unique_ptr stream = ByteStream::CreateGrowableMemoryStream(nullptr, 8 * 1024); - if (!System::InternalSaveState(stream.get(), 0, SAVE_STATE_HEADER::COMPRESSION_TYPE_NONE) || !stream->SeekAbsolute(0)) + if (!System::SaveStateToStream(stream.get(), 0, SAVE_STATE_HEADER::COMPRESSION_TYPE_NONE) || !stream->SeekAbsolute(0)) { Host::ReportErrorAsync("Error", "Failed to save state before system recreation. Shutting down."); DestroySystem(); @@ -1589,7 +1586,7 @@ void System::RecreateSystem() return; } - if (!DoLoadState(stream.get(), false, false)) + if (!LoadStateFromStream(stream.get(), false)) { DestroySystem(); return; @@ -1890,7 +1887,7 @@ std::string System::GetMediaPathFromSaveState(const char* path) return ret; } -bool System::DoLoadState(ByteStream* state, bool force_software_renderer, bool update_display) +bool System::LoadStateFromStream(ByteStream* state, bool update_display) { Assert(IsValid()); @@ -2043,7 +2040,7 @@ bool System::DoLoadState(ByteStream* state, bool force_software_renderer, bool u return true; } -bool System::InternalSaveState(ByteStream* state, u32 screenshot_size /* = 256 */, +bool System::SaveStateToStream(ByteStream* state, u32 screenshot_size /* = 256 */, u32 compression_method /* = SAVE_STATE_HEADER::COMPRESSION_TYPE_NONE*/) { if (IsShutdown()) @@ -3801,7 +3798,7 @@ bool System::UndoLoadState() Assert(IsValid()); m_undo_load_state->SeekAbsolute(0); - if (!DoLoadState(m_undo_load_state.get(), false, true)) + if (!LoadStateFromStream(m_undo_load_state.get(), true)) { Host::ReportErrorAsync("Error", "Failed to load undo state, resetting system."); m_undo_load_state.reset(); @@ -3820,7 +3817,7 @@ bool System::SaveUndoLoadState() m_undo_load_state.reset(); m_undo_load_state = ByteStream::CreateGrowableMemoryStream(nullptr, System::MAX_SAVE_STATE_SIZE); - if (!InternalSaveState(m_undo_load_state.get())) + if (!SaveStateToStream(m_undo_load_state.get())) { Host::AddOSDMessage(Host::TranslateStdString("OSDMessage", "Failed to save undo load state."), 15.0f); m_undo_load_state.reset(); diff --git a/src/core/system.h b/src/core/system.h index d0c27602d..6233b1dda 100644 --- a/src/core/system.h +++ b/src/core/system.h @@ -235,6 +235,8 @@ struct MemorySaveState }; bool SaveMemoryState(MemorySaveState* mss); bool LoadMemoryState(const MemorySaveState& mss); +bool LoadStateFromStream(ByteStream* stream, bool update_display); +bool SaveStateToStream(ByteStream* state, u32 screenshot_size = 256, u32 compression_method = 0); /// Runs the VM until the CPU execution is canceled. void Execute();