Skip to content

Commit

Permalink
Kludge: disable PM debugging for JHS
Browse files Browse the repository at this point in the history
  • Loading branch information
HenryHRich committed Dec 26, 2024
1 parent c7cdf77 commit 6afe77c
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 20 deletions.
2 changes: 1 addition & 1 deletion jsrc/cip.c
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ I cachedmmult(J jt,D* av,D* wv,D* zv,I m,I n,I p,I flgs){
R blockedmmult(jt,av,wv,zv,m,n,p,p,flgs); }
UI nfulltasks, nremnant, tailtasks, endtasksize, fulltasksize;
// big problem, split into tasks. The tasks may be either cached or blocked
UI nthreads=__atomic_load_n(&(*JT(jt,jobqueue))[0].nthreads,__ATOMIC_ACQUIRE)+(jt->threadpoolno!=0); // get # running threads, just once so we have a consistent view. We count our thread too, if it's not in pool 0, since it runs tasks for the job
UI nthreads=__atomic_load_n(&(*JT(jt,jobqueues))[0].nthreads,__ATOMIC_ACQUIRE)+(jt->threadpoolno!=0); // get # running threads, just once so we have a consistent view. We count our thread too, if it's not in pool 0, since it runs tasks for the job
UI ncache=(m+CACHEHEIGHT-1)>>CACHEHEIGHTX; // number of cacheblocks of a, including remnant
// The cached algorithm works on sections that are a multiple of CACHEHEIGHT high, up to MAXAROWS. For big arguments, we spread the CACHEHEIGHT sections through the threads as evenly as possible.
// for modest arguments, breaking into CACHEHEIGHT blocks may not allow use of all threads. In that case, use more threads (each of which will use the blocking algorithm)
Expand Down
16 changes: 8 additions & 8 deletions jsrc/ct.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ static void *jtthreadmain(void *arg){J jt=arg;I dummy;
__atomic_store_n(&jt->cstackinit,(UI)&dummy,__ATOMIC_RELEASE); // use a local as a surrogate for the stack pointer
__atomic_store_n(&jt->cstackmin,jt->cstackinit-(CSTACKSIZE-CSTACKRESERVE),__ATOMIC_RELEASE); // use a local as a surrogate for the stack pointer
// Note: we use cstackmin as an indication that this thread is ready to use.
JOBQ *jobq=&(*JT(jt,jobqueue))[jt->threadpoolno]; // The jobq block for the threadpool we are in - never changes
JOBQ *jobq=&(*JT(jt,jobqueues))[jt->threadpoolno]; // The jobq block for the threadpool we are in - never changes

// loop forever executing tasks. First time through, the thread-creation code holds the job lock until the initialization finishes
nexttask: ;
Expand Down Expand Up @@ -547,7 +547,7 @@ static A jttaskrun(J jt,A arg1, A arg2, A arg3){A pyx;
I dyad=!(AT(arg2)&VERB); A self=dyad?arg3:arg2; // the call is either noun self x or noun noun self. See which, select self. dyad is 0 or 1
// extract parms given to t.: threadpool number, worker-only flag
UI forcetask=((FAV(self)->localuse.lu1.forcetask>>8)&1)-1; // 0 if the user wants to force this job to queue, ~0 otherwise
JOBQ *jobq=&(*JT(jt,jobqueue))[FAV(self)->localuse.lu1.forcetask&0xff]; // bits 0-7 = threadpool number to use
JOBQ *jobq=&(*JT(jt,jobqueues))[FAV(self)->localuse.lu1.forcetask&0xff]; // bits 0-7 = threadpool number to use
if((((I)(forcetask&lda(&jobq->nuunfin))-jobq->nthreads)&(lda(&JT(jt,systemlock))-3))<0){ // more workers than unfinished jobs (ignoring # unfinished if forcetask was requested) - fast look
// in suspension (systemlock state>2) we do not start any task anywhere
// we would like to avoid realizing virtual arguments, so that the copy will be done into the core that needs the data. However, if we leave the block as virtual,
Expand Down Expand Up @@ -591,7 +591,7 @@ static A jttaskrun(J jt,A arg1, A arg2, A arg3){A pyx;
// execute an internal job made up of n tasks. f is the function to run, end is the function to call at end, ctx is parms to pass to each task
// poolno is the threadpool to use. Tasks are run on this thread and the worker threads
// Result is 0 for OK, else jerr.h error code
C jtjobrun(J jt,unsigned char(*f)(J,void*,UI4),void *ctx,UI4 n,I poolno){JOBQ *jobq=&(*JT(jt,jobqueue))[poolno];
C jtjobrun(J jt,unsigned char(*f)(J,void*,UI4),void *ctx,UI4 n,I poolno){JOBQ *jobq=&(*JT(jt,jobqueues))[poolno];
A jobA;GAT0(jobA,INT,(sizeof(JOB)+SZI-1)>>LGSZI,1); ACINITUNPUSH(jobA); // we could allocate this (aligned) on the stack, since we wait here for all tasks to finish. Must never really free!
JOB *job=(JOB*)AAV1(jobA); job->n=n; job->ns=1; job->initthread=THREADID(jt); job->internal.f=f; job->internal.ctx=ctx; job->internal.nf=0; job->internal.err=0; // by hand: allocation is short. ns=1 because we take the first task in this thread
I lastqueuedtask=-1; // if nonneg, the task# of the last task (i. e. n-1). If this task is taken here we have to leave it in the queue
Expand Down Expand Up @@ -788,7 +788,7 @@ ASSERT(0,EVNONCE)
ASSERT(AR(w)<=1,EVRANK) ASSERT(AN(w)<=1,EVLENGTH) // must be singleton
RZ(w=vi(w)) poolno=IAV(w)[0]; ASSERT(BETWEENO(poolno,0,MAXTHREADPOOLS),EVLIMIT) // extract threadpool# and audit it
}
JOBQ *jobq=&(*JT(jt,jobqueue))[poolno];
JOBQ *jobq=&(*JT(jt,jobqueues))[poolno];
GAT0(z,INT,3,1) // allocate result
JOB *oldjob=JOBLOCK(jobq); // lock the jobq to present a consistent picture
IAV1(z)[0]=jobq->waiters, IAV1(z)[1]=jobq->nuunfin, IAV1(z)[2]=jobq->nthreads; // don't allocate under lock
Expand All @@ -807,7 +807,7 @@ ASSERT(0,EVNONCE)
ASSERT(AR(w)==1,EVRANK) ASSERT(AN(w)==2,EVLENGTH) // arg is threadpool# keepwarm
if(AT(w)!=FL)RZ(w=ccvt(FL,w,0)); // make arg float type
D dpoolno=DAV(w)[0]; I poolno=(I)dpoolno; ASSERT((D)poolno==dpoolno,EVDOMAIN) ASSERT(BETWEENO(poolno,0,MAXTHREADPOOLS),EVLIMIT) // extract threadpool# and audit it
JOBQ *jobq=&(*JT(jt,jobqueue))[poolno];
JOBQ *jobq=&(*JT(jt,jobqueues))[poolno];
D oldval=jobq->keepwarmns*1e-9;
D kwtime=DAV(w)[1]; ASSERT(kwtime>=0,EVDOMAIN); if(unlikely(kwtime>MAXLINGER))kwtime=MAXLINGER; I kwtimens=(I)(kwtime*1000000000); // limit time and convert to ns
jobq->keepwarmns=kwtimens; // store new value
Expand All @@ -823,7 +823,7 @@ ASSERT(0,EVNONCE)
ASSERT(AR(w)<=1,EVRANK) ASSERT(AN(w)<=1,EVLENGTH) // must be singleton
RZ(w=vi(w)) poolno=IAV(w)[0]; ASSERT(BETWEENO(poolno,0,MAXTHREADPOOLS),EVLIMIT) // extract threadpool# and audit it
}
JOBQ *jobq=&(*JT(jt,jobqueue))[poolno];
JOBQ *jobq=&(*JT(jt,jobqueues))[poolno];
JOB *job=JOBLOCK(jobq); // must change status under lock for the threadpool
++jobq->futex; // while under lock, advance futex value to indicate that we have added work, so that if a waiter finishes its keepwarm it will start another one
JOBUNLOCK(jobq,job); // We don't add a job - we just kick all the threads
Expand Down Expand Up @@ -875,7 +875,7 @@ ASSERT(0,EVNONCE)
YIELD // let other threads run while we wait for the on-deck thread to terminate
}
// we have a lock on the overall thread info; and resthread, the slot we want to fill, is idle. keep the lock while we fill it. systemlock will not count threads until we have finished adding and starting the new one
JOBQ *jobq=&(*JT(jt,jobqueue))[poolno];
JOBQ *jobq=&(*JT(jt,jobqueues))[poolno];
ASSERTSUFF(jobq->nthreads<MAXTHREADSINPOOL,EVLIMIT,WRITEUNLOCK(JT(jt,flock)); R 0;); // error if threadpool limit exceeded. OK to CHECK outside of job lock
// We also have to lock the threadpool before changing nthreads, because jobq->nthreads is used to decide whether to start a job
JOB *job=JOBLOCK(jobq); // must modify thread info under lock on the threadpool
Expand Down Expand Up @@ -1001,7 +1001,7 @@ ASSERT(0,EVNONCE)
}
resthread=THREADIDFORWORKER(resthread); // convert worker# to thread#
ASSERTSUFF(resthread>=1,EVLIMIT,WRITEUNLOCK(JT(jt,flock)); R 0;); // error if no thread to delete
jobq=&(*JT(jt,jobqueue))[JTFORTHREAD(jt,resthread)->threadpoolno];
jobq=&(*JT(jt,jobqueues))[JTFORTHREAD(jt,resthread)->threadpoolno];
job=JOBLOCK(jobq); // must change status under lock for the threadpool
if(job==0||jobq->nthreads>1)break; // normal continuation: not last thread in a busy pool. Wait for that
JOBUNLOCK(jobq,job); // We don't add a job - we just kick all the threads
Expand Down
4 changes: 3 additions & 1 deletion jsrc/cx.c
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,9 @@ bodyend: ; // we branch to here to exit with z set to result
// If, while debug is off, we hit an error in the master thread that is not going to be intercepted, add a debug frame for the private-namespace chain and leave the freeing for later
// We don't do this if jt->jerr is clear (that's the special result for coming out of debug), or when WSFULL (since there may be no memory), or for EXIT/DEBUGEND/FOLDTHROW (which aren't really errors).
// Also, suppress pmdebug if an immex phrase is running or has been requested, because those would be confusing and also they call tpop
if(jt->jerr && jt->jerr!=EVWSFULL && ((jt->jerr&~0x60)!=EVEXIT) && !(jt->uflags.trace&TRACEDB1) && THREADID(jt)==0 && !(jt->emsgstate&EMSGSTATETRAPPING) && jt->iepdo==0){
// Also, disable the pmdebug feature if JHS is running. pmdebug freezes the tpop stack and expects nothing to happen until the user gives the next sentence. JHS, however, uses J calls for console input and output.
// This causes problems (to wit, the test below for _ttop==jt->tnextpushp gives unreliable results), and the easiest temporary kludge is to block pmdebug in that case
if(jt->jerr && jt->jerr!=EVWSFULL && ((jt->jerr&~0x60)!=EVEXIT) && !(jt->uflags.trace&TRACEDB1) && THREADID(jt)==0 && !(jt->emsgstate&EMSGSTATETRAPPING) && jt->iepdo==0 && !JT(jt,nfe)){
// if there are any UNINCORPABLE values, they must be realized in case they are on the C stack that we are about to pop over. Only x and y are possible
UI4 yxbucks = *(UI4*)LXAV0(locsym); L *sympv=SYMORIGIN; if(a==0)yxbucks&=0xffff; if(w==0)yxbucks&=-0x10000; // get bucket indexes & addr of symbols. Mark which buckets are valid
// For each of [xy], reassign any UNINCORPABLE value to ensure it is realized and recursive. If error, the name will lose its value; that's OK. Must not take error exit!
Expand Down
6 changes: 3 additions & 3 deletions jsrc/i.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ static C jtjinit3(JS jjt){S t;JJ jt=MTHREAD(jjt);
#endif
INITJT(jjt,tssbase)=tod(); // starting time for all threads
#if PYXES
INITJT(jjt,jobqueue)=aligned_malloc(sizeof(JOBQ[MAXTHREADPOOLS]),CACHELINESIZE); // job queue, cache-line aligned
memset(INITJT(jjt,jobqueue),0,sizeof(JOBQ[MAXTHREADPOOLS]));
DO(MAXTHREADPOOLS, (*INITJT(jjt,jobqueue))[i].ht[1]=(JOB *)&(*INITJT(jjt,jobqueue))[i].ht[1];) // when q is empty, tail points to itself, as a safe NOP store
INITJT(jjt,jobqueues)=aligned_malloc(sizeof(JOBQ[MAXTHREADPOOLS]),CACHELINESIZE); // job queue, cache-line aligned
memset(INITJT(jjt,jobqueues),0,sizeof(JOBQ[MAXTHREADPOOLS]));
DO(MAXTHREADPOOLS, (*INITJT(jjt,jobqueues))[i].ht[1]=(JOB *)&(*INITJT(jjt,jobqueues))[i].ht[1];) // when q is empty, tail points to itself, as a safe NOP store
#endif
// only crashing on startup INITJT(jjt,peekdata)=1; // wake up auditing
// Initialize subsystems in order. Each initializes all threads, if there are thread variables
Expand Down
2 changes: 1 addition & 1 deletion jsrc/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ CDPROC int _stdcall JFree(JS jt){
jm->jerr=0; jm->etxn=0; /* clear old errors */
dllquit(jm); // clean up call dll
#if PYXES
aligned_free(JT(jt,jobqueue));
aligned_free(JT(jt,jobqueues));
#endif
jvmrelease(jt,sizeof(JST)); // free the initial allocation
ZEROUPPER;
Expand Down
4 changes: 2 additions & 2 deletions jsrc/jt.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ typedef struct JSTstruct {
FLOAT16 zgemm_thres; // used by cip.c: when m*n*p exceeds this, use BLAS for complex matrix product. _1 means 'never'
// 2 bytes free
#if PYXES || 1
JOBQ (*jobqueue)[MAXTHREADPOOLS]; // one JOBQ block for each threadpool
JOBQ (*jobqueues)[MAXTHREADPOOLS]; // one JOBQ block for each threadpool
I filler7[1];
#else
I filler7[2];
Expand Down Expand Up @@ -457,7 +457,7 @@ typedef JST* JS; // shared part of struct
#define MTHREAD(jjt) (&jjt->threaddata[0]) // jt for master thread. jjt is the shared jt pointer
#define MDTHREAD(jjt) (&jjt->threaddata[jjt->promptthread]) // jt for master/debug thread. jjt is the shared jt pointer
#define THREADID(jt) ((((I)(jt)&(JTALIGNBDY-1))>>LGTHREADBLKSIZE)-(offsetof(struct JSTstruct, threaddata[0])>>LGTHREADBLKSIZE)) // thread number from jt. Thread 0 is the master
#define THREADID1(jt) ((((I)(jt)&(JTALIGNBDY-1))>>LGTHREADBLKSIZE)) // unique thread #, faster to calculate
#define THREADID1(jt) ((((I)(jt)&(JTALIGNBDY-1))>>LGTHREADBLKSIZE)) // unique thread #, faster to calculate (is 1+THREADID)
#define JTTHREAD0(jt) (JJTOJ(jt)->threaddata) // the array of JTT structs
#define JTFORTHREAD(jt,n) (&(JTTHREAD0(jt)[n])) // JTT struct for thread n
#define JTFORTHREAD1(jt,n) (&(JTTHREAD0(jt)[(n)-1])) // JTT struct for thread# returned from THREADID1
Expand Down
2 changes: 1 addition & 1 deletion jsrc/p.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ void protectlocals(J jt, I ofst){PSTK *stk=jt->parserstackframe.parserstkend1; A
A jtparsea(J jt, A *queue, I nwds){F1PREFIP;PSTK *stack;A z,*v;
// whenever we execute a fragment, parserstkend1 must be set to the execution stack of the fragment; the stack will be analyzed
// to get the error token. Errors during the stacking phase will be located from this routine

// obsolete if((I)JT(jt,jobqueues)&63)SEGFAULT;
if(likely(nwds>1)) { // normal case where there is a fragment to parse
// Save info for error typeout. We save pointers to the sentence, and the executing parser-stack frame for each execution, from which we infer error position
PFRAME oframe=jt->parserstackframe; // save all the stack status
Expand Down
6 changes: 3 additions & 3 deletions jsrc/vfrom.c
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,7 @@ struct mvmctx opctx; // parms to all threads, and return values
box=C(AAV(w)[4]); ASSERT(AR(box)==1,EVRANK) ASSERT(AT(box)&LIT,EVDOMAIN) C *rvtv=CAV(box); // RVT
box=C(AAV(w)[5]); ASSERT(AR(box)==1,EVRANK) ASSERT(AT(box)&LIT,EVDOMAIN) opctx.bndrowmask=DAV(box); // bndrowmask
box=C(AAV(w)[8]); ASSERT(AR(box)<=1,EVRANK) ASSERT(AT(box)&INT,EVDOMAIN) // col indexes being evaluated
I isgradmode; I nthreads=(*JT(jt,jobqueue))[0].nthreads+1; // non0 if gradient mode; ptr to output if any; #threads available for processing
I isgradmode; I nthreads=(*JT(jt,jobqueues))[0].nthreads+1; // non0 if gradient mode; ptr to output if any; #threads available for processing
unsigned char (*actionrtn)(JJ, void *, UI4); // the routine to do the operation
if(isgradmode=(AR(box)!=0)){
// gradient mode (the dominant case)
Expand Down Expand Up @@ -1886,7 +1886,7 @@ F2(jtekupdate){F2PREFIP;

// figure out how many threads to use, how many lines to take in each one
#define TASKMINATOMS ((2*2000)/10) // TUNE a cached atom takes 10 clocks to compute; an uncached one takes 15? (2022 Alder Lake). We don't want to start a task with less than 2000 clocks, so insist on twice that many
I nthreads=(*JT(jt,jobqueue))[0].nthreads+1; // the number of threads we would like to use (including master), init to # available
I nthreads=(*JT(jt,jobqueues))[0].nthreads+1; // the number of threads we would like to use (including master), init to # available
I rowsperthread=m; // will be #rows each processor should take
if(((1-m)&(1-nthreads)&(TASKMINATOMS-m*n))>=0)nthreads=1; // if only one thread, or job too small, use just one thread
else{
Expand Down Expand Up @@ -2117,7 +2117,7 @@ F1(jtfindspr){F1PREFIP;
m=AS(ck)[AR(ck)-1]; // length of a column
// figure out how many threads to use, how many lines to take in each one
#define TASKMINATOMS ((2*2000)/2) // TUNE Values will be in cache. Normal DP comp is 2 clocks per atom. We don't want to start a task with less than 2000 clocks, so insist on twice that many
I nthreads=(*JT(jt,jobqueue))[0].nthreads+1; // the number of threads we would like to use (including master), init to # available
I nthreads=(*JT(jt,jobqueues))[0].nthreads+1; // the number of threads we would like to use (including master), init to # available
I rowsperthread=m; // will be #rows each processor should take
if(((1-nthreads)&(TASKMINATOMS-rowsperthread))>=0)nthreads=1; // if only one thread, or job too small, use just one thread
else{
Expand Down

0 comments on commit 6afe77c

Please sign in to comment.