sys_event improved

This commit is contained in:
Nekotekina 2015-07-19 15:58:11 +03:00
parent 2f7fe35f5c
commit bc91ad0f4d
5 changed files with 103 additions and 111 deletions

View File

@ -1316,9 +1316,9 @@ void SPUThread::stop_and_signal(u32 code)
{
group->state = SPU_THREAD_GROUP_STATUS_WAITING;
for (auto& t : group->threads)
for (auto& thread : group->threads)
{
if (t) t->sleep(); // trigger status check
if (thread) thread->sleep(); // trigger status check
}
}
else
@ -1326,35 +1326,37 @@ void SPUThread::stop_and_signal(u32 code)
throw EXCEPTION("Unexpected SPU Thread Group state (%d)", group->state);
}
// protocol is ignored in current implementation
queue->waiters++;
// wait on the event queue
while (queue->events.empty() && !queue->cancelled)
if (queue->events.empty() || !queue->sq.empty())
{
CHECK_EMU_STATUS;
// add waiter; protocol is ignored in current implementation
sleep_queue_entry_t waiter(*this, queue->sq);
// wait on the event queue
while (!unsignal())
{
CHECK_EMU_STATUS;
if (is_stopped()) throw CPUThreadStop{};
queue->cv.wait_for(lv2_lock, std::chrono::milliseconds(1));
cv.wait(lv2_lock);
}
}
if (queue->cancelled)
if (queue->events.empty())
{
if (Emu.GetIdManager().check_id<lv2_event_queue_t>(queue->id))
{
throw EXCEPTION("Unexpected");
}
ch_in_mbox.set_values(1, CELL_ECANCELED);
}
else
{
auto& event = queue->events.front();
ch_in_mbox.set_values(4, CELL_OK, static_cast<u32>(event.data1), static_cast<u32>(event.data2), static_cast<u32>(event.data3));
ch_in_mbox.set_values(4, CELL_OK, static_cast<u32>(std::get<1>(event)), static_cast<u32>(std::get<2>(event)), static_cast<u32>(std::get<3>(event)));
queue->events.pop_front();
queue->waiters--;
if (queue->events.size())
{
queue->cv.notify_one();
}
}
// restore thread group status
@ -1371,9 +1373,9 @@ void SPUThread::stop_and_signal(u32 code)
throw EXCEPTION("Unexpected SPU Thread Group state (%d)", group->state);
}
for (auto& t : group->threads)
for (auto& thread : group->threads)
{
if (t) t->awake(); // untrigger status check
if (thread) thread->awake(); // untrigger status check
}
group->cv.notify_all();
@ -1408,11 +1410,11 @@ void SPUThread::stop_and_signal(u32 code)
throw EXCEPTION("Invalid SPU Thread Group");
}
for (auto t : group->threads)
for (auto thread : group->threads)
{
if (t && t.get() != this)
if (thread && thread.get() != this)
{
t->stop();
thread->stop();
}
}

View File

@ -43,7 +43,7 @@ enum
using sleep_queue_t = std::deque<std::shared_ptr<CPUThread>>;
static struct defer_sleep_t{} const defer_sleep;
static struct defer_sleep_t{} const defer_sleep{};
// automatic object handling a thread entry in the sleep queue
class sleep_queue_entry_t final

View File

@ -6,7 +6,6 @@
#include "Emu/Cell/PPUThread.h"
#include "Emu/Event.h"
#include "sleep_queue.h"
#include "sys_process.h"
#include "sys_event.h"
@ -21,11 +20,25 @@ lv2_event_queue_t::lv2_event_queue_t(u32 protocol, s32 type, u64 name, u64 key,
, name(name)
, key(key)
, size(size)
, cancelled(false)
, waiters(0)
{
}
void lv2_event_queue_t::push(lv2_lock_t& lv2_lock, u64 source, u64 data1, u64 data2, u64 data3)
{
CHECK_LV2_LOCK(lv2_lock);
events.emplace_back(source, data1, data2, data3);
// notify waiter; protocol is ignored in current implementation
for (auto& waiter : sq)
{
if (waiter->signal())
{
return;
}
}
}
s32 sys_event_queue_create(vm::ptr<u32> equeue_id, vm::ptr<sys_event_queue_attribute_t> attr, u64 event_queue_key, s32 size)
{
sys_event.Warning("sys_event_queue_create(equeue_id=*0x%x, attr=*0x%x, event_queue_key=0x%llx, size=%d)", equeue_id, attr, event_queue_key, size);
@ -37,20 +50,18 @@ s32 sys_event_queue_create(vm::ptr<u32> equeue_id, vm::ptr<sys_event_queue_attri
const u32 protocol = attr->protocol;
switch (protocol)
if (protocol != SYS_SYNC_FIFO && protocol != SYS_SYNC_PRIORITY)
{
case SYS_SYNC_FIFO: break;
case SYS_SYNC_PRIORITY: break;
default: sys_event.Error("sys_event_queue_create(): unknown protocol (0x%x)", protocol); return CELL_EINVAL;
sys_event.Error("sys_event_queue_create(): unknown protocol (0x%x)", protocol);
return CELL_EINVAL;
}
const u32 type = attr->type;
switch (type)
if (type != SYS_PPU_QUEUE && type != SYS_SPU_QUEUE)
{
case SYS_PPU_QUEUE: break;
case SYS_SPU_QUEUE: break;
default: sys_event.Error("sys_event_queue_create(): unknown type (0x%x)", type); return CELL_EINVAL;
sys_event.Error("sys_event_queue_create(): unknown type (0x%x)", type);
return CELL_EINVAL;
}
const auto queue = Emu.GetEventManager().MakeEventQueue(event_queue_key, protocol, type, attr->name_u64, event_queue_key, size);
@ -83,24 +94,21 @@ s32 sys_event_queue_destroy(u32 equeue_id, s32 mode)
return CELL_EINVAL;
}
if (!mode && queue->waiters)
if (!mode && queue->sq.size())
{
return CELL_EBUSY;
}
if (queue->cancelled.exchange(true))
{
throw EXCEPTION("Unexpected value");
}
if (queue->waiters)
{
queue->cv.notify_all();
}
// cleanup
Emu.GetEventManager().UnregisterKey(queue->key);
Emu.GetIdManager().remove<lv2_event_queue_t>(equeue_id);
// signal all threads to return CELL_ECANCELED
for (auto& thread : queue->sq)
{
thread->signal();
}
return CELL_OK;
}
@ -129,15 +137,11 @@ s32 sys_event_queue_tryreceive(u32 equeue_id, vm::ptr<sys_event_t> event_array,
s32 count = 0;
while (!queue->waiters && count < size && queue->events.size())
while (queue->sq.empty() && count < size && queue->events.size())
{
auto& dest = event_array[count++];
auto& event = queue->events.front();
dest.source = event.source;
dest.data1 = event.data1;
dest.data2 = event.data2;
dest.data3 = event.data3;
std::tie(dest.source, dest.data1, dest.data2, dest.data3) = queue->events.front();
queue->events.pop_front();
}
@ -147,9 +151,9 @@ s32 sys_event_queue_tryreceive(u32 equeue_id, vm::ptr<sys_event_t> event_array,
return CELL_OK;
}
s32 sys_event_queue_receive(PPUThread& CPU, u32 equeue_id, vm::ptr<sys_event_t> dummy_event, u64 timeout)
s32 sys_event_queue_receive(PPUThread& ppu, u32 equeue_id, vm::ptr<sys_event_t> dummy_event, u64 timeout)
{
sys_event.Log("sys_event_queue_receive(equeue_id=0x%x, event=*0x%x, timeout=0x%llx)", equeue_id, dummy_event, timeout);
sys_event.Log("sys_event_queue_receive(equeue_id=0x%x, *0x%x, timeout=0x%llx)", equeue_id, dummy_event, timeout);
const u64 start_time = get_system_time();
@ -167,36 +171,47 @@ s32 sys_event_queue_receive(PPUThread& CPU, u32 equeue_id, vm::ptr<sys_event_t>
return CELL_EINVAL;
}
// protocol is ignored in current implementation
queue->waiters++;
while (queue->events.empty())
if (queue->events.empty() || !queue->sq.empty())
{
CHECK_EMU_STATUS;
// add waiter; protocol is ignored in current implementation
sleep_queue_entry_t waiter(ppu, queue->sq);
if (queue->cancelled)
while (!ppu.unsignal())
{
return CELL_ECANCELED;
}
CHECK_EMU_STATUS;
if (timeout && get_system_time() - start_time > timeout)
{
queue->waiters--;
return CELL_ETIMEDOUT;
}
if (timeout)
{
const u64 passed = get_system_time() - start_time;
queue->cv.wait_for(lv2_lock, std::chrono::milliseconds(1));
if (passed >= timeout)
{
return CELL_ETIMEDOUT;
}
ppu.cv.wait_for(lv2_lock, std::chrono::microseconds(timeout - passed));
}
else
{
ppu.cv.wait(lv2_lock);
}
}
}
// event data is returned in registers (second arg is not used)
auto& event = queue->events.front();
CPU.GPR[4] = event.source;
CPU.GPR[5] = event.data1;
CPU.GPR[6] = event.data2;
CPU.GPR[7] = event.data3;
if (queue->events.empty())
{
if (Emu.GetIdManager().check_id<lv2_event_queue_t>(equeue_id))
{
throw EXCEPTION("Unexpected");
}
return CELL_ECANCELED;
}
// event data is returned in registers (dummy_event is not used)
std::tie(ppu.GPR[4], ppu.GPR[5], ppu.GPR[6], ppu.GPR[7]) = queue->events.front();
queue->events.pop_front();
queue->waiters--;
return CELL_OK;
}
@ -225,7 +240,7 @@ s32 sys_event_port_create(vm::ptr<u32> eport_id, s32 port_type, u64 name)
if (port_type != SYS_EVENT_PORT_LOCAL)
{
sys_event.Error("sys_event_port_create(): invalid port_type (%d)", port_type);
sys_event.Error("sys_event_port_create(): unknown port type (%d)", port_type);
return CELL_EINVAL;
}

View File

@ -1,5 +1,7 @@
#pragma once
#include "sleep_queue.h"
namespace vm { using namespace ps3; }
// Event Queue Type
@ -68,22 +70,6 @@ struct sys_event_t
be_t<u64> data3;
};
struct lv2_event_t
{
const u64 source;
const u64 data1;
const u64 data2;
const u64 data3;
lv2_event_t(u64 source, u64 data1, u64 data2, u64 data3)
: source(source)
, data1(data1)
, data2(data2)
, data3(data3)
{
}
};
struct lv2_event_queue_t
{
const u32 id;
@ -93,26 +79,14 @@ struct lv2_event_queue_t
const u64 key;
const s32 size;
std::deque<lv2_event_t> events;
std::atomic<bool> cancelled;
// tuple elements: source, data1, data2, data3
std::deque<std::tuple<u64, u64, u64, u64>> events;
// TODO: use sleep queue, possibly remove condition variable
std::condition_variable cv;
std::atomic<u32> waiters;
sleep_queue_t sq;
lv2_event_queue_t(u32 protocol, s32 type, u64 name, u64 key, s32 size);
void push(lv2_lock_t& lv2_lock, u64 source, u64 data1, u64 data2, u64 data3)
{
CHECK_LV2_LOCK(lv2_lock);
events.emplace_back(source, data1, data2, data3);
if (waiters)
{
cv.notify_one();
}
}
void push(lv2_lock_t& lv2_lock, u64 source, u64 data1, u64 data2, u64 data3);
};
REG_ID_TYPE(lv2_event_queue_t, 0x8D); // SYS_EVENT_QUEUE_OBJECT
@ -121,6 +95,7 @@ struct lv2_event_port_t
{
const s32 type; // port type, must be SYS_EVENT_PORT_LOCAL
const u64 name; // passed as event source (generated from id and process id if not set)
std::weak_ptr<lv2_event_queue_t> queue; // event queue this port is connected to
lv2_event_port_t(s32 type, u64 name)
@ -137,7 +112,7 @@ class PPUThread;
// SysCalls
s32 sys_event_queue_create(vm::ptr<u32> equeue_id, vm::ptr<sys_event_queue_attribute_t> attr, u64 event_queue_key, s32 size);
s32 sys_event_queue_destroy(u32 equeue_id, s32 mode);
s32 sys_event_queue_receive(PPUThread& CPU, u32 equeue_id, vm::ptr<sys_event_t> dummy_event, u64 timeout);
s32 sys_event_queue_receive(PPUThread& ppu, u32 equeue_id, vm::ptr<sys_event_t> dummy_event, u64 timeout);
s32 sys_event_queue_tryreceive(u32 equeue_id, vm::ptr<sys_event_t> event_array, s32 size, vm::ptr<u32> number);
s32 sys_event_queue_drain(u32 event_queue_id);

View File

@ -55,7 +55,7 @@ u64 get_timebased_time()
LARGE_INTEGER count;
if (!QueryPerformanceCounter(&count))
{
throw EXCEPTION("System error 0x%x", GetLastError());
throw EXCEPTION("Unexpected");
}
const u64 time = count.QuadPart;
@ -82,7 +82,7 @@ u64 get_system_time()
LARGE_INTEGER count;
if (!QueryPerformanceCounter(&count))
{
throw EXCEPTION("System error 0x%x", GetLastError());
throw EXCEPTION("Unexpected");
}
const u64 time = count.QuadPart;
@ -122,7 +122,7 @@ s32 sys_time_get_current_time(vm::ptr<s64> sec, vm::ptr<s64> nsec)
LARGE_INTEGER count;
if (!QueryPerformanceCounter(&count))
{
throw EXCEPTION("System error 0x%x", GetLastError());
throw EXCEPTION("Unexpected");
}
// get time difference in nanoseconds