/***************************************************************/
/** Monitor macros for the shared matrix lists.
 **
 ** Caution: m4 expands macro names wherever it meets them, and
 ** that goes for the text of these comments too.  Macro names are
 ** therefore spelt here with a lower-case last letter; please keep
 ** to that when editing.
 **/

/** LSSPEc, LSDEc( mon ), LSINIt( mon )
 **
 ** Guarded fields of the LS monitor, with its declaration and
 ** initialisation.  The two list heads start out at gm->limbo
 ** ("ordinary") and gm->mat_buffer ("special").
 **/
define(LSSPEC, MATRIX *ordinary; MATRIX *special; )
define(LSDEC, `DECVAR($1,1,LSSPEC)' )
define(LSINIT,
`MONINIT($1,1)
 $1.ordinary = gm->limbo;
 $1.special = gm->mat_buffer;'
)

/** RSSPEc, RSDEc( mon ), RSINIt( mon )
 **
 ** Guarded field of the RS monitor: master, which starts at -1.
 **/
define(RSSPEC, int master; )
define(RSDEC, `DECVAR($1,1,RSSPEC)' )
define(RSINIT,
`MONINIT($1,1)
 $1.master = -1;'
)

/** MQSPEc, MQDEc( mon ), MQINIt( mon )
 **
 ** Guarded field of the MQ monitor: delayed, which starts at 0.
 **/
define(MQSPEC, int delayed; )
define(MQDEC, `DECVAR($1,1,MQSPEC)' )
define(MQINIT,
`MONINIT($1,1)
 $1.delayed = 0;'
)

/***************************************************************/

/** MONITORS_RESEt
 **
 ** Just re-initialise the guarded variables in GS and RS:
 ** gm->GS.sub goes back to 0 and gm->RS.master back to -1,
 ** each inside its own monitor.
 **/
define(MONITORS_RESET,
`MENTER(gm->GS)
   gm->GS.sub = 0;
 MEXIT(gm->GS)
 MENTER(gm->RS)
   gm->RS.master = -1;
 MEXIT(gm->RS)'
)

/** Q_POp( mon, head, var, nowait )
 **
 ** Pop the first matrix off a queue or a stack.
 **   mon    - monitor guarding the list
 **   head   - head of the list; moved on to the next matrix
 **   var    - receives the popped matrix, or 0 if the list is empty
 **   nowait - when this is zero and the list is empty, the caller
 **            is delayed in queue 0 of mon and, once released,
 **            releases the next waiter in turn.  var is still 0
 **            afterwards, so the caller has to try again.
 **/
define(Q_POP,
`MENTER($1)
   $3 = $2;
   if ( $2 )
   {
      $2 = $2->nextmat;
      $3->nextmat = 0;
   }
   else if ( !$4 )
   {
      DELAY($1,0)
      CONTINUE($1,0)
   }
 MEXIT($1)'
)

/** Q_PUSh( mon, head, tail, item )
 **
 ** Push an object onto a queue.
 **   mon  - monitor guarding the queue
 **   head - first element; set to item when the queue is empty
 **   tail - last element; item is linked after it when both head
 **          and tail are set, and tail is always left at item
 **   item - the object to add; its nextmat link is cleared
 ** Any process delayed in queue 0 of mon is released afterwards.
 **/
define(Q_PUSH,
`MENTER($1)
   $4->nextmat = 0;
   if ( $2 )
   {
      if ( $3 ) $3->nextmat = $4;
   }
   else $2 = $4;
   $3 = $4;
   CONTINUE($1,0)
 MEXIT($1)'
)

/** S_PUSh( mon, top, item )
 **
 ** Push an object onto a push-down stack.
 **   mon  - monitor guarding the stack
 **   top  - top of the stack; becomes item
 **   item - the object to add; linked in front of the old top
 ** Any process delayed in queue 0 of mon is released afterwards.
 **/
define(S_PUSH,
`MENTER($1)
   $3->nextmat = $2;
   $2 = $3;
   if (1)
   {
      CONTINUE($1,0)
   }
 MEXIT($1)'
)

/** READy
 **
 ** Record the fact that the process has exhausted its problem
 ** (gm->ready[offset] becomes 2) and release the master process
 ** if it might be hanging.  The list gm->mat_list[offset] is
 ** cleared first if ready was still 0.
 ** Note that MQ is entered from RS, but never vice versa.
 **/
define(READY,
`MENTER(gm->RS)
   if ( !gm->ready[offset] ) gm->mat_list[offset] = 0;
   gm->ready[offset] = 2;
   if ( gm->RS.master == offset )
   {
      CONTINUE(gm->RS,0)
   }
 MEXIT(gm->RS)'
)

/** LOOk( flag )
 **
 ** Slave process records its job and gets the position of the
 ** master.  If ready was still 0 the list gm->mat_list[offset] is
 ** cleared and ready becomes 1; this part runs outside the monitor.
 ** flag is set non-zero when the master is at this offset.
 ** If the master is delayed in RS(0), it is released.
 **/
define(LOOK,
`if ( !gm->ready[offset] )
 {
    gm->mat_list[offset] = 0;
    gm->ready[offset] = 1;
 }
 MENTER(gm->RS)
   $1 = ( gm->RS.master == offset );
   if ( $1 )
   {
      CONTINUE(gm->RS,0)
   }
 MEXIT(gm->RS)'
)

/** PEEk( state )
 **
 ** Similar doings in reverse for the master process only.
 ** If the master was behind this offset it is moved up to it, a
 ** process delayed in LS(0) is released, and the master waits in
 ** RS(0) unless ready is already 2.
 ** state receives gm->ready[offset].
 **/
define(PEEK,
`MENTER(gm->RS)
   if ( gm->RS.master < offset )
   {
      gm->RS.master = offset;
      MENTER(gm->LS)
         CONTINUE(gm->LS,0)
      MEXIT(gm->LS)
      if ( gm->ready[offset] < 2 )
      {
         DELAY(gm->RS,0)
      }
   }
   $1 = gm->ready[offset];
 MEXIT(gm->RS)'
)

/** TAKe( var, flag )
 **
 ** Get the next available matrix from the appropriate queue.
 ** This is "ordinary" unless the master is right behind you.
 **   var  - receives the matrix; the pop is repeated until it
 **          is non-zero
 **   flag - set by LOOk; non-zero selects the "special" list
 **/
define(TAKE,
`LOOK($2)
 do
 {
    if ( $2 )
    {
       Q_POP(gm->LS, gm->LS.special, $1, 0)
    }
    else
    {
       Q_POP(gm->LS, gm->LS.ordinary, $1, 0)
    }
 } while ( !$1 );'
)

/** GIVe
 **
 ** The opposite of TAKe: push the matrix back on the
 ** queue it came from.  A mymat that lies inside the gm->limbo
 ** array goes back on the "ordinary" list, any other on the
 ** "special" one.
 **/
define(GIVE,
`if ( mymat >= gm->limbo && mymat < gm->limbo+MMAX )
 {
    S_PUSH(gm->LS, gm->LS.ordinary, mymat)
 }
 else
 {
    S_PUSH(gm->LS, gm->LS.special, mymat)
 }'
)

/** PUSHMAt( item, flag )
 **
 ** Add a matrix to the list of those ready for printing.
 ** If master is elsewhere do not use the monitor.
 **   item - the matrix to append to gm->mat_list[offset]
 **   flag - non-zero: append through the MQ monitor;
 **          zero: do the same append directly, with mymat
 **          serving as the tail in both cases
 **/
define(PUSHMAT,
`if ( $2 )
 {
    Q_PUSH(gm->MQ,gm->mat_list[offset],mymat,$1)
 }
 else
 {
    $1->nextmat = 0;
    if ( gm->mat_list[offset] )
    {
       if ( mymat ) mymat->nextmat = $1;
    }
    else gm->mat_list[offset] = $1;
    mymat = $1;
 }'
)

/** POPMAt
 **
 ** Again the opposite: get the first matrix in order to print
 ** it out.  If the slave has already gone do not use Q_POp.
 ** The result is left in mymat.  While the state from PEEk is 1
 ** and nothing is available yet, the master calls catnap(1) and
 ** tries again.
 **/
define(POPMAT,
`{
   int state;
   do
   {
      PEEK(state)
      if ( state > 1)
      {
         mymat = gm->mat_list[offset];
         if ( mymat ) gm->mat_list[offset] = gm->mat_list[offset]->nextmat;
      }
      else
      {
         Q_POP(gm->MQ,gm->mat_list[offset],mymat,1)
      }
      if ( state==1 && !mymat ) catnap(1);
   } while ( state==1 && !mymat );
 }'
)

/* create( proc, id, err )
 *
 * Fork a child process that runs proc() and then exits.
 *   proc - routine the child calls
 *   id   - optional; when given, the identity of the new process
 *          is copied into it with xx_copy_id
 *   err  - set to 0, or to 1 when the fork fails
 * The parent holds the proc_table lock across the fork while it
 * fills in the table entry; the child takes the same lock before
 * it asks who it is, so it waits until the parent has finished.
 * Review note: the search for a free table slot has no upper
 * bound and the result of the allocation is used unchecked -
 * worth confirming against the callers.
 */
define(Create,
`{
   long rc;
   fflush(stdout);
   $3 = 0;
   LOCK(xx_cmem->proc_table)
   if ((rc = fork()) == 0)
   {
      LOCK(xx_cmem->proc_table)
      WHO_AM_I(&xx_my_id)
      UNLOCK(xx_cmem->proc_table)
      $1();
      exit(0);
   }
   else if (rc == -1)
   {
      printf("*** failure in create ***\n");
      $3 = 1;
   }
   else
   {
      /* now put in the entry for the new process */
      xx_local_slv_cnt++;
      {
         struct xx_process *p1;
         int xx_i;
         PROC_ID xx_proc;
         p1 = (struct xx_process *) G_MALLOC(sizeof(struct xx_process));
         MONINIT(p1->msg_q_monitor,1)
         p1->first_msg = p1->last_msg = 0;
         p1->upid = rc;
         p1->socket_id = 0;
         p1->port = 0;
         xx_proc.port = htonl(0);
         gethostname(p1->host,64);
         for (xx_i=1; xx_cmem->active_processes[xx_i]; xx_i++)
            ;
         xx_cmem->active_processes[xx_i] = p1;
         xx_proc.proc_table_indx = htonl(xx_i);
         ifelse($2,,,xx_copy_id($2,&xx_proc);)
      }
   }
   UNLOCK(xx_cmem->proc_table)
 }'
)

/* kreate( proc, err )
 *
 * The same as create, with the id argument left empty.
 */
define(KREATE, ` Create($1,,$2) ')

/* dead_slaves
 *
 * Forget all slave processes: zero the entries of xx_upid from
 * the start and the active_processes slots from 1 up to the
 * table limit, then leave xx_pt_upid at the start of xx_upid.
 */
define(DEAD_SLAVES,
`{
   int xx_ds_i;
   xx_pt_upid = xx_upid;
   for (xx_ds_i=1; xx_ds_i < MAX_PROCS; xx_ds_i++)
   {
      *(xx_pt_upid++) = 0;
      xx_cmem->active_processes[xx_ds_i] = 0;
   }
   xx_pt_upid = xx_upid;
 }'
)