diff --git a/sys/kern/kern_sig.pas b/sys/kern/kern_sig.pas index dc95fcb8..22d9c5ed 100644 --- a/sys/kern/kern_sig.pas +++ b/sys/kern/kern_sig.pas @@ -1803,9 +1803,11 @@ begin if ((flags and TDF_NEEDRESCHED)<>0) then begin thread_lock(td); + sched_prio(td,td^.td_user_pri); - thread_unlock(td); mi_switch(SW_INVOL or SWT_NEEDRESCHED); + + thread_unlock(td); end; if ((flags and TDF_NEEDSIGCHK)<>0) or @@ -1849,9 +1851,10 @@ begin td^.td_flags:=td^.td_flags and (not TDF_SUSP_CTX); td^.td_state:=TDS_INHIBITED; td^.td_inhibitors:=td^.td_inhibitors or TDI_SUSP_CTX; - thread_unlock(td); mi_switch(SW_VOL or SWT_SUSPEND); + + thread_unlock(td); end else begin thread_unlock(td); diff --git a/sys/kern/kern_synch.pas b/sys/kern/kern_synch.pas index a2e8517b..f9ab44cd 100644 --- a/sys/kern/kern_synch.pas +++ b/sys/kern/kern_synch.pas @@ -159,7 +159,9 @@ var td:p_kthread; begin Result:=0; + td:=curkthread; + thread_lock_assert(td); if (td<>nil) then begin @@ -178,7 +180,17 @@ begin SWT_RELINQUISH, SWT_NEEDRESCHED: begin + if (td<>nil) then + begin + thread_unlock(td); + end; + md_yield; + + if (td<>nil) then + begin + thread_lock(td); + end; end; SWT_SLEEPQ, SWT_SLEEPQTIMO: @@ -210,12 +222,21 @@ begin begin thread_lock(td); if (prio=PRI_USER) then + begin prio:=td^.td_user_pri; + end; if (prio>=0) then + begin sched_prio(td, prio); + end; + end; + + mi_switch(SW_VOL or SWT_RELINQUISH); + + if (td<>nil) then + begin thread_unlock(td); end; - mi_switch(SW_VOL or SWT_RELINQUISH); end; function sys_yield():Integer; @@ -223,15 +244,24 @@ var td:p_kthread; begin td:=curkthread; + if (td<>nil) then begin thread_lock(td); if (PRI_BASE(td^.td_pri_class)=PRI_TIMESHARE) then + begin sched_prio(td, PRI_MAX_TIMESHARE); - thread_unlock(td); + end; td^.td_retval[0]:=0; end; + mi_switch(SW_VOL or SWT_RELINQUISH); + + if (td<>nil) then + begin + thread_unlock(td); + end; + Exit(0); end; diff --git a/sys/kern/kern_thr.pas b/sys/kern/kern_thr.pas index 7e52b122..fb7b3f74 100644 --- a/sys/kern/kern_thr.pas +++ b/sys/kern/kern_thr.pas @@ -9,7 +9,8 @@ uses mqueue, ucontext, signal, - signalvar; + signalvar, + kern_mtx; const TDS_INACTIVE =0; @@ -181,7 +182,7 @@ type td_umtxq :Pointer; //p_umtx_q td_handle :THandle; //nt thread td_teb :p_teb; - td_lock :Pointer; + td_lock :p_mtx; td_tid :QWORD; td_sigstk :stack_t; td_state :Integer; @@ -213,6 +214,7 @@ type td_sleepqueue :Pointer; td_slpq :TAILQ_ENTRY; td_wchan :Pointer; + td_wmesg :PChar; td_sqqueue :Integer; td_intrval :Integer; td_inhibitors :Integer; @@ -224,7 +226,10 @@ type ru_nvcsw :Int64; ru_nivcsw :Int64; end; + // td_slptick :Int64; + td_slpcallout :Pointer; + tdq_lock :mtx; // td_fpop :Pointer; td_map_def_user :Pointer; @@ -320,6 +325,7 @@ function curthread_pflags_set(flags:Integer):Integer; procedure curthread_pflags_restore(save:Integer); procedure curthread_set_pcb_onfault(v:Pointer); +procedure thread_lock_assert(td:p_kthread); external; procedure thread_lock(td:p_kthread); external; procedure thread_unlock(td:p_kthread); external; diff --git a/sys/kern/kern_thread.pas b/sys/kern/kern_thread.pas index 7bad6ab6..ed406dc9 100644 --- a/sys/kern/kern_thread.pas +++ b/sys/kern/kern_thread.pas @@ -40,6 +40,7 @@ function sys_amd64_get_gsbase(base:PPointer):Integer; procedure thread_inc_ref(td:p_kthread); procedure thread_dec_ref(td:p_kthread); +procedure thread_lock_assert(td:p_kthread); procedure thread_lock (td:p_kthread); procedure thread_unlock (td:p_kthread); function tdfind(tid:DWORD):p_kthread; @@ -184,6 +185,9 @@ begin Result:=cpu_thread_alloc(pages); + mtx_init(Result^.tdq_lock,'tdq_lock'); + Result^.td_lock:=@Result^.tdq_lock; + Result^.td_state:=TDS_INACTIVE; Result^.td_lend_user_pri:=PRI_MAX; @@ -193,6 +197,7 @@ end; procedure thread_free(td:p_kthread); begin + mtx_destroy(td^.tdq_lock); sleepq_free(td^.td_sleepqueue); umtx_thread_fini(td); cpu_thread_free(td); @@ -211,14 +216,19 @@ begin end; end; +procedure thread_lock_assert(td:p_kthread); public; +begin + mtx_assert(td^.td_lock^); +end; + procedure thread_lock(td:p_kthread); public; begin - rw_wlock(td^.td_lock); + mtx_lock(td^.td_lock^); end; procedure thread_unlock(td:p_kthread); public; begin - rw_wunlock(td^.td_lock); + mtx_unlock(td^.td_lock^); end; procedure thread_link(td:p_kthread); diff --git a/sys/kern/sched_ule.pas b/sys/kern/sched_ule.pas index ba4b00b7..865fdee1 100644 --- a/sys/kern/sched_ule.pas +++ b/sys/kern/sched_ule.pas @@ -33,12 +33,14 @@ begin if (td<>nil) then begin cpuset_setaffinity(childtd,td^.td_cpuset); - sched_priority(td,td^.td_base_pri); + sched_priority (childtd,td^.td_base_pri); end; end; -procedure sched_class(td:p_kthread;_class:Integer); inline; +procedure sched_class(td:p_kthread;_class:Integer); begin + thread_lock_assert(td); + td^.td_pri_class:=_class; end; @@ -53,8 +55,10 @@ begin sched_priority(td, prio); end; -procedure sched_user_prio(td:p_kthread;prio:Integer); inline; +procedure sched_user_prio(td:p_kthread;prio:Integer); begin + thread_lock_assert(td); + td^.td_base_user_pri:=prio; if (td^.td_lend_user_pri<=prio) then Exit; td^.td_user_pri:=prio; @@ -67,6 +71,8 @@ end; procedure sched_lend_user_prio(td:p_kthread;prio:Integer); begin + thread_lock_assert(td); + td^.td_lend_user_pri:=prio; td^.td_user_pri:=min(prio,td^.td_base_user_pri); if (td^.td_priority>td^.td_user_pri) then @@ -79,6 +85,8 @@ procedure sched_sleep(td:p_kthread;prio:Integer); const PSOCK=87; begin + thread_lock_assert(td); + if TD_IS_SUSPENDED(td) or (prio>=PSOCK) then begin td^.td_flags:=td^.td_flags or TDF_CANSWAP; @@ -91,6 +99,8 @@ end; procedure sched_wakeup(td:p_kthread); begin + thread_lock_assert(td); + td^.td_flags:=td^.td_flags and (not TDF_CANSWAP); TD_SET_RUNNING(td); @@ -102,14 +112,23 @@ function sched_switch(td:p_kthread):Integer; var slptick:Int64; begin + thread_lock_assert(td); + atomic_clear_int(@td^.td_flags,TDF_NEEDRESCHED or TDF_SLICEEND); slptick:=System.InterlockedExchange64(td^.td_slptick,0); + + thread_unlock(td); + Result:=msleep_td(slptick); + + thread_lock(td); end; function setrunnable(td:p_kthread):Integer; begin + thread_lock_assert(td); + Case td^.td_state of TDS_RUNNING, TDS_RUNQ :Exit(0); diff --git a/sys/kern/subr_sleepqueue.pas b/sys/kern/subr_sleepqueue.pas index 78af18c1..9bfcc425 100644 --- a/sys/kern/subr_sleepqueue.pas +++ b/sys/kern/subr_sleepqueue.pas @@ -8,7 +8,6 @@ interface uses mqueue, sys_sleepqueue, - hamt, kern_mtx, kern_thr, rtprio; @@ -23,18 +22,19 @@ const type p_sleepqueue=^sleepqueue; sleepqueue=packed record - sq_blocked :array[0..NR_SLEEPQS-1] of TAILQ_HEAD; + sq_blocked :array[0..NR_SLEEPQS-1] of TAILQ_HEAD; //thread sq_blockedcnt:array[0..NR_SLEEPQS-1] of DWORD; - sq_hash :LIST_ENTRY; - sq_free :LIST_HEAD; //sleepqueue; + sq_hash :LIST_ENTRY; //sleepqueue + sq_free :LIST_HEAD; //sleepqueue; sq_wchan :Pointer; + sq_lock :Pointer; //lock_object sq_type :Integer; end; p_sleepqueue_chain=^sleepqueue_chain; sleepqueue_chain=packed record - sc_hamt:TSTUB_HAMT64; - sc_lock:mtx; + sc_queues:LIST_HEAD; //sleepqueue + sc_lock :mtx; end; function sleepq_alloc:p_sleepqueue; @@ -81,6 +81,7 @@ var begin For i:=0 to SC_MASK do begin + LIST_INIT(@sleepq_chains[i].sc_queues); mtx_init(sleepq_chains[i].sc_lock,'sleepq chain'); end; end; @@ -146,28 +147,25 @@ end; function sleepq_lookup(wchan:Pointer):p_sleepqueue; var sc:p_sleepqueue_chain; - data:PPointer; + sq:p_sleepqueue; begin + Assert(wchan<>nil,'invalid NULL wait channel'); + Result:=nil; sc:=SC_LOOKUP(wchan); - data:=HAMT_search64(@sc^.sc_hamt,QWORD(wchan)); - if (data<>nil) then Result:=data^; -end; -procedure HAMT_INSERT(wchan:Pointer;sq:p_sleepqueue); inline; -var - sc:p_sleepqueue_chain; -begin - sc:=SC_LOOKUP(wchan); - HAMT_insert64(@sc^.sc_hamt,QWORD(wchan),sq); -end; + mtx_assert(sc^.sc_lock); -procedure HAMT_REMOVE(wchan:Pointer); inline; -var - sc:p_sleepqueue_chain; -begin - sc:=SC_LOOKUP(wchan); - HAMT_delete64(@sc^.sc_hamt,QWORD(wchan),nil); + sq:=LIST_FIRST(@sc^.sc_queues); + while (sq<>nil) do + begin + if (sq^.sq_wchan=wchan) then + begin + Exit(sq); + end; + // + sq:=LIST_NEXT(sq,@sq^.sq_hash); + end; end; { @@ -179,10 +177,16 @@ end; procedure sleepq_add(wchan,lock,wmesg:Pointer;flags,queue:Integer); public; var td:p_kthread; + sc:p_sleepqueue_chain; sq:p_sleepqueue; + tq:p_sleepqueue; + i:Integer; begin td:=curkthread; + sc:=SC_LOOKUP(wchan); + mtx_assert(sc^.sc_lock); + Assert(td^.td_sleepqueue<>nil); Assert(wchan<>nil,'invalid nil wait channel'); Assert((queue>=0) and (queue0) then begin td^.td_flags:=td^.td_flags or TDF_SINTR; td^.td_flags:=td^.td_flags and (not TDF_SLEEPABORT); end; + thread_unlock(td); end; @@ -228,14 +250,19 @@ end; procedure sleepq_set_timeout(wchan:Pointer;time:Int64); public; var td:p_kthread; + sc:p_sleepqueue_chain; begin td:=curkthread; + sc:=SC_LOOKUP(wchan); + mtx_assert(sc^.sc_lock); + Assert(TD_ON_SLEEPQ(td)); Assert(td^.td_sleepqueue=nil); Assert(wchan<>nil); td^.td_slptick:=time; + td^.td_slpcallout:=@sleepq_timeout; end; { @@ -271,6 +298,7 @@ begin td:=curkthread; sc:=SC_LOOKUP(wchan); + mtx_assert(sc^.sc_lock); Assert(wchan<>nil); if ((td^.td_pflags and TDP_WAKEUP)<>0) then @@ -341,7 +369,10 @@ begin if TD_ON_SLEEPQ(td) then begin sq:=sleepq_lookup(wchan); - sleepq_resume_thread(sq,td,0); + if (sleepq_resume_thread(sq,td,0)<>0) then + begin + Assert(false,'not waking up swapper'); + end; end; //sleepq_release @@ -350,6 +381,17 @@ begin Assert(td^.td_lock<>@sc^.sc_lock); end; +procedure thread_lock_set(td:p_kthread;new:p_mtx); +var + lock:p_mtx; +begin + mtx_assert(new^); + thread_lock_assert(td); + lock:=td^.td_lock; + td^.td_lock:=new; + mtx_unlock(lock^); +end; + { * Switches to another thread if we are still asleep on a sleep queue. * Returns with thread lock. @@ -362,7 +404,10 @@ var r:Integer; begin td:=curkthread; + sc:=SC_LOOKUP(wchan); + mtx_assert(sc^.sc_lock); + thread_lock_assert(td); if (td^.td_sleepqueue<>nil) then begin @@ -375,7 +420,10 @@ begin begin Assert(TD_ON_SLEEPQ(td)); sq:=sleepq_lookup(wchan); - sleepq_resume_thread(sq,td,0); + if (sleepq_resume_thread(sq,td,0)<>0) then + begin + Assert(false,'not waking up swapper'); + end; //sleepq_release mtx_unlock(sc^.sc_lock); Exit; @@ -383,23 +431,23 @@ begin Assert(td^.td_sleepqueue=nil); sched_sleep(td,pri); - TD_SET_SLEEPING(td); //unlock before wait - thread_unlock(td); // + thread_lock_set(td,@sc^.sc_lock); - //sleepq_release - mtx_unlock(sc^.sc_lock); // + TD_SET_SLEEPING(td); r:=mi_switch(SW_VOL or SWT_SLEEPQ); - //if (r=ETIMEDOUT) then - //begin + if (td^.td_slpcallout=Pointer(@sleepq_timeout)) then + begin sleepq_timeout(td); - //end; + end; + td^.td_slpcallout:=nil; //lock after wait - thread_lock(td); // + mtx_lock(td^.tdq_lock); + thread_lock_set(td,@td^.tdq_lock); Assert(TD_IS_RUNNING(td),'running but not TDS_RUNNING'); end; @@ -412,7 +460,9 @@ var td:p_kthread; begin Result:=0; + td:=curkthread; + thread_lock_assert(td); if ((td^.td_flags and TDF_TIMEOUT)<>0) then begin @@ -435,6 +485,7 @@ var td:p_kthread; begin td:=curkthread; + thread_lock_assert(td); if ((td^.td_flags and TDF_SINTR)<>0) then begin @@ -547,6 +598,9 @@ end; * runnable. } function sleepq_resume_thread(sq:p_sleepqueue;td:p_kthread;pri:Integer):Integer; public; +var + sc:p_sleepqueue_chain; + tq:p_sleepqueue; begin Result:=0; @@ -555,27 +609,27 @@ begin Assert(td^.td_wchan=sq^.sq_wchan); Assert((td^.td_sqqueue=0)); + thread_lock_assert(td); + + sc:=SC_LOOKUP(sq^.sq_wchan); + mtx_assert(sc^.sc_lock); + Dec(sq^.sq_blockedcnt[td^.td_sqqueue]); TAILQ_REMOVE(@sq^.sq_blocked[td^.td_sqqueue],td,@td^.td_slpq); if (LIST_EMPTY(@sq^.sq_free)) then begin td^.td_sleepqueue:=sq; + sq^.sq_wchan:=nil; end else begin td^.td_sleepqueue:=LIST_FIRST(@sq^.sq_free); end; - sq:=td^.td_sleepqueue; - - if ((sq^.sq_type and SLEEPQ_HAMT)<>0) then - begin - HAMT_REMOVE(sq^.sq_wchan); - end else - begin - LIST_REMOVE(sq,@sq^.sq_hash); - end; + tq:=td^.td_sleepqueue; + LIST_REMOVE(tq,@tq^.sq_hash); + td^.td_wmesg:=nil; td^.td_wchan:=nil; td^.td_flags:=td^.td_flags and (not TDF_SINTR); @@ -698,6 +752,7 @@ end; procedure sleepq_timeout(arg:Pointer); public; var td:p_kthread; + sc:p_sleepqueue_chain; sq:p_sleepqueue; wchan:Pointer; begin @@ -708,6 +763,10 @@ begin begin wchan:=td^.td_wchan; sq:=sleepq_lookup(wchan); + + sc:=SC_LOOKUP(wchan); + mtx_assert(sc^.sc_lock); + Assert(sq<>nil); td^.td_flags:=td^.td_flags or TDF_TIMEOUT; sleepq_resume_thread(sq,td,0); @@ -772,6 +831,8 @@ var sq:p_sleepqueue; wchan:Pointer; begin + thread_lock_assert(td); + Assert(TD_ON_SLEEPQ(td)); Assert((td^.td_flags and TDF_SINTR)<>0); Assert((intrval=EINTR) or (intrval=ERESTART)); @@ -791,6 +852,7 @@ begin wchan:=td^.td_wchan; Assert(wchan<>nil); + sq:=sleepq_lookup(wchan); Assert(sq<>nil); diff --git a/sys/sys_sleepqueue.pas b/sys/sys_sleepqueue.pas index d7a5e8a9..5d2c7e82 100644 --- a/sys/sys_sleepqueue.pas +++ b/sys/sys_sleepqueue.pas @@ -17,7 +17,6 @@ const SLEEPQ_LK =$04; // Used by a lockmgr. SLEEPQ_INTERRUPTIBLE=$100; // Sleep is interruptible. SLEEPQ_STOP_ON_BDRY =$200; // Stop sleeping thread - SLEEPQ_HAMT =$400; function sleepq_alloc:Pointer; external; procedure sleepq_free(sq:Pointer); external;