NetPlay: use a workaround from comex’s dc-netplay to interrupt enet

This commit is contained in:
mathieui 2015-03-09 17:31:13 +01:00
parent 27619e613f
commit 603fe25349
4 changed files with 163 additions and 109 deletions

View File

@ -416,7 +416,6 @@ unsigned int NetPlayClient::OnData(sf::Packet& packet)
spac << (MessageId)NP_MSG_PONG; spac << (MessageId)NP_MSG_PONG;
spac << ping_key; spac << ping_key;
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
Send(spac); Send(spac);
} }
break; break;
@ -480,6 +479,33 @@ void NetPlayClient::Disconnect()
m_server = nullptr; m_server = nullptr;
} }
void NetPlayClient::RunOnThread(std::function<void()> func)
{
{
std::lock_guard<std::recursive_mutex> lkq(m_crit.run_queue_write);
m_run_queue.Push(func);
}
WakeupThread(m_client);
}
void NetPlayClient::WakeupThread(ENetHost* host)
{
// Send ourselves a spurious message. This is hackier than it should be.
// comex reported this as https://github.com/lsalzman/enet/issues/23, so
// hopefully there will be a better way to do it in the future.
ENetAddress address;
if (host->address.port != 0)
address.port = host->address.port;
else
enet_socket_get_address(host->socket, &address);
address.host = 0x0100007f; // localhost
u8 byte = 0;
ENetBuffer buf;
buf.data = &byte;
buf.dataLength = 1;
enet_socket_send(host->socket, &address, &buf, 1);
}
// called from ---NETPLAY--- thread // called from ---NETPLAY--- thread
void NetPlayClient::ThreadFunc() void NetPlayClient::ThreadFunc()
{ {
@ -487,11 +513,13 @@ void NetPlayClient::ThreadFunc()
{ {
ENetEvent netEvent; ENetEvent netEvent;
int net; int net;
if (m_traversal_client)
m_traversal_client->HandleResends();
net = enet_host_service(m_client, &netEvent, 4);
while (!m_run_queue.Empty())
{ {
std::lock_guard<std::recursive_mutex> lks(m_crit.send); m_run_queue.Front()();
if (m_traversal_client) m_run_queue.Pop();
m_traversal_client->HandleResends();
net = enet_host_service(m_client, &netEvent, 4);
} }
if (net > 0) if (net > 0)
{ {
@ -517,7 +545,6 @@ void NetPlayClient::ThreadFunc()
break; break;
} }
} }
} }
Disconnect(); Disconnect();
@ -577,57 +604,57 @@ void NetPlayClient::GetPlayers(std::vector<const Player *> &player_list)
// called from ---GUI--- thread // called from ---GUI--- thread
void NetPlayClient::SendChatMessage(const std::string& msg) void NetPlayClient::SendChatMessage(const std::string& msg)
{ {
sf::Packet spac; RunOnThread([msg, this]() {
spac << (MessageId)NP_MSG_CHAT_MESSAGE; sf::Packet spac;
spac << msg; spac << (MessageId)NP_MSG_CHAT_MESSAGE;
spac << msg;
std::lock_guard<std::recursive_mutex> lks(m_crit.send); Send(spac);
Send(spac); });
} }
// called from ---CPU--- thread // called from ---CPU--- thread
void NetPlayClient::SendPadState(const PadMapping in_game_pad, const GCPadStatus& pad) void NetPlayClient::SendPadState(const PadMapping in_game_pad, const GCPadStatus& pad)
{ {
// send to server
sf::Packet spac; sf::Packet spac;
spac << (MessageId)NP_MSG_PAD_DATA; spac << (MessageId)NP_MSG_PAD_DATA;
spac << in_game_pad; spac << in_game_pad;
spac << pad.button << pad.analogA << pad.analogB << pad.stickX << pad.stickY << pad.substickX << pad.substickY << pad.triggerLeft << pad.triggerRight; spac << pad.button << pad.analogA << pad.analogB << pad.stickX << pad.stickY << pad.substickX << pad.substickY << pad.triggerLeft << pad.triggerRight;
std::lock_guard<std::recursive_mutex> lks(m_crit.send); RunOnThread([spac, this]() mutable {
Send(spac); // send to server
Send(spac);
});
} }
// called from ---CPU--- thread // called from ---CPU--- thread
void NetPlayClient::SendWiimoteState(const PadMapping in_game_pad, const NetWiimote& nw) void NetPlayClient::SendWiimoteState(const PadMapping in_game_pad, const NetWiimote& nw)
{ {
// send to server RunOnThread([=]() {
sf::Packet spac; // send to server
spac << (MessageId)NP_MSG_WIIMOTE_DATA; sf::Packet spac;
spac << in_game_pad; spac << (MessageId)NP_MSG_WIIMOTE_DATA;
spac << (u8)nw.size(); spac << in_game_pad;
for (auto it : nw) spac << (u8)nw.size();
{ for (auto it : nw)
spac << it; {
} spac << it;
}
std::lock_guard<std::recursive_mutex> lks(m_crit.send); Send(spac);
Send(spac); });
} }
// called from ---GUI--- thread // called from ---GUI--- thread
bool NetPlayClient::StartGame(const std::string &path) bool NetPlayClient::StartGame(const std::string &path)
{ {
std::lock_guard<std::recursive_mutex> lkg(m_crit.game); RunOnThread([this](){
std::lock_guard<std::recursive_mutex> lkg(m_crit.game);
// tell server i started the game // tell server i started the game
sf::Packet spac; sf::Packet spac;
spac << (MessageId)NP_MSG_START_GAME; spac << (MessageId)NP_MSG_START_GAME;
spac << m_current_game; spac << m_current_game;
spac << (char *)&g_NetPlaySettings; spac << (char *)&g_NetPlaySettings;
Send(spac);
std::lock_guard<std::recursive_mutex> lks(m_crit.send); });
Send(spac);
if (m_is_running) if (m_is_running)
{ {
@ -954,6 +981,7 @@ bool NetPlayClient::StopGame()
return true; return true;
} }
// called from ---GUI--- thread
void NetPlayClient::Stop() void NetPlayClient::Stop()
{ {
if (m_is_running == false) if (m_is_running == false)
@ -976,9 +1004,11 @@ void NetPlayClient::Stop()
// tell the server to stop if we have a pad mapped in game. // tell the server to stop if we have a pad mapped in game.
if (isPadMapped) if (isPadMapped)
{ {
sf::Packet spac; RunOnThread([this](){
spac << (MessageId)NP_MSG_STOP_GAME; sf::Packet spac;
Send(spac); spac << (MessageId)NP_MSG_STOP_GAME;
Send(spac);
});
} }
} }

View File

@ -47,6 +47,8 @@ class NetPlayClient : public TraversalClientClient
{ {
public: public:
void ThreadFunc(); void ThreadFunc();
void RunOnThread(std::function<void()> func);
void WakeupThread(ENetHost* host);
NetPlayClient(const std::string& address, const u16 port, NetPlayUI* dialog, const std::string& name, bool traversal, std::string centralServer, u16 centralPort); NetPlayClient(const std::string& address, const u16 port, NetPlayUI* dialog, const std::string& name, bool traversal, std::string centralServer, u16 centralPort);
~NetPlayClient(); ~NetPlayClient();
@ -92,9 +94,12 @@ protected:
{ {
std::recursive_mutex game; std::recursive_mutex game;
// lock order // lock order
std::recursive_mutex players, send; std::recursive_mutex players;
std::recursive_mutex run_queue_write;
} m_crit; } m_crit;
Common::FifoQueue<std::function<void()>, false> m_run_queue;
Common::FifoQueue<GCPadStatus> m_pad_buffer[4]; Common::FifoQueue<GCPadStatus> m_pad_buffer[4];
Common::FifoQueue<NetWiimote> m_wiimote_buffer[4]; Common::FifoQueue<NetWiimote> m_wiimote_buffer[4];

View File

@ -92,7 +92,6 @@ NetPlayServer::NetPlayServer(const u16 port, bool traversal, std::string central
serverAddr.port = port; serverAddr.port = port;
m_server = enet_host_create(&serverAddr, 10, 3, 0, 0); m_server = enet_host_create(&serverAddr, 10, 3, 0, 0);
} }
if (m_server != nullptr) if (m_server != nullptr)
{ {
is_connected = true; is_connected = true;
@ -117,7 +116,6 @@ void NetPlayServer::ThreadFunc()
spac << (MessageId)NP_MSG_PING; spac << (MessageId)NP_MSG_PING;
spac << m_ping_key; spac << m_ping_key;
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
m_ping_timer.Start(); m_ping_timer.Start();
SendToClients(spac); SendToClients(spac);
m_update_pings = false; m_update_pings = false;
@ -125,11 +123,13 @@ void NetPlayServer::ThreadFunc()
ENetEvent netEvent; ENetEvent netEvent;
int net; int net;
if (m_traversal_client)
m_traversal_client->HandleResends();
net = enet_host_service(m_server, &netEvent, 1000);
while (!m_run_queue.Empty())
{ {
std::lock_guard<std::recursive_mutex> lks(m_crit.send); m_run_queue.Front()();
if (m_traversal_client) m_run_queue.Pop();
m_traversal_client->HandleResends();
net = enet_host_service(m_server, &netEvent, 4);
} }
if (net > 0) if (net > 0)
{ {
@ -149,7 +149,6 @@ void NetPlayServer::ThreadFunc()
sf::Packet spac; sf::Packet spac;
spac << (MessageId)error; spac << (MessageId)error;
// don't need to lock, this client isn't in the client map // don't need to lock, this client isn't in the client map
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
Send(accept_peer, spac); Send(accept_peer, spac);
if (netEvent.peer->data) if (netEvent.peer->data)
{ {
@ -272,52 +271,46 @@ unsigned int NetPlayServer::OnConnect(ENetPeer* socket)
} }
} }
// send join message to already connected clients
sf::Packet spac;
spac << (MessageId)NP_MSG_PLAYER_JOIN;
spac << player.pid << player.name << player.revision;
SendToClients(spac);
// send new client success message with their id
spac.clear();
spac << (MessageId)0;
spac << player.pid;
Send(player.socket, spac);
// send new client the selected game
if (m_selected_game != "")
{ {
std::lock_guard<std::recursive_mutex> lks(m_crit.send); spac.clear();
spac << (MessageId)NP_MSG_CHANGE_GAME;
spac << m_selected_game;
Send(player.socket, spac);
}
// send join message to already connected clients // send the pad buffer value
sf::Packet spac; spac.clear();
spac << (MessageId)NP_MSG_PAD_BUFFER;
spac << (u32)m_target_buffer_size;
Send(player.socket, spac);
// sync values with new client
for (const auto& p : m_players)
{
spac.clear();
spac << (MessageId)NP_MSG_PLAYER_JOIN; spac << (MessageId)NP_MSG_PLAYER_JOIN;
spac << player.pid << player.name << player.revision; spac << p.second.pid << p.second.name << p.second.revision;
SendToClients(spac);
// send new client success message with their id
spac.clear();
spac << (MessageId)0;
spac << player.pid;
Send(player.socket, spac); Send(player.socket, spac);
}
// send new client the selected game
if (m_selected_game != "")
{
spac.clear();
spac << (MessageId)NP_MSG_CHANGE_GAME;
spac << m_selected_game;
Send(player.socket, spac);
}
// send the pad buffer value
spac.clear();
spac << (MessageId)NP_MSG_PAD_BUFFER;
spac << (u32)m_target_buffer_size;
Send(player.socket, spac);
// sync values with new client
for (const auto& p : m_players)
{
spac.clear();
spac << (MessageId)NP_MSG_PLAYER_JOIN;
spac << p.second.pid << p.second.name << p.second.revision;
Send(player.socket, spac);
}
} // unlock send
// add client to the player list // add client to the player list
{ {
std::lock_guard<std::recursive_mutex> lkp(m_crit.players); std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
m_players.insert(std::pair<PlayerId, Client>(*(PlayerId *)player.socket->data, player)); m_players.insert(std::pair<PlayerId, Client>(*(PlayerId *)player.socket->data, player));
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
UpdatePadMapping(); // sync pad mappings with everyone UpdatePadMapping(); // sync pad mappings with everyone
UpdateWiimoteMapping(); UpdateWiimoteMapping();
} }
@ -343,7 +336,6 @@ unsigned int NetPlayServer::OnDisconnect(Client& player)
sf::Packet spac; sf::Packet spac;
spac << (MessageId)NP_MSG_DISABLE_GAME; spac << (MessageId)NP_MSG_DISABLE_GAME;
// this thread doesn't need players lock // this thread doesn't need players lock
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
SendToClients(spac, 1); SendToClients(spac, 1);
break; break;
} }
@ -362,7 +354,6 @@ unsigned int NetPlayServer::OnDisconnect(Client& player)
m_players.erase(it); m_players.erase(it);
// alert other players of disconnect // alert other players of disconnect
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
SendToClients(spac); SendToClients(spac);
for (PadMapping& mapping : m_pad_map) for (PadMapping& mapping : m_pad_map)
@ -451,9 +442,37 @@ void NetPlayServer::AdjustPadBufferSize(unsigned int size)
spac << (MessageId)NP_MSG_PAD_BUFFER; spac << (MessageId)NP_MSG_PAD_BUFFER;
spac << (u32)m_target_buffer_size; spac << (u32)m_target_buffer_size;
std::lock_guard<std::recursive_mutex> lkp(m_crit.players); RunOnThread([spac, this]() mutable {
std::lock_guard<std::recursive_mutex> lks(m_crit.send); std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
SendToClients(spac); SendToClients(spac);
});
}
void NetPlayServer::RunOnThread(std::function<void()> func)
{
{
std::lock_guard<std::recursive_mutex> lkq(m_crit.run_queue_write);
m_run_queue.Push(func);
}
WakeupThread(m_server);
}
void NetPlayServer::WakeupThread(ENetHost* host)
{
// Send ourselves a spurious message. This is hackier than it should be.
// comex reported this as https://github.com/lsalzman/enet/issues/23, so
// hopefully there will be a better way to do it in the future.
ENetAddress address;
if (host->address.port != 0)
address.port = host->address.port;
else
enet_socket_get_address(host->socket, &address);
address.host = 0x0100007f; // localhost
u8 byte = 0;
ENetBuffer buf;
buf.data = &byte;
buf.dataLength = 1;
enet_socket_send(host->socket, &address, &buf, 1);
} }
// called from ---NETPLAY--- thread // called from ---NETPLAY--- thread
@ -478,10 +497,7 @@ unsigned int NetPlayServer::OnData(sf::Packet& packet, Client& player)
spac << player.pid; spac << player.pid;
spac << msg; spac << msg;
{ SendToClients(spac, player.pid);
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
SendToClients(spac, player.pid);
}
} }
break; break;
@ -505,7 +521,6 @@ unsigned int NetPlayServer::OnData(sf::Packet& packet, Client& player)
spac << (MessageId)NP_MSG_PAD_DATA; spac << (MessageId)NP_MSG_PAD_DATA;
spac << map << pad.button << pad.analogA << pad.analogB << pad.stickX << pad.stickY << pad.substickX << pad.substickY << pad.triggerLeft << pad.triggerRight; spac << map << pad.button << pad.analogA << pad.analogB << pad.stickX << pad.stickY << pad.substickX << pad.substickY << pad.triggerLeft << pad.triggerRight;
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
SendToClients(spac, player.pid); SendToClients(spac, player.pid);
} }
break; break;
@ -538,7 +553,6 @@ unsigned int NetPlayServer::OnData(sf::Packet& packet, Client& player)
for (const u8& byte : data) for (const u8& byte : data)
spac << byte; spac << byte;
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
SendToClients(spac, player.pid); SendToClients(spac, player.pid);
} }
break; break;
@ -559,7 +573,6 @@ unsigned int NetPlayServer::OnData(sf::Packet& packet, Client& player)
spac << player.pid; spac << player.pid;
spac << player.ping; spac << player.ping;
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
SendToClients(spac); SendToClients(spac);
} }
break; break;
@ -577,7 +590,6 @@ unsigned int NetPlayServer::OnData(sf::Packet& packet, Client& player)
spac << (MessageId)NP_MSG_STOP_GAME; spac << (MessageId)NP_MSG_STOP_GAME;
std::lock_guard<std::recursive_mutex> lkp(m_crit.players); std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
std::lock_guard<std::recursive_mutex> lks(m_crit.send);
SendToClients(spac); SendToClients(spac);
m_is_running = false; m_is_running = false;
@ -601,7 +613,7 @@ void NetPlayServer::OnTraversalStateChanged()
m_dialog->Update(); m_dialog->Update();
} }
// called from ---GUI--- thread / and ---NETPLAY--- thread // called from ---GUI--- thread
void NetPlayServer::SendChatMessage(const std::string& msg) void NetPlayServer::SendChatMessage(const std::string& msg)
{ {
sf::Packet spac; sf::Packet spac;
@ -609,9 +621,10 @@ void NetPlayServer::SendChatMessage(const std::string& msg)
spac << (PlayerId)0; // server id always 0 spac << (PlayerId)0; // server id always 0
spac << msg; spac << msg;
std::lock_guard<std::recursive_mutex> lkp(m_crit.players); RunOnThread([spac, this]() mutable {
std::lock_guard<std::recursive_mutex> lks(m_crit.send); std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
SendToClients(spac); SendToClients(spac);
});
} }
// called from ---GUI--- thread // called from ---GUI--- thread
@ -626,9 +639,10 @@ bool NetPlayServer::ChangeGame(const std::string &game)
spac << (MessageId)NP_MSG_CHANGE_GAME; spac << (MessageId)NP_MSG_CHANGE_GAME;
spac << game; spac << game;
std::lock_guard<std::recursive_mutex> lkp(m_crit.players); RunOnThread([spac, this]() mutable {
std::lock_guard<std::recursive_mutex> lks(m_crit.send); std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
SendToClients(spac); SendToClients(spac);
});
return true; return true;
} }
@ -666,9 +680,10 @@ bool NetPlayServer::StartGame()
spac << (u32)g_netplay_initial_gctime; spac << (u32)g_netplay_initial_gctime;
spac << (u32)g_netplay_initial_gctime << 32; spac << (u32)g_netplay_initial_gctime << 32;
std::lock_guard<std::recursive_mutex> lkp(m_crit.players); RunOnThread([spac, this]() mutable {
std::lock_guard<std::recursive_mutex> lks(m_crit.send); std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
SendToClients(spac); SendToClients(spac);
});
m_is_running = true; m_is_running = true;

View File

@ -20,6 +20,8 @@ class NetPlayServer : public TraversalClientClient
{ {
public: public:
void ThreadFunc(); void ThreadFunc();
void RunOnThread(std::function<void()> func);
void WakeupThread(ENetHost* host);
NetPlayServer(const u16 port, bool traversal, std::string centralServer, u16 centralPort); NetPlayServer(const u16 port, bool traversal, std::string centralServer, u16 centralPort);
~NetPlayServer(); ~NetPlayServer();
@ -101,11 +103,13 @@ private:
{ {
std::recursive_mutex game; std::recursive_mutex game;
// lock order // lock order
std::recursive_mutex players, send; std::recursive_mutex players;
std::recursive_mutex run_queue_write;
} m_crit; } m_crit;
std::string m_selected_game; std::string m_selected_game;
std::thread m_thread; std::thread m_thread;
Common::FifoQueue<std::function<void()>, false> m_run_queue;
ENetHost* m_server; ENetHost* m_server;
TraversalClient* m_traversal_client; TraversalClient* m_traversal_client;