diff --git a/rpcs3/Emu/Cell/lv2/sys_net.cpp b/rpcs3/Emu/Cell/lv2/sys_net.cpp index c0aa6d0f9b..d3c27bdd73 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net.cpp @@ -554,6 +554,27 @@ struct nt_p2p_port auto packet = generate_u2s_packet(send_hdr, nullptr, 0); sys_net.trace("Sent ack %d", final_ack); send_u2s_packet(sock, sock_id, std::move(packet), reinterpret_cast<::sockaddr_in*>(op_addr), 0, false); + + // check if polling is happening + if (sock.p2ps.data_available && sock.events.test_and_reset(lv2_socket::poll::read)) + { + bs_t events = lv2_socket::poll::read; + for (auto it = sock.queue.begin(); events && it != sock.queue.end();) + { + if (it->second(events)) + { + it = sock.queue.erase(it); + continue; + } + it++; + } + + if (sock.queue.empty()) + { + sock.events.store({}); + } + } + }; if (sock.p2ps.status == lv2_socket::p2ps_i::stream_status::stream_handshaking) @@ -749,7 +770,7 @@ struct nt_p2p_port p2p_addr.sin_len = sizeof(sys_net_sockaddr_in); p2p_addr.sin_family = SYS_NET_AF_INET; p2p_addr.sin_addr = std::bit_cast, u32>(reinterpret_cast(&native_addr)->sin_addr.s_addr); - p2p_addr.sin_vport = dst_vport; // That is weird stuff + p2p_addr.sin_vport = dst_vport; p2p_addr.sin_port = std::bit_cast, u16>(reinterpret_cast(&native_addr)->sin_port); std::vector p2p_data(recv_res - sizeof(u16)); @@ -762,6 +783,26 @@ struct nt_p2p_port sock.p2p.data.push(std::make_pair(std::move(p2p_addr), std::move(p2p_data))); sys_net.trace("Received a P2P packet for vport %d and saved it", dst_vport); + + // Check if poll is happening + if (sock.events.test_and_reset(lv2_socket::poll::read)) + { + bs_t events = lv2_socket::poll::read; + for (auto it = sock.queue.begin(); events && it != sock.queue.end();) + { + if (it->second(events)) + { + it = sock.queue.erase(it); + continue; + } + it++; + } + + if (sock.queue.empty()) + { + sock.events.store({}); + } + } }); // Should not happen in theory @@ -906,7 +947,7 @@ struct network_thread num_p2p_sockets++; } - if (num_p2p_sockets != 0) + if (num_p2p_sockets) { #ifdef _WIN32 const auto ret_p2p = WSAPoll(p2p_fd, num_p2p_sockets, 1); @@ -990,10 +1031,11 @@ struct network_thread s_to_awake.clear(); socklist.clear(); - // Obtain all active sockets - idm::select([&](u32 id, lv2_socket&) + // Obtain all non P2P active sockets + idm::select([&](u32 id, lv2_socket& s) { - socklist.emplace_back(idm::get_unlocked(id)); + if(s.type != SYS_NET_SOCK_DGRAM_P2P && s.type != SYS_NET_SOCK_STREAM_P2P) + socklist.emplace_back(idm::get_unlocked(id)); }); for (std::size_t i = 0; i < socklist.size(); i++) @@ -2971,6 +3013,7 @@ error_code sys_net_bnet_poll(ppu_thread& ppu, vm::ptr fds, s32 n fds_buf[i].revents |= SYS_NET_POLLIN; } + // Data can always be written on a dgram socket if (fds[i].events & SYS_NET_POLLOUT) fds_buf[i].revents |= SYS_NET_POLLOUT; @@ -2985,12 +3028,15 @@ error_code sys_net_bnet_poll(ppu_thread& ppu, vm::ptr fds, s32 n { if ((fds[i].events & SYS_NET_POLLIN) && sock->p2ps.data_available) { - sys_net.trace("[P2P] p2p_data for vport %d contains %d elements", sock->p2p.vport, sock->p2p.data.size()); + sys_net.trace("[P2PS] p2ps has %d bytes available", sock->p2ps.data_available); fds_buf[i].revents |= SYS_NET_POLLIN; } - if (fds[i].events & SYS_NET_POLLOUT) + // Data can only be written if the socket is connected + if (fds[i].events & SYS_NET_POLLOUT && sock->p2ps.status == lv2_socket::p2ps_i::stream_status::stream_connected) + { fds_buf[i].revents |= SYS_NET_POLLOUT; + } if (fds_buf[i].revents) signaled++;