diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp index fd2b1295db..27a66d4666 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp @@ -1,6 +1,5 @@ #include "stdafx.h" #include "lv2_socket.h" -#include "network_context.h" LOG_CHANNEL(sys_net); @@ -68,17 +67,6 @@ void lv2_socket::poll_queue(std::shared_ptr ppu, bs_tget(); - const u32 prev_value = nc.num_polls.fetch_add(1); - if (!prev_value) - { - nc.num_polls.notify_one(); - } - } } s32 lv2_socket::clear_queue(ppu_thread* ppu) @@ -93,14 +81,6 @@ s32 lv2_socket::clear_queue(ppu_thread* ppu) { it = queue.erase(it); cleared++; - - // Makes sure network_context thread can go back to sleep if there is no active polling - if (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM) - { - const u32 prev_value = g_fxo->get().num_polls.fetch_sub(1); - ensure(prev_value); - } - continue; } diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp index c76cc198ca..d061996abe 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp @@ -121,14 +121,16 @@ s32 lv2_socket_p2p::bind(const sys_net_sockaddr& addr) socket_type real_socket{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); + if (!nc.list_p2p_ports.contains(p2p_port)) + { + nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port)); + } - nc.create_p2p_port(p2p_port); auto& pport = ::at32(nc.list_p2p_ports, p2p_port); real_socket = pport.p2p_socket; - { std::lock_guard lock(pport.bound_p2p_vports_mutex); @@ -330,7 +332,7 @@ void lv2_socket_p2p::close() return; } - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard lock(nc.list_p2p_ports_mutex); ensure(nc.list_p2p_ports.contains(port)); diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp index 3aaca99c4e..f1b341f494 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp @@ -445,7 +445,7 @@ bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[mayb const u64 key_connected = (reinterpret_cast(op_addr)->sin_addr.s_addr) | (static_cast(tcp_header->src_port) << 48) | (static_cast(tcp_header->dst_port) << 32); { - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); auto& pport = ::at32(nc.list_p2p_ports, port); pport.bound_p2p_streams.emplace(key_connected, new_sock_id); } @@ -593,14 +593,16 @@ s32 lv2_socket_p2ps::bind(const sys_net_sockaddr& addr) socket_type real_socket{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); + if (!nc.list_p2p_ports.contains(p2p_port)) + { + nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port)); + } - nc.create_p2p_port(p2p_port); auto& pport = ::at32(nc.list_p2p_ports, p2p_port); real_socket = pport.p2p_socket; - { // Ensures the socket & the bound list are updated at the same time to avoid races std::lock_guard vport_lock(pport.bound_p2p_vports_mutex); @@ -681,14 +683,14 @@ std::optional lv2_socket_p2ps::connect(const sys_net_sockaddr& addr) socket_type real_socket{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); + if (!nc.list_p2p_ports.contains(port)) + nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(port), std::forward_as_tuple(port)); - nc.create_p2p_port(port); auto& pport = ::at32(nc.list_p2p_ports, port); real_socket = pport.p2p_socket; - { std::lock_guard lock(pport.bound_p2p_vports_mutex); if (vport == 0) @@ -855,7 +857,7 @@ void lv2_socket_p2ps::close() return; } - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard lock(nc.list_p2p_ports_mutex); ensure(nc.list_p2p_ports.contains(port)); diff --git a/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp b/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp index ab120a1beb..6c284a14e6 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp @@ -12,13 +12,13 @@ LOG_CHANNEL(sys_net); s32 send_packet_from_p2p_port(const std::vector& data, const sockaddr_in& addr) { s32 res{}; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); if (nc.list_p2p_ports.contains(SCE_NP_PORT)) { auto& def_port = ::at32(nc.list_p2p_ports, SCE_NP_PORT); - res = ::sendto(def_port.p2p_socket, reinterpret_cast(data.data()), ::size32(data), 0, reinterpret_cast(&addr), sizeof(sockaddr_in)); + res = ::sendto(def_port.p2p_socket, reinterpret_cast(data.data()), ::size32(data), 0, reinterpret_cast(&addr), sizeof(sockaddr_in)); if (res == -1) sys_net.error("Failed to send signaling packet: %s", get_last_error(false, false)); @@ -35,7 +35,7 @@ s32 send_packet_from_p2p_port(const std::vector& data, const sockaddr_in& ad std::vector> get_rpcn_msgs() { std::vector> msgs; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); if (nc.list_p2p_ports.contains(SCE_NP_PORT)) @@ -59,7 +59,7 @@ std::vector> get_rpcn_msgs() std::vector get_sign_msgs() { std::vector msgs; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); { std::lock_guard list_lock(nc.list_p2p_ports_mutex); if (nc.list_p2p_ports.contains(SCE_NP_PORT)) @@ -85,15 +85,15 @@ namespace np void init_np_handler_dependencies(); } -p2p_thread::p2p_thread() +network_thread::network_thread() { np::init_np_handler_dependencies(); } -void p2p_thread::bind_sce_np_port() +void network_thread::bind_sce_np_port() { std::lock_guard list_lock(list_p2p_ports_mutex); - create_p2p_port(SCE_NP_PORT); + list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(SCE_NP_PORT), std::forward_as_tuple(SCE_NP_PORT)); } void network_thread::operator()() @@ -109,14 +109,10 @@ void network_thread::operator()() std::vector was_connecting(lv2_socket::id_count); #endif + std::vector<::pollfd> p2p_fd(lv2_socket::id_count); + while (thread_ctrl::state() != thread_state::aborting) { - if (!num_polls) - { - thread_ctrl::wait_on(num_polls, 0); - continue; - } - ensure(socklist.size() <= lv2_socket::id_count); // Wait with 1ms timeout @@ -126,6 +122,46 @@ void network_thread::operator()() ::poll(fds.data(), socklist.size(), 1); #endif + // Check P2P sockets for incoming packets(timeout could probably be set at 0) + { + std::lock_guard lock(list_p2p_ports_mutex); + std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd)); + auto num_p2p_sockets = 0; + for (const auto& p2p_port : list_p2p_ports) + { + p2p_fd[num_p2p_sockets].events = POLLIN; + p2p_fd[num_p2p_sockets].revents = 0; + p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket; + num_p2p_sockets++; + } + + if (num_p2p_sockets) + { +#ifdef _WIN32 + const auto ret_p2p = WSAPoll(p2p_fd.data(), num_p2p_sockets, 1); +#else + const auto ret_p2p = ::poll(p2p_fd.data(), num_p2p_sockets, 1); +#endif + if (ret_p2p > 0) + { + auto fd_index = 0; + for (auto& p2p_port : list_p2p_ports) + { + if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM) + { + while (p2p_port.second.recv_data()) + ; + } + fd_index++; + } + } + else if (ret_p2p < 0) + { + sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false)); + } + } + } + std::lock_guard lock(s_nw_mutex); for (usz i = 0; i < socklist.size(); i++) @@ -180,69 +216,3 @@ void network_thread::operator()() } } } - -// Must be used under list_p2p_ports_mutex lock! -void p2p_thread::create_p2p_port(u16 p2p_port) -{ - if (!list_p2p_ports.contains(p2p_port)) - { - list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port)); - const u32 prev_value = num_p2p_ports.fetch_add(1); - if (!prev_value) - { - num_p2p_ports.notify_one(); - } - } -} - -void p2p_thread::operator()() -{ - std::vector<::pollfd> p2p_fd(lv2_socket::id_count); - - while (thread_ctrl::state() != thread_state::aborting) - { - if (!num_p2p_ports) - { - thread_ctrl::wait_on(num_p2p_ports, 0); - continue; - } - - // Check P2P sockets for incoming packets - auto num_p2p_sockets = 0; - { - std::lock_guard lock(list_p2p_ports_mutex); - std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd)); - for (const auto& p2p_port : list_p2p_ports) - { - p2p_fd[num_p2p_sockets].events = POLLIN; - p2p_fd[num_p2p_sockets].revents = 0; - p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket; - num_p2p_sockets++; - } - } - -#ifdef _WIN32 - const auto ret_p2p = WSAPoll(p2p_fd.data(), num_p2p_sockets, 1); -#else - const auto ret_p2p = ::poll(p2p_fd.data(), num_p2p_sockets, 1); -#endif - if (ret_p2p > 0) - { - std::lock_guard lock(list_p2p_ports_mutex); - auto fd_index = 0; - for (auto& p2p_port : list_p2p_ports) - { - if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM) - { - while (p2p_port.second.recv_data()) - ; - } - fd_index++; - } - } - else if (ret_p2p < 0) - { - sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false)); - } - } -} diff --git a/rpcs3/Emu/Cell/lv2/sys_net/network_context.h b/rpcs3/Emu/Cell/lv2/sys_net/network_context.h index ad3c51a1a8..3814a4776a 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/network_context.h +++ b/rpcs3/Emu/Cell/lv2/sys_net/network_context.h @@ -11,28 +11,15 @@ struct network_thread { std::vector s_to_awake; shared_mutex s_nw_mutex; - atomic_t num_polls = 0; + + shared_mutex list_p2p_ports_mutex; + std::map list_p2p_ports{}; static constexpr auto thread_name = "Network Thread"; - void operator()(); -}; - -struct p2p_thread -{ - shared_mutex list_p2p_ports_mutex; - std::map list_p2p_ports; - atomic_t num_p2p_ports = 0; - - static constexpr auto thread_name = "Network P2P Thread"; - - p2p_thread(); - - void create_p2p_port(u16 p2p_port); - + network_thread(); void bind_sce_np_port(); void operator()(); }; using network_context = named_thread; -using p2p_context = named_thread; diff --git a/rpcs3/Emu/NP/np_handler.cpp b/rpcs3/Emu/NP/np_handler.cpp index d3780baf87..74a484658e 100644 --- a/rpcs3/Emu/NP/np_handler.cpp +++ b/rpcs3/Emu/NP/np_handler.cpp @@ -408,11 +408,11 @@ namespace np void np_handler::init_np_handler_dependencies() { - if (is_psn_active && g_cfg.net.psn_status == np_psn_status::psn_rpcn && g_fxo->is_init() && !m_inited_np_handler_dependencies) + if (is_psn_active && g_cfg.net.psn_status == np_psn_status::psn_rpcn && g_fxo->is_init() && !m_inited_np_handler_dependencies) { m_inited_np_handler_dependencies = true; - auto& nc = g_fxo->get(); + auto& nc = g_fxo->get(); nc.bind_sce_np_port(); std::lock_guard lock(mutex_rpcn); @@ -817,16 +817,6 @@ namespace np string_to_online_name(rpcn->get_online_name(), online_name); string_to_avatar_url(rpcn->get_avatar_url(), avatar_url); public_ip_addr = rpcn->get_addr_sig(); - - if (!public_ip_addr) - { - rsx::overlays::queue_message(rpcn::rpcn_state_to_localized_string_id(rpcn::rpcn_state::failure_other)); - rpcn_log.error("Failed to get a reply from RPCN signaling!"); - is_psn_active = false; - rpcn->terminate_connection(); - return; - } - local_ip_addr = std::bit_cast>(rpcn->get_addr_local()); break; diff --git a/rpcs3/Emu/NP/rpcn_client.cpp b/rpcs3/Emu/NP/rpcn_client.cpp index 44ce18be61..fcbddee27d 100644 --- a/rpcs3/Emu/NP/rpcn_client.cpp +++ b/rpcs3/Emu/NP/rpcn_client.cpp @@ -2594,58 +2594,4 @@ namespace rpcn return it == friend_infos.friends.end() ? std::nullopt : std::optional(*it); } - bool rpcn_client::is_connected() const - { - return connected; - } - - bool rpcn_client::is_authentified() const - { - return authentified; - } - rpcn_state rpcn_client::get_rpcn_state() const - { - return state; - } - - const std::string& rpcn_client::get_online_name() const - { - return online_name; - } - - const std::string& rpcn_client::get_avatar_url() const - { - return avatar_url; - } - - u32 rpcn_client::get_addr_sig() const - { - if (!addr_sig) - { - addr_sig.wait(0, static_cast(10'000'000'000)); - } - - return addr_sig.load(); - } - - u16 rpcn_client::get_port_sig() const - { - if (!port_sig) - { - port_sig.wait(0, static_cast(10'000'000'000)); - } - - return port_sig.load(); - } - - u32 rpcn_client::get_addr_local() const - { - return local_addr_sig.load(); - } - - void rpcn_client::update_local_addr(u32 addr) - { - local_addr_sig = std::bit_cast>(addr); - } - } // namespace rpcn diff --git a/rpcs3/Emu/NP/rpcn_client.h b/rpcs3/Emu/NP/rpcn_client.h index c1f9f1e11a..af432d787a 100644 --- a/rpcs3/Emu/NP/rpcn_client.h +++ b/rpcs3/Emu/NP/rpcn_client.h @@ -437,9 +437,18 @@ namespace rpcn void remove_message_cb(message_cb_func cb_func, void* cb_param); void mark_message_used(u64 id); - bool is_connected() const; - bool is_authentified() const; - rpcn_state get_rpcn_state() const; + bool is_connected() const + { + return connected; + } + bool is_authentified() const + { + return authentified; + } + rpcn_state get_rpcn_state() const + { + return state; + } void server_infos_updated(); @@ -486,13 +495,44 @@ namespace rpcn bool tus_delete_multislot_data(u32 req_id, SceNpCommunicationId& communication_id, const SceNpOnlineId& targetNpId, vm::cptr slotIdArray, s32 arrayNum, bool vuser); bool send_presence(const SceNpCommunicationId& pr_com_id, const std::string& pr_title, const std::string& pr_status, const std::string& pr_comment, const std::vector& pr_data); - const std::string& get_online_name() const; - const std::string& get_avatar_url() const; + const std::string& get_online_name() const + { + return online_name; + } + const std::string& get_avatar_url() const + { + return avatar_url; + } - u32 get_addr_sig() const; - u16 get_port_sig() const; - u32 get_addr_local() const; - void update_local_addr(u32 addr); + u32 get_addr_sig() const + { + if (!addr_sig) + { + addr_sig.wait(0, static_cast(10'000'000'000)); + } + + return addr_sig.load(); + } + + u16 get_port_sig() const + { + if (!port_sig) + { + port_sig.wait(0, static_cast(10'000'000'000)); + } + + return port_sig.load(); + } + + u32 get_addr_local() const + { + return local_addr_sig.load(); + } + + void update_local_addr(u32 addr) + { + local_addr_sig = std::bit_cast>(addr); + } private: bool get_reply(u64 expected_id, std::vector& data);