From 5a10807f912daaff890bdf2ad2137ea4daac1c0c Mon Sep 17 00:00:00 2001 From: Arisotura Date: Sat, 20 Aug 2022 02:29:42 +0200 Subject: [PATCH] shared-memory IPC that actually works (albeit Windows-only for now) --- src/Platform.h | 4 +- src/Wifi.cpp | 187 +++++++++++++++------------ src/frontend/qt_sdl/LocalMP.cpp | 211 ++++++++++++++++++++----------- src/frontend/qt_sdl/LocalMP.h | 4 +- src/frontend/qt_sdl/Platform.cpp | 8 +- 5 files changed, 248 insertions(+), 166 deletions(-) diff --git a/src/Platform.h b/src/Platform.h index 31b421f3..c05d31cc 100644 --- a/src/Platform.h +++ b/src/Platform.h @@ -156,8 +156,8 @@ void WriteGBASave(const u8* savedata, u32 savelen, u32 writeoffset, u32 writelen // packet type: DS-style TX header (12 bytes) + original 802.11 frame bool MP_Init(); void MP_DeInit(); -int MP_SendPacket(u8* data, int len); -int MP_RecvPacket(u8* data, bool block); +int MP_SendPacket(u8* data, int len, u64 timestamp); +int MP_RecvPacket(u8* data, bool block, u64* timestamp); bool MP_SendSync(u16 clientmask, u16 type, u64 val); bool MP_WaitSync(u16 clientmask, u16* type, u64* val); u16 MP_WaitMultipleSyncs(u16 type, u16 clientmask, u64 curval); diff --git a/src/Wifi.cpp b/src/Wifi.cpp index c8e3cd1e..86bad2ad 100644 --- a/src/Wifi.cpp +++ b/src/Wifi.cpp @@ -101,6 +101,7 @@ bool ForcePowerOn; bool IsMPClient; u64 TimeOffsetToHost; // clienttime - hosttime u64 NextSync; // for clients: timestamp for next forced sync +u32 NextSyncType; bool SyncBack; // for clients: whether to send the host a sync once the sync is reached const u64 kMaxRunahead = 4096; @@ -588,7 +589,7 @@ void SendMPDefaultReply() *(u16*)&reply[0xC + 0x16] = IOPORT(W_TXSeqNo) << 4; *(u32*)&reply[0xC + 0x18] = 0; - int txlen = Platform::MP_SendPacket(reply, 12+28); + int txlen = Platform::MP_SendPacket(reply, 12+28, USTimestamp); WIFI_LOG("wifi: sent %d/40 bytes of MP default reply\n", txlen); } @@ -618,7 +619,7 @@ void SendMPAck() *(u16*)&ack[0xC + 0x1A] = 0; // TODO: bitmask of which clients failed to reply *(u32*)&ack[0xC + 0x1C] = 0; - int txlen = Platform::MP_SendPacket(ack, 12+32); + int txlen = Platform::MP_SendPacket(ack, 12+32, USTimestamp); WIFI_LOG("wifi: sent %d/44 bytes of MP ack, %d %d\n", txlen, ComStatus, RXTime); } @@ -759,17 +760,14 @@ bool ProcessTX(TXSlot* slot, int num) // make sure the clients are synced up // TODO!!!! this should be for all currently connected clients regardless of the clientmask in the packet u16 clientmask = *(u16*)&RAM[slot->Addr + 0xC + 26]; - Platform::MP_SendSync(/*clientmask*/0x0002, 2, USCounter); + //Platform::MP_SendSync(/*clientmask*/0x0002, 2, USCounter); //printf("[HOST] sending CMD, sent sync2, waiting\n"); - u16 res = Platform::MP_WaitMultipleSyncs(3, /*clientmask*/0x0002, USCounter); + //u16 res = Platform::MP_WaitMultipleSyncs(3, /*clientmask*/0x0002, USCounter); //printf("[HOST] got sync3: %04X\n", res); - - //if (slot->Length < 100) - //if (*(u16*)&RAM[slot->Addr+0x28] == 0) - // *(u16*)&RAM[slot->Addr+0x28] = 0x0100; + Platform::MP_SendSync(0xFFFE, 2, USTimestamp); // send - int txlen = Platform::MP_SendPacket(&RAM[slot->Addr], 12 + slot->Length); + int txlen = Platform::MP_SendPacket(&RAM[slot->Addr], 12 + slot->Length, USTimestamp); WIFI_LOG("wifi: sent %d/%d bytes of slot%d packet, addr=%04X, framectl=%04X, %04X %04X\n", txlen, slot->Length+12, num, slot->Addr, *(u16*)&RAM[slot->Addr + 0xC], *(u16*)&RAM[slot->Addr + 0x24], *(u16*)&RAM[slot->Addr + 0x26]); @@ -778,9 +776,10 @@ bool ProcessTX(TXSlot* slot, int num) // send further sync u32 numclients = NumClients(clientmask); u32 replywait = 112 + ((10 + IOPORT(W_CmdReplyTime)) * numclients); - u32 acklen = 32 * (slot->Rate==2 ? 4:8); + //u32 acklen = 32 * (slot->Rate==2 ? 4:8); //Platform::MP_SendSync(/*clientmask*/0x0002, 1, USCounter + len + replywait + acklen);// + kMaxRunahead); - Platform::MP_SendSync(/*clientmask*/0x0002, 2, USCounter + len + replywait); + //Platform::MP_SendSync(/*clientmask*/0x0002, 2, USCounter + len + replywait); + Platform::MP_SendSync(0xFFFE, 2, USTimestamp + len + replywait); } else if (num == 5) { @@ -788,7 +787,7 @@ bool ProcessTX(TXSlot* slot, int num) *(u16*)&RAM[slot->Addr+6] = vogon; // send - int txlen = Platform::MP_SendPacket(&RAM[slot->Addr], 12 + slot->Length); + int txlen = Platform::MP_SendPacket(&RAM[slot->Addr], 12 + slot->Length, USTimestamp); WIFI_LOG("wifi: sent %d/%d bytes of slot%d packet, addr=%04X, framectl=%04X, %04X %04X\n", txlen, slot->Length+12, num, slot->Addr, *(u16*)&RAM[slot->Addr + 0xC], *(u16*)&RAM[slot->Addr + 0x24], *(u16*)&RAM[slot->Addr + 0x26]); @@ -798,7 +797,7 @@ bool ProcessTX(TXSlot* slot, int num) else //if (num != 5) { // send - int txlen = Platform::MP_SendPacket(&RAM[slot->Addr], 12 + slot->Length); + int txlen = Platform::MP_SendPacket(&RAM[slot->Addr], 12 + slot->Length, USTimestamp); WIFI_LOG("wifi: sent %d/%d bytes of slot%d packet, addr=%04X, framectl=%04X, %04X %04X\n", txlen, slot->Length+12, num, slot->Addr, *(u16*)&RAM[slot->Addr + 0xC], *(u16*)&RAM[slot->Addr + 0x24], *(u16*)&RAM[slot->Addr + 0x26]); @@ -813,12 +812,14 @@ bool ProcessTX(TXSlot* slot, int num) { // if we're sending an association response: // we are likely acting as a local MP host, and are welcoming a new client to the club - // in this case, sync them up: send them our current USCOUNT value - // which will let them understand further sync values + // in this case, sync them up: send them our current microsecond timestamp u16 aid = *(u16*)&RAM[slot->Addr + 0xC + 24 + 4]; - //printf("[HOST] syncing client %04X, sync=%016llX\n", aid, USCounter); - Platform::MP_SendSync(1<<(aid&0xF), 0, USCounter); + if (aid) + { + printf("[HOST] syncing client %04X, sync=%016llX\n", aid, USTimestamp); + Platform::MP_SendSync(1<<(aid&0xF), 0, USTimestamp); + } } WifiAP::SendPacket(&RAM[slot->Addr], 12 + slot->Length); @@ -935,14 +936,15 @@ bool ProcessTX(TXSlot* slot, int num) u16 clientmask = *(u16*)&RAM[slot->Addr + 0xC + 26]; //Platform::MP_SendSync(/*clientmask*/0x0002, 2, USCounter); //printf("[HOST] sending CMD, sent sync2, waiting\n"); - u16 res = Platform::MP_WaitMultipleSyncs(3, /*clientmask*/0x0002, USCounter); + //u16 res = Platform::MP_WaitMultipleSyncs(3, /*clientmask*/0x0002, USCounter); //printf("[HOST] got sync3: %04X\n", res); // send SendMPAck(); // send further sync - Platform::MP_SendSync(/*clientmask*/0x0002, 1, USCounter + slot->CurPhaseTime); + //Platform::MP_SendSync(/*clientmask*/0x0002, 1, USCounter + slot->CurPhaseTime); + Platform::MP_SendSync(0xFFFE, 1, USTimestamp + slot->CurPhaseTime); slot->CurPhase = 3; } @@ -1022,7 +1024,7 @@ bool CheckRX(bool block) for (;;) { - int rxlen = Platform::MP_RecvPacket(RXBuffer, block); + int rxlen = Platform::MP_RecvPacket(RXBuffer, block, nullptr); if (rxlen == 0) rxlen = WifiAP::RecvPacket(RXBuffer); if (rxlen == 0) return false; if (rxlen < 12+24) continue; @@ -1087,6 +1089,8 @@ bool CheckRX(bool block) break; } + // TODO get rid of this + // ensuring we don't receive our own crap is the responsibility of the comm layer!! if (MACEqual(&RXBuffer[12 + a_src], (u8*)&IOPORT(W_MACAddr0))) continue; // oops. we received a packet we just sent. @@ -1130,24 +1134,30 @@ bool CheckRX(bool block) { u16 aid = *(u16*)&RXBuffer[12+24+4]; - //u64 sync = Platform::MP_WaitSync(0, 1<<(aid&0xF), 0); - u64 sync = 0; - for (;;) + if (aid) { - u16 type; u64 val; - bool res = Platform::MP_WaitSync(1<<(aid&0xF), &type, &val); - if (!res) break; - if (type != 0) continue; - sync = val; - break; - } - if (sync) - { - printf("[CLIENT %01X] host sync=%016llX\n", aid&0xF, sync); + //u64 sync = Platform::MP_WaitSync(0, 1<<(aid&0xF), 0); + u64 sync = 0; + for (;;) + { + u16 type; u64 val; + bool res = Platform::MP_WaitSync(1<<(aid&0xF), &type, &val); + printf("wait sync: %d, %d, %016llX\n", res, type, val); + if (!res) break; + if (type != 0) continue; + sync = val; + break; + } + if (sync) + { + printf("[CLIENT %01X] host sync=%016llX\n", aid&0xF, sync); - IsMPClient = true; - TimeOffsetToHost = USCounter - sync; - NextSync = USCounter + kMaxRunahead; + IsMPClient = true; + //TimeOffsetToHost = USCounter - sync; + //NextSync = USCounter + kMaxRunahead; + USTimestamp = sync; + NextSync = USTimestamp; + } } } @@ -1213,6 +1223,66 @@ void MSTimer() void USTimer(u32 param) { USTimestamp++; + if (USTimestamp == NextSync) + { + /*if (SyncBack) + {//printf("[CLIENT %01X] sending sync3\n", IOPORT(W_AIDLow)); + SyncBack = false; + u16 aid = IOPORT(W_AIDLow); + Platform::MP_SendSync(1<<(aid&0xF), 3, 0); + + if (CheckRX(true)) + { + ComStatus |= 0x1; + } + }*/ + if (NextSyncType == 2) + { + if (CheckRX(true)) + { + ComStatus |= 0x1; + } + } + + //u64 sync = Platform::MP_WaitSync(1, 1<<(IOPORT(W_AIDLow)&0xF), USCounter - TimeOffsetToHost); + for (;;) + { + u16 type; u64 val; + u16 aid = IOPORT(W_AIDLow); + //printf("[CLIENT %01X] waiting for sync\n", aid); + bool res = Platform::MP_WaitSync(1<<(aid&0xF), &type, &val); + //printf("[CLIENT %01X] got sync, res=%d type=%04X val=%016llX\n", aid, res, type, val); + if (!res) break; + + // timeoffset = client - host + //val += TimeOffsetToHost; + + //if ((type == 1) && (val > USTimestamp)) + if (val > USTimestamp) + { + NextSync = val; + NextSyncType = type; + break; + } + /*else if ((type == 2) && (val > USTimestamp)) + { + NextSync = val; + + break; + } + /*else if ((type == 2) && (val > USCounter)) + {//printf("[CLIENT %01X] received sync2: %016llX\n", aid, val); + NextSync = val; + SyncBack = true; + break; + }*/ + } + /*if (sync) + { + NextSync = USCounter - sync; + }*/ + } + WifiAP::USTimer(); @@ -1243,51 +1313,6 @@ void USTimer(u32 param) } if (!uspart) MSTimer(); - - if (USCounter == NextSync) - { - if (SyncBack) - {//printf("[CLIENT %01X] sending sync3\n", IOPORT(W_AIDLow)); - SyncBack = false; - u16 aid = IOPORT(W_AIDLow); - Platform::MP_SendSync(1<<(aid&0xF), 3, 0); - - if (CheckRX(true)) - { - ComStatus |= 0x1; - } - } - - //u64 sync = Platform::MP_WaitSync(1, 1<<(IOPORT(W_AIDLow)&0xF), USCounter - TimeOffsetToHost); - for (;;) - { - u16 type; u64 val; - u16 aid = IOPORT(W_AIDLow); - //printf("[CLIENT %01X] waiting for sync\n", aid); - bool res = Platform::MP_WaitSync(1<<(aid&0xF), &type, &val); - //printf("[CLIENT %01X] got sync, res=%d type=%04X val=%016llX\n", aid, res, type, val); - if (!res) break; - - // timeoffset = client - host - val += TimeOffsetToHost; - - if ((type == 1) && (val > USCounter)) - { - NextSync = val; - break; - } - else if ((type == 2) && (val > USCounter)) - {//printf("[CLIENT %01X] received sync2: %016llX\n", aid, val); - NextSync = val; - SyncBack = true; - break; - } - } - /*if (sync) - { - NextSync = USCounter - sync; - }*/ - } } if (IOPORT(W_CmdCountCnt) & 0x0001) diff --git a/src/frontend/qt_sdl/LocalMP.cpp b/src/frontend/qt_sdl/LocalMP.cpp index e6280219..57db298e 100644 --- a/src/frontend/qt_sdl/LocalMP.cpp +++ b/src/frontend/qt_sdl/LocalMP.cpp @@ -76,8 +76,18 @@ struct MPQueueHeader struct MPPacketHeader { u32 Magic; - u32 Length; u32 SenderID; + u32 Length; + u64 Timestamp; +}; + +struct MPSync +{ + u32 Magic; + u32 SenderID; + u16 ClientMask; + u16 Type; + u64 Timestamp; }; QSharedMemory* MPQueue; @@ -101,25 +111,29 @@ const u32 kPacketEnd = 0x10000; //#ifdef _WIN32 #if 1 +bool SemInited[32]; HANDLE SemPool[32]; void SemPoolInit() { for (int i = 0; i < 32; i++) + { SemPool[i] = INVALID_HANDLE_VALUE; + SemInited[i] = false; + } } bool SemInit(int num) { - if (SemPool[num] != INVALID_HANDLE_VALUE) + if (SemInited[num]) return true; char semname[64]; - sprintf(semname, "Local\\melonNIFI_Sem%d", num); + sprintf(semname, "Local\\melonNIFI_Sem%02d", num); HANDLE sem = CreateSemaphore(nullptr, 0, 64, semname); SemPool[num] = sem; - + SemInited[num] = true; return sem != INVALID_HANDLE_VALUE; } @@ -129,6 +143,7 @@ void SemDeinit(int num) { CloseHandle(SemPool[num]); SemPool[num] = INVALID_HANDLE_VALUE; + SemInited[num] = false; } } @@ -280,10 +295,10 @@ bool Init() if (!MPQueue->attach()) { - printf("sharedmem doesn't exist. creating\n"); + printf("MP sharedmem doesn't exist. creating\n"); if (!MPQueue->create(65536)) { - printf("sharedmem create failed :(\n"); + printf("MP sharedmem create failed :(\n"); return false; } @@ -332,14 +347,16 @@ bool Init() void DeInit() { - if (MPSocket[0] >= 0) + /*if (MPSocket[0] >= 0) closesocket(MPSocket[0]); if (MPSocket[1] >= 0) closesocket(MPSocket[1]); #ifdef __WIN32__ WSACleanup(); -#endif // __WIN32__ +#endif // __WIN32__*/ + SemDeinit(InstanceID); + SemDeinit(16+InstanceID); } void PacketFIFORead(void* buf, int len) @@ -385,49 +402,48 @@ void PacketFIFOWrite(void* buf, int len) header->PacketWriteOffset = offset; } -int SendPacket(u8* packet, int len) +int SendPacket(u8* packet, int len, u64 timestamp) { MPQueue->lock(); u8* data = (u8*)MPQueue->data(); MPQueueHeader* header = (MPQueueHeader*)&data[0]; u16 mask = header->InstanceBitmask; - u32 offset = header->PacketWriteOffset; // TODO: check if the FIFO is full! MPPacketHeader pktheader; pktheader.Magic = 0x4946494E; - pktheader.Length = len; pktheader.SenderID = InstanceID; + pktheader.Length = len; + pktheader.Timestamp = timestamp; PacketFIFOWrite(&pktheader, sizeof(pktheader)); PacketFIFOWrite(packet, len); + MPQueue->unlock(); + for (int i = 0; i < 16; i++) { if (mask & (1<unlock(); return len; } -int RecvPacket(u8* packet, bool block) +int RecvPacket(u8* packet, bool block, u64* timestamp) { - MPQueue->lock(); - u8* data = (u8*)MPQueue->data(); - MPQueueHeader* header = (MPQueueHeader*)&data[0]; - for (;;) { if (!SemWait(InstanceID, block ? 500 : 0)) { - MPQueue->unlock(); return 0; } + MPQueue->lock(); + u8* data = (u8*)MPQueue->data(); + MPPacketHeader pktheader; PacketFIFORead(&pktheader, sizeof(pktheader)); @@ -445,97 +461,137 @@ int RecvPacket(u8* packet, bool block) if (PacketReadOffset >= kPacketEnd) PacketReadOffset += kPacketStart - kPacketEnd; + MPQueue->unlock(); continue; } PacketFIFORead(packet, pktheader.Length); + if (timestamp) *timestamp = pktheader.Timestamp; MPQueue->unlock(); return pktheader.Length; } } -bool SendSync(u16 clientmask, u16 type, u64 val) + +void SyncFIFORead(MPSync* sync) { - u8 syncbuf[32]; + u8* data = (u8*)MPQueue->data(); - if (MPSocket[1] < 0) - return false; + int len = sizeof(MPSync); + u32 offset = SyncReadOffset; + if ((offset + len) >= kSyncEnd) + { + u32 part1 = kSyncEnd - offset; + memcpy(sync, &data[offset], part1); + memcpy(&((u8*)sync)[part1], &data[kSyncStart], len - part1); + offset = kSyncStart + len - part1; + } + else + { + memcpy(sync, &data[offset], len); + offset += len; + } - int len = 16; - *(u32*)&syncbuf[0] = htonl(0x4946494E); // NIFI - syncbuf[4] = NIFI_VER; - syncbuf[5] = 1; - *(u16*)&syncbuf[6] = htons(len); - *(u16*)&syncbuf[8] = htons(type); - *(u16*)&syncbuf[10] = htons(clientmask); - *(u32*)&syncbuf[12] = MPUniqueID; - *(u32*)&syncbuf[16] = htonl((u32)val); - *(u32*)&syncbuf[20] = htonl((u32)(val>>32)); - - int slen = sendto(MPSocket[1], (const char*)syncbuf, len+8, 0, &MPSendAddr[1], sizeof(sockaddr_t)); - return slen == len+8; + SyncReadOffset = offset; } -bool WaitSync(u16 clientmask, u16* type, u64* val) +void SyncFIFOWrite(MPSync* sync) { - u8 syncbuf[32]; + u8* data = (u8*)MPQueue->data(); + MPQueueHeader* header = (MPQueueHeader*)&data[0]; - if (MPSocket[1] < 0) - return false; + int len = sizeof(MPSync); + u32 offset = header->SyncWriteOffset; + if ((offset + len) >= kSyncEnd) + { + u32 part1 = kSyncEnd - offset; + memcpy(&data[offset], sync, part1); + memcpy(&data[kSyncStart], &((u8*)sync)[part1], len - part1); + offset = kSyncStart + len - part1; + } + else + { + memcpy(&data[offset], sync, len); + offset += len; + } - fd_set fd; - struct timeval tv; + header->SyncWriteOffset = offset; +} +bool SendSync(u16 clientmask, u16 type, u64 timestamp) +{ + MPQueue->lock(); + u8* data = (u8*)MPQueue->data(); + MPQueueHeader* header = (MPQueueHeader*)&data[0]; + + u16 mask = header->InstanceBitmask; + + // TODO: check if the FIFO is full! + + MPSync sync; + sync.Magic = 0x434E5953; + sync.SenderID = InstanceID; + sync.ClientMask = clientmask; + sync.Type = type; + sync.Timestamp = timestamp; + + SyncFIFOWrite(&sync); + + MPQueue->unlock(); + + for (int i = 0; i < 16; i++) + { + if (mask & (1<lock(); + u8* data = (u8*)MPQueue->data(); + + MPSync sync; + SyncFIFORead(&sync); + + if (sync.Magic != 0x434E5953) + { + printf("MP: !!!! SYNC FIFO IS CRAPOED\n"); + MPQueue->unlock(); + return false; + } + + if (sync.SenderID == InstanceID) + { + MPQueue->unlock(); continue; - rlen -= 8; + } - if (ntohl(*(u32*)&syncbuf[0]) != 0x4946494E) + if (!(sync.ClientMask & clientmask)) + { + MPQueue->unlock(); continue; + } - if (syncbuf[4] != NIFI_VER || syncbuf[5] != 1) - continue; - - if (ntohs(*(u16*)&syncbuf[6]) != rlen) - continue; - - if (*(u32*)&syncbuf[12] == MPUniqueID) - continue; - - u16 clientval = ntohs(*(u16*)&syncbuf[10]); - if (!(clientmask & clientval)) - continue; - - // check the sync val, it should be ahead of the current sync val - u64 syncval = ntohl(*(u32*)&syncbuf[16]) | (((u64)ntohl(*(u32*)&syncbuf[20])) << 32); - //if (syncval <= curval) - // continue; - - if (type) *type = ntohs(*(u16*)&syncbuf[8]); - if (val) *val = syncval; - + if (type) *type = sync.Type; + if (timestamp) *timestamp = sync.Timestamp; + MPQueue->unlock(); return true; } } u16 WaitMultipleSyncs(u16 type, u16 clientmask, u64 curval) { - u8 syncbuf[32]; + /*u8 syncbuf[32]; if (!clientmask) return 0; @@ -595,7 +651,8 @@ u16 WaitMultipleSyncs(u16 type, u16 clientmask, u64 curval) return 0; } - return clientmask; + return clientmask;*/ + return 0; } } diff --git a/src/frontend/qt_sdl/LocalMP.h b/src/frontend/qt_sdl/LocalMP.h index c0b7e814..609ed1e9 100644 --- a/src/frontend/qt_sdl/LocalMP.h +++ b/src/frontend/qt_sdl/LocalMP.h @@ -26,8 +26,8 @@ namespace LocalMP bool Init(); void DeInit(); -int SendPacket(u8* data, int len); -int RecvPacket(u8* data, bool block); +int SendPacket(u8* data, int len, u64 timestamp); +int RecvPacket(u8* data, bool block, u64* timestamp); bool SendSync(u16 clientmask, u16 type, u64 val); bool WaitSync(u16 clientmask, u16* type, u64* val); u16 WaitMultipleSyncs(u16 type, u16 clientmask, u64 curval); diff --git a/src/frontend/qt_sdl/Platform.cpp b/src/frontend/qt_sdl/Platform.cpp index 4397d7cf..4a201110 100644 --- a/src/frontend/qt_sdl/Platform.cpp +++ b/src/frontend/qt_sdl/Platform.cpp @@ -369,14 +369,14 @@ void MP_DeInit() return LocalMP::DeInit(); } -int MP_SendPacket(u8* data, int len) +int MP_SendPacket(u8* data, int len, u64 timestamp) { - return LocalMP::SendPacket(data, len); + return LocalMP::SendPacket(data, len, timestamp); } -int MP_RecvPacket(u8* data, bool block) +int MP_RecvPacket(u8* data, bool block, u64* timestamp) { - return LocalMP::RecvPacket(data, block); + return LocalMP::RecvPacket(data, block, timestamp); } bool MP_SendSync(u16 clientmask, u16 type, u64 val)