This commit is contained in:
Pavel 2024-04-07 00:13:11 +03:00
parent 265116c46b
commit 5669ce29f8
7 changed files with 187 additions and 58 deletions

View File

@ -1803,9 +1803,11 @@ begin
if ((flags and TDF_NEEDRESCHED)<>0) then
begin
thread_lock(td);
sched_prio(td,td^.td_user_pri);
thread_unlock(td);
mi_switch(SW_INVOL or SWT_NEEDRESCHED);
thread_unlock(td);
end;
if ((flags and TDF_NEEDSIGCHK)<>0) or
@ -1849,9 +1851,10 @@ begin
td^.td_flags:=td^.td_flags and (not TDF_SUSP_CTX);
td^.td_state:=TDS_INHIBITED;
td^.td_inhibitors:=td^.td_inhibitors or TDI_SUSP_CTX;
thread_unlock(td);
mi_switch(SW_VOL or SWT_SUSPEND);
thread_unlock(td);
end else
begin
thread_unlock(td);

View File

@ -159,7 +159,9 @@ var
td:p_kthread;
begin
Result:=0;
td:=curkthread;
thread_lock_assert(td);
if (td<>nil) then
begin
@ -178,7 +180,17 @@ begin
SWT_RELINQUISH,
SWT_NEEDRESCHED:
begin
if (td<>nil) then
begin
thread_unlock(td);
end;
md_yield;
if (td<>nil) then
begin
thread_lock(td);
end;
end;
SWT_SLEEPQ,
SWT_SLEEPQTIMO:
@ -210,12 +222,21 @@ begin
begin
thread_lock(td);
if (prio=PRI_USER) then
begin
prio:=td^.td_user_pri;
end;
if (prio>=0) then
begin
sched_prio(td, prio);
end;
end;
mi_switch(SW_VOL or SWT_RELINQUISH);
if (td<>nil) then
begin
thread_unlock(td);
end;
mi_switch(SW_VOL or SWT_RELINQUISH);
end;
function sys_yield():Integer;
@ -223,15 +244,24 @@ var
td:p_kthread;
begin
td:=curkthread;
if (td<>nil) then
begin
thread_lock(td);
if (PRI_BASE(td^.td_pri_class)=PRI_TIMESHARE) then
begin
sched_prio(td, PRI_MAX_TIMESHARE);
thread_unlock(td);
end;
td^.td_retval[0]:=0;
end;
mi_switch(SW_VOL or SWT_RELINQUISH);
if (td<>nil) then
begin
thread_unlock(td);
end;
Exit(0);
end;

View File

@ -9,7 +9,8 @@ uses
mqueue,
ucontext,
signal,
signalvar;
signalvar,
kern_mtx;
const
TDS_INACTIVE =0;
@ -181,7 +182,7 @@ type
td_umtxq :Pointer; //p_umtx_q
td_handle :THandle; //nt thread
td_teb :p_teb;
td_lock :Pointer;
td_lock :p_mtx;
td_tid :QWORD;
td_sigstk :stack_t;
td_state :Integer;
@ -213,6 +214,7 @@ type
td_sleepqueue :Pointer;
td_slpq :TAILQ_ENTRY;
td_wchan :Pointer;
td_wmesg :PChar;
td_sqqueue :Integer;
td_intrval :Integer;
td_inhibitors :Integer;
@ -224,7 +226,10 @@ type
ru_nvcsw :Int64;
ru_nivcsw :Int64;
end;
//
td_slptick :Int64;
td_slpcallout :Pointer;
tdq_lock :mtx;
//
td_fpop :Pointer;
td_map_def_user :Pointer;
@ -320,6 +325,7 @@ function curthread_pflags_set(flags:Integer):Integer;
procedure curthread_pflags_restore(save:Integer);
procedure curthread_set_pcb_onfault(v:Pointer);
procedure thread_lock_assert(td:p_kthread); external;
procedure thread_lock(td:p_kthread); external;
procedure thread_unlock(td:p_kthread); external;

View File

@ -40,6 +40,7 @@ function sys_amd64_get_gsbase(base:PPointer):Integer;
procedure thread_inc_ref(td:p_kthread);
procedure thread_dec_ref(td:p_kthread);
procedure thread_lock_assert(td:p_kthread);
procedure thread_lock (td:p_kthread);
procedure thread_unlock (td:p_kthread);
function tdfind(tid:DWORD):p_kthread;
@ -184,6 +185,9 @@ begin
Result:=cpu_thread_alloc(pages);
mtx_init(Result^.tdq_lock,'tdq_lock');
Result^.td_lock:=@Result^.tdq_lock;
Result^.td_state:=TDS_INACTIVE;
Result^.td_lend_user_pri:=PRI_MAX;
@ -193,6 +197,7 @@ end;
procedure thread_free(td:p_kthread);
begin
mtx_destroy(td^.tdq_lock);
sleepq_free(td^.td_sleepqueue);
umtx_thread_fini(td);
cpu_thread_free(td);
@ -211,14 +216,19 @@ begin
end;
end;
procedure thread_lock_assert(td:p_kthread); public;
begin
mtx_assert(td^.td_lock^);
end;
procedure thread_lock(td:p_kthread); public;
begin
rw_wlock(td^.td_lock);
mtx_lock(td^.td_lock^);
end;
procedure thread_unlock(td:p_kthread); public;
begin
rw_wunlock(td^.td_lock);
mtx_unlock(td^.td_lock^);
end;
procedure thread_link(td:p_kthread);

View File

@ -33,12 +33,14 @@ begin
if (td<>nil) then
begin
cpuset_setaffinity(childtd,td^.td_cpuset);
sched_priority(td,td^.td_base_pri);
sched_priority (childtd,td^.td_base_pri);
end;
end;
procedure sched_class(td:p_kthread;_class:Integer); inline;
procedure sched_class(td:p_kthread;_class:Integer);
begin
thread_lock_assert(td);
td^.td_pri_class:=_class;
end;
@ -53,8 +55,10 @@ begin
sched_priority(td, prio);
end;
procedure sched_user_prio(td:p_kthread;prio:Integer); inline;
procedure sched_user_prio(td:p_kthread;prio:Integer);
begin
thread_lock_assert(td);
td^.td_base_user_pri:=prio;
if (td^.td_lend_user_pri<=prio) then Exit;
td^.td_user_pri:=prio;
@ -67,6 +71,8 @@ end;
procedure sched_lend_user_prio(td:p_kthread;prio:Integer);
begin
thread_lock_assert(td);
td^.td_lend_user_pri:=prio;
td^.td_user_pri:=min(prio,td^.td_base_user_pri);
if (td^.td_priority>td^.td_user_pri) then
@ -79,6 +85,8 @@ procedure sched_sleep(td:p_kthread;prio:Integer);
const
PSOCK=87;
begin
thread_lock_assert(td);
if TD_IS_SUSPENDED(td) or (prio>=PSOCK) then
begin
td^.td_flags:=td^.td_flags or TDF_CANSWAP;
@ -91,6 +99,8 @@ end;
procedure sched_wakeup(td:p_kthread);
begin
thread_lock_assert(td);
td^.td_flags:=td^.td_flags and (not TDF_CANSWAP);
TD_SET_RUNNING(td);
@ -102,14 +112,23 @@ function sched_switch(td:p_kthread):Integer;
var
slptick:Int64;
begin
thread_lock_assert(td);
atomic_clear_int(@td^.td_flags,TDF_NEEDRESCHED or TDF_SLICEEND);
slptick:=System.InterlockedExchange64(td^.td_slptick,0);
thread_unlock(td);
Result:=msleep_td(slptick);
thread_lock(td);
end;
function setrunnable(td:p_kthread):Integer;
begin
thread_lock_assert(td);
Case td^.td_state of
TDS_RUNNING,
TDS_RUNQ :Exit(0);

View File

@ -8,7 +8,6 @@ interface
uses
mqueue,
sys_sleepqueue,
hamt,
kern_mtx,
kern_thr,
rtprio;
@ -23,18 +22,19 @@ const
type
p_sleepqueue=^sleepqueue;
sleepqueue=packed record
sq_blocked :array[0..NR_SLEEPQS-1] of TAILQ_HEAD;
sq_blocked :array[0..NR_SLEEPQS-1] of TAILQ_HEAD; //thread
sq_blockedcnt:array[0..NR_SLEEPQS-1] of DWORD;
sq_hash :LIST_ENTRY;
sq_free :LIST_HEAD; //sleepqueue;
sq_hash :LIST_ENTRY; //sleepqueue
sq_free :LIST_HEAD; //sleepqueue;
sq_wchan :Pointer;
sq_lock :Pointer; //lock_object
sq_type :Integer;
end;
p_sleepqueue_chain=^sleepqueue_chain;
sleepqueue_chain=packed record
sc_hamt:TSTUB_HAMT64;
sc_lock:mtx;
sc_queues:LIST_HEAD; //sleepqueue
sc_lock :mtx;
end;
function sleepq_alloc:p_sleepqueue;
@ -81,6 +81,7 @@ var
begin
For i:=0 to SC_MASK do
begin
LIST_INIT(@sleepq_chains[i].sc_queues);
mtx_init(sleepq_chains[i].sc_lock,'sleepq chain');
end;
end;
@ -146,28 +147,25 @@ end;
function sleepq_lookup(wchan:Pointer):p_sleepqueue;
var
sc:p_sleepqueue_chain;
data:PPointer;
sq:p_sleepqueue;
begin
Assert(wchan<>nil,'invalid NULL wait channel');
Result:=nil;
sc:=SC_LOOKUP(wchan);
data:=HAMT_search64(@sc^.sc_hamt,QWORD(wchan));
if (data<>nil) then Result:=data^;
end;
procedure HAMT_INSERT(wchan:Pointer;sq:p_sleepqueue); inline;
var
sc:p_sleepqueue_chain;
begin
sc:=SC_LOOKUP(wchan);
HAMT_insert64(@sc^.sc_hamt,QWORD(wchan),sq);
end;
mtx_assert(sc^.sc_lock);
procedure HAMT_REMOVE(wchan:Pointer); inline;
var
sc:p_sleepqueue_chain;
begin
sc:=SC_LOOKUP(wchan);
HAMT_delete64(@sc^.sc_hamt,QWORD(wchan),nil);
sq:=LIST_FIRST(@sc^.sc_queues);
while (sq<>nil) do
begin
if (sq^.sq_wchan=wchan) then
begin
Exit(sq);
end;
//
sq:=LIST_NEXT(sq,@sq^.sq_hash);
end;
end;
{
@ -179,10 +177,16 @@ end;
procedure sleepq_add(wchan,lock,wmesg:Pointer;flags,queue:Integer); public;
var
td:p_kthread;
sc:p_sleepqueue_chain;
sq:p_sleepqueue;
tq:p_sleepqueue;
i:Integer;
begin
td:=curkthread;
sc:=SC_LOOKUP(wchan);
mtx_assert(sc^.sc_lock);
Assert(td^.td_sleepqueue<>nil);
Assert(wchan<>nil,'invalid nil wait channel');
Assert((queue>=0) and (queue<NR_SLEEPQS));
@ -194,30 +198,48 @@ begin
if (sq=nil) then
begin
sq:=td^.td_sleepqueue;
For i:=0 to NR_SLEEPQS-1 do
begin
Assert(TAILQ_EMPTY(@sq^.sq_blocked[i]),'threads sleep queue %d is not empty');
Assert(sq^.sq_blockedcnt[i]=0 ,'threads sleep queue %d count mismatches');
end;
Assert(LIST_EMPTY(@sq^.sq_free),'threads sleep queue has a non-empty free list');
Assert(sq^.sq_wchan=nil ,'stale sq_wchan pointer');
sq^.sq_lock:=lock;
sq:=td^.td_sleepqueue;
LIST_INSERT_HEAD(@sc^.sc_queues, sq, @sq^.sq_hash);
sq^.sq_wchan:=wchan;
sq^.sq_type :=(flags and SLEEPQ_TYPE) or SLEEPQ_HAMT;
HAMT_INSERT(wchan,sq);
sq^.sq_type :=flags and SLEEPQ_TYPE;
end else
begin
Assert(wchan=sq^.sq_wchan);
Assert(lock =sq^.sq_lock);
Assert((flags and SLEEPQ_TYPE)=(sq^.sq_type and SLEEPQ_TYPE));
sq:=td^.td_sleepqueue;
LIST_INSERT_HEAD(@sq^.sq_free,sq,@sq^.sq_hash);
tq:=td^.td_sleepqueue;
LIST_INSERT_HEAD(@sq^.sq_free,tq,@tq^.sq_hash);
end;
thread_lock(td);
TAILQ_INSERT_TAIL(@sq^.sq_blocked[queue],td,@td^.td_slpq);
Inc(sq^.sq_blockedcnt[queue]);
td^.td_sleepqueue:=nil;
td^.td_sqqueue :=queue;
td^.td_wchan :=wchan;
td^.td_wmesg :=wmesg;
if ((flags and SLEEPQ_INTERRUPTIBLE)<>0) then
begin
td^.td_flags:=td^.td_flags or TDF_SINTR;
td^.td_flags:=td^.td_flags and (not TDF_SLEEPABORT);
end;
thread_unlock(td);
end;
@ -228,14 +250,19 @@ end;
procedure sleepq_set_timeout(wchan:Pointer;time:Int64); public;
var
td:p_kthread;
sc:p_sleepqueue_chain;
begin
td:=curkthread;
sc:=SC_LOOKUP(wchan);
mtx_assert(sc^.sc_lock);
Assert(TD_ON_SLEEPQ(td));
Assert(td^.td_sleepqueue=nil);
Assert(wchan<>nil);
td^.td_slptick:=time;
td^.td_slpcallout:=@sleepq_timeout;
end;
{
@ -271,6 +298,7 @@ begin
td:=curkthread;
sc:=SC_LOOKUP(wchan);
mtx_assert(sc^.sc_lock);
Assert(wchan<>nil);
if ((td^.td_pflags and TDP_WAKEUP)<>0) then
@ -341,7 +369,10 @@ begin
if TD_ON_SLEEPQ(td) then
begin
sq:=sleepq_lookup(wchan);
sleepq_resume_thread(sq,td,0);
if (sleepq_resume_thread(sq,td,0)<>0) then
begin
Assert(false,'not waking up swapper');
end;
end;
//sleepq_release
@ -350,6 +381,17 @@ begin
Assert(td^.td_lock<>@sc^.sc_lock);
end;
procedure thread_lock_set(td:p_kthread;new:p_mtx);
var
lock:p_mtx;
begin
mtx_assert(new^);
thread_lock_assert(td);
lock:=td^.td_lock;
td^.td_lock:=new;
mtx_unlock(lock^);
end;
{
* Switches to another thread if we are still asleep on a sleep queue.
* Returns with thread lock.
@ -362,7 +404,10 @@ var
r:Integer;
begin
td:=curkthread;
sc:=SC_LOOKUP(wchan);
mtx_assert(sc^.sc_lock);
thread_lock_assert(td);
if (td^.td_sleepqueue<>nil) then
begin
@ -375,7 +420,10 @@ begin
begin
Assert(TD_ON_SLEEPQ(td));
sq:=sleepq_lookup(wchan);
sleepq_resume_thread(sq,td,0);
if (sleepq_resume_thread(sq,td,0)<>0) then
begin
Assert(false,'not waking up swapper');
end;
//sleepq_release
mtx_unlock(sc^.sc_lock);
Exit;
@ -383,23 +431,23 @@ begin
Assert(td^.td_sleepqueue=nil);
sched_sleep(td,pri);
TD_SET_SLEEPING(td);
//unlock before wait
thread_unlock(td); //
thread_lock_set(td,@sc^.sc_lock);
//sleepq_release
mtx_unlock(sc^.sc_lock); //
TD_SET_SLEEPING(td);
r:=mi_switch(SW_VOL or SWT_SLEEPQ);
//if (r=ETIMEDOUT) then
//begin
if (td^.td_slpcallout=Pointer(@sleepq_timeout)) then
begin
sleepq_timeout(td);
//end;
end;
td^.td_slpcallout:=nil;
//lock after wait
thread_lock(td); //
mtx_lock(td^.tdq_lock);
thread_lock_set(td,@td^.tdq_lock);
Assert(TD_IS_RUNNING(td),'running but not TDS_RUNNING');
end;
@ -412,7 +460,9 @@ var
td:p_kthread;
begin
Result:=0;
td:=curkthread;
thread_lock_assert(td);
if ((td^.td_flags and TDF_TIMEOUT)<>0) then
begin
@ -435,6 +485,7 @@ var
td:p_kthread;
begin
td:=curkthread;
thread_lock_assert(td);
if ((td^.td_flags and TDF_SINTR)<>0) then
begin
@ -547,6 +598,9 @@ end;
* runnable.
}
function sleepq_resume_thread(sq:p_sleepqueue;td:p_kthread;pri:Integer):Integer; public;
var
sc:p_sleepqueue_chain;
tq:p_sleepqueue;
begin
Result:=0;
@ -555,27 +609,27 @@ begin
Assert(td^.td_wchan=sq^.sq_wchan);
Assert((td^.td_sqqueue<NR_SLEEPQS) and (td^.td_sqqueue>=0));
thread_lock_assert(td);
sc:=SC_LOOKUP(sq^.sq_wchan);
mtx_assert(sc^.sc_lock);
Dec(sq^.sq_blockedcnt[td^.td_sqqueue]);
TAILQ_REMOVE(@sq^.sq_blocked[td^.td_sqqueue],td,@td^.td_slpq);
if (LIST_EMPTY(@sq^.sq_free)) then
begin
td^.td_sleepqueue:=sq;
sq^.sq_wchan:=nil;
end else
begin
td^.td_sleepqueue:=LIST_FIRST(@sq^.sq_free);
end;
sq:=td^.td_sleepqueue;
if ((sq^.sq_type and SLEEPQ_HAMT)<>0) then
begin
HAMT_REMOVE(sq^.sq_wchan);
end else
begin
LIST_REMOVE(sq,@sq^.sq_hash);
end;
tq:=td^.td_sleepqueue;
LIST_REMOVE(tq,@tq^.sq_hash);
td^.td_wmesg:=nil;
td^.td_wchan:=nil;
td^.td_flags:=td^.td_flags and (not TDF_SINTR);
@ -698,6 +752,7 @@ end;
procedure sleepq_timeout(arg:Pointer); public;
var
td:p_kthread;
sc:p_sleepqueue_chain;
sq:p_sleepqueue;
wchan:Pointer;
begin
@ -708,6 +763,10 @@ begin
begin
wchan:=td^.td_wchan;
sq:=sleepq_lookup(wchan);
sc:=SC_LOOKUP(wchan);
mtx_assert(sc^.sc_lock);
Assert(sq<>nil);
td^.td_flags:=td^.td_flags or TDF_TIMEOUT;
sleepq_resume_thread(sq,td,0);
@ -772,6 +831,8 @@ var
sq:p_sleepqueue;
wchan:Pointer;
begin
thread_lock_assert(td);
Assert(TD_ON_SLEEPQ(td));
Assert((td^.td_flags and TDF_SINTR)<>0);
Assert((intrval=EINTR) or (intrval=ERESTART));
@ -791,6 +852,7 @@ begin
wchan:=td^.td_wchan;
Assert(wchan<>nil);
sq:=sleepq_lookup(wchan);
Assert(sq<>nil);

View File

@ -17,7 +17,6 @@ const
SLEEPQ_LK =$04; // Used by a lockmgr.
SLEEPQ_INTERRUPTIBLE=$100; // Sleep is interruptible.
SLEEPQ_STOP_ON_BDRY =$200; // Stop sleeping thread
SLEEPQ_HAMT =$400;
function sleepq_alloc:Pointer; external;
procedure sleepq_free(sq:Pointer); external;