Skip to content

Commit

Permalink
automaticly determining number of coprocs to spawn is finally working!!!
Browse files Browse the repository at this point in the history
  • Loading branch information
jkool702 committed Sep 5, 2024
1 parent ed5d10a commit a45b678
Showing 1 changed file with 89 additions and 62 deletions.
151 changes: 89 additions & 62 deletions forkrun.bash
Original file line number Diff line number Diff line change
Expand Up @@ -1038,10 +1038,6 @@ p_PID+=(\${p{<#>}_PID})""" )"
: >"${tmpDir}"/.spawned

(( ${verboseLevel} > 1 )) && printf '\n\n%s WORKER COPROCS FORKED\n\n' "${nProcs}" >&${fd_stderr}
(( ${verboseLevel} > 3 )) && {
printf '\n\nDYNAMICALLY GENERATED COPROC CODE:\n\n%s\n\n' "${coprocSrcCode}"
declare -p fd_continue fd_inotify fd_nAuto fd_nOrder fd_nOrder0 fd_nQueue fd_read fd_write fd_stdin fd_stdout fd_stderr
} >&${fd_stderr}

# setup dynamically coproc to spawn new workers based on read queue length
${nQueueFlag} && ! [[ -f "${tmpDir}"/.quit ]] && {
Expand All @@ -1059,19 +1055,21 @@ p_PID+=(\${p{<#>}_PID})""" )"
kkProcs=${nProcs}

p_PID=()
pLOADA=()

nQueue=0
nQueueLastCount=0
nQueueLastCount=0

(( "${nQueueMin}" <= 0 )) && nQueueMin=1

: "${pLOAD_max:=9500}" "${nProcsMax:=$((2*${nCPU}))}"
: "${pLOAD_max:=9500}" "${nProcsMax:=$((2*${nCPU}))}" "${nQueueLastCountGoal:=5}"

{ read -r pLOAD; read -r cpu_ALL; read -r cpu_LOAD; } < <(_forkrun_get_load -i)
mapfile -t pLOADA < <(_forkrun_get_load -i)

(( ${verboseLevel} > 2 )) && printf 'pLOADA = ( %s %s %s %s )\n' "${pLOADA[@]}" >&${fd_stderr}

# (( ${nProcsMax} > ( ${nProcs} + ( ${nProcs} * ( ${pLOAD_max} - ${pLOAD} ) ) / ( 1 + ${pLOAD} ) ) )) && nProcsMax=$(( ${nProcs} + ( ${nProcs} * ( ${pLOAD_max} - ${pLOAD} ) ) / ( 1 + ${pLOAD} ) ))

until [[ -f "${tmpDir}"/.quit ]] || (( ${kkProcs} >= ${nProcsMax} )) || (( ${pLOAD} >= ${pLOAD_max} )); do
until [[ -f "${tmpDir}"/.quit ]] || (( ${kkProcs} >= ${nProcsMax} )); do
nQueueLast=${nQueue}

# read from fd_queue pipe. =${
Expand All @@ -1089,24 +1087,22 @@ p_PID+=(\${p{<#>}_PID})""" )"

#(( ${verboseLevel} > 3 )) && { printf '\nnQueue = %s (nProcs = %s)\n' "${nQueue}" "${kkProcs}"; cat /proc/self/schedstat; } >&${fd_stderr}

# dont trigger spawning more workers until the main thread is done spawning the initial $nProcs workers
if (( ( ${nQueue} + ${nQueueLast} ) < ( 2 * ${nQueueMin} ) )); then

if [[ -f "${tmpDir}"/.spawned ]] && (( ( ( 2 * ${nQueue} ) + ${nQueueLast} + 1 ) < ( 3 * ${nQueueMin} ) )); then

if (( ${nQueueLastCount} < 3 )); then
if (( ${nQueueLastCount} < ( ${nQueueLastCountGoal} * ( 1 + ( kkProcs / nCPU ) ) ) )); then
((nQueueLastCount++))
else
nQueueLastCount=0

{ read -r pLOAD; read -r cpu_ALL; read -r cpu_LOAD; } < <(_forkrun_get_load "${pLOAD}" "${cpu_ALL}" "${cpu_LOAD}")
mapfile -t pLOADA < <(_forkrun_get_load "${pLOADA[@]}")

(( ${verboseLevel} > 2 )) && printf 'pLOAD=%s\n' "${pLOAD}" >&${fd_stderr}
(( ${verboseLevel} > 2 )) && printf 'pLOADA = ( %s %s %s %s )\n' "${pLOADA[@]}" >&${fd_stderr}

(( ${pLOAD} >= ${pLOAD_max} )) || {
(( ${pLOADA} >= ${pLOAD_max} )) || {

if (( ${nCPU} > ${kkProcs} )); then
pAdd=$(( 1 + ( ( ${nCPU} - ${kkProcs} ) * ( ${pLOAD_max} - ${pLOAD} ) ) / ( 1 + ${pLOAD} ) ))
(( ${verboseLevel} > 3 )) && printf '(pLOAD=%s -- initial pAdd: %s ' "${pLOAD}" "${pAdd}" >&${fd_stderr}
pAdd=$(( 1 + ( ( ${nCPU} - ${kkProcs} ) * ( ${pLOAD_max} - ${pLOADA} ) ) / ( 1 + ${pLOADA} ) ))
(( ${verboseLevel} > 3 )) && printf '(pLOAD=%s -- initial pAdd: %s ' "${pLOADA}" "${pAdd}" >&${fd_stderr}


(( ${pAdd} > ( ( ${nProcsMax} - ${kkProcs} ) - ( ( ${nProcsMax} - ${kkProcs} ) / ( 1 + ( 3 * ${nQueueMin} ) - ( 2 * ${nQueue} ) - ${nQueueLast} ) ) ) )) && pAdd=$(( ( ${nProcsMax} - ${kkProcs} ) - ( ( ${nProcsMax} - ${kkProcs} ) / ( 1 + ( 3 * ${nQueueMin} ) - ( 2 * ${nQueue} ) - ${nQueueLast} ) ) ))
Expand All @@ -1127,7 +1123,8 @@ p_PID+=(\${p{<#>}_PID})""" )"
echo "${kkProcs}" >"${tmpDir}"/.nWorkers
}
fi
[[ -f "${tmpDir}"/.done ]] && (( ${kkProcs} >= ${nCPU} )) && break
else
nQueueLastCount=0
fi

done
Expand All @@ -1142,8 +1139,13 @@ p_PID+=(\${p{<#>}_PID})""" )"

}

(( ${verboseLevel} > 3 )) && {
printf '\n\nDYNAMICALLY GENERATED COPROC CODE:\n\n%s\n\n' "${coprocSrcCode}"
declare -p fd_continue fd_inotify fd_nAuto fd_nOrder fd_nOrder0 fd_nQueue fd_read fd_write fd_stdin fd_stdout fd_stderr
} >&${fd_stderr}

# # # # # WAIT FOR THEM TO FINISH # # # # #
# # PRINT OUTPUT IF ORDERED # #
# # # PRINT OUTPUT IF ORDERED # # #

if ${nOrderFlag}; then
# initialize real-time printing of ordered output as forkrun runs
Expand Down Expand Up @@ -1723,88 +1725,113 @@ _forkrun_lseek_setup() {

_forkrun_lseek_setup

export -p | grep -F '_forkrun_get_load' && export -nf _forkrun_get_load

export -fp _forkrun_get_load &>/dev/null && export -nf _forkrun_get_load

_forkrun_get_load() (
## computes a "smoothed average system CPU load" using info gathered from /proc/stat
#
# USAGE: { read -r pLOAD; read -r cpu_ALL; read -r cpu_LOAD; } < <(_forkrun_get_load [-i|--init] [-m|--max|--max-load maxLoadNum] pLOAD cpu_ALL cpu_LOAD)
# USAGE:
# mapfile -t -n 4 pLOADA < <(_forkrun_get_load [-i|--init] [-e|--echo] [-m|--max|--max-load maxLoadNum] )
# mapfile -t -n 4 pLOADA < <(_forkrun_get_load [-e|--echo] [-m|--max|--max-load maxLoadNum] "${pLOADA[@]}")
#
# FLAGS:
# '-i'|'--init': initialize/reset load calculation.
# '-m'|'--max'|'--max-load' maxloadNum: positive integer (maxLoadNum) that replaces 10000 as the number that repesents 100% load.
# '-i'|'--init': initialize/reset load calculation.
# '-e'|'--echo': print average load to stderr in addition to printing pLOAD + cpu_ALL + cpu_LOAD to stdout
# '-m'|'--max'|'--max-load' maxloadNum: positive integer (maxLoadNum) that replaces 10000 as the number that repesents 100% load.
#
# OUTPUTS: pLOAD cpu_ALL cpu_LOAD
# --> pLOAD: represents the current average load level estimate between all logical CPU cores ( scaled between 0 - 10000, or (if set) between 0 - $maxLoadNum )
# --> cpu_ALL: total sum of ALL components from /proc/stats when the last pLOAD was computed
# --> cpu_LOAD: total sum of the components that represent CPU load (everything except idle time and IOwait time) when the last pLOAD was computed
# OUTPUTS: pLOAD cpu_ALL cpu_LOAD tALL
# --> pLOAD: represents the current average load level estimate between all logical CPU cores ( scaled between 0 - 10000, or (if set) between 0 - $maxLoadNum )
# --> cpu_ALL: total sum of ALL components from /proc/stats when the last pLOAD was computed
# --> cpu_LOAD: total sum of the components that represent CPU load (everything except idle time and IOwait time) when the last pLOAD was computed
# --> tALL: total time difference used in the last call to _forkrun_get_load (i.e., $(( CPU_ALL - CPU_ALL0 )) from previous run)
#
# INPUTS: pLOAD cpu_ALL cpu_LOAD
# --> Input the 3 values that were output last time _forkrun_get_load was called.
# --> Not required if using -i flag. If any of these 3 values are not given then `-i` flag is implied
# INPUTS: pLOADA=( $pLOAD $cpu_ALL $cpu_LOAD $tALL )
# --> Input the 3 values that were output last time _forkrun_get_load was called.
# --> Not required if using -i flag. If any of these 3 values are not given then `-i` flag is implied

local -i loadMaxVal cpu_user cpu_nice cpu_system cpu_idle cpu_IOwait cpu_irq cpu_softirq cpu_steal cpu_guest cpu_guestnice cpu_LOAD cpu_ALL cpu_LOAD cpu_ALL tLOAD tALL pLOAD cpu_LOAD0 cpu_ALL0 tALL0 cpu_LOAD0 pLOAD pLOAD0
local nn initFlag
unset IFS

local -i loadMaxVal cpu_user cpu_nice cpu_system cpu_idle cpu_IOwait cpu_irq cpu_softirq cpu_steal cpu_guest cpu_guestnice tLOAD tLOAD0 tALL tALL0 cpu_ALL cpu_ALL0 cpu_LOAD cpu_LOAD0 pLOAD pLOAD0 argCount
local nn initFlag echoFlag

loadMaxVal=10000
initFlag=false
pLOAD0=''
cpu_LOAD0=''
cpu_ALL0=''
echoFlag=false
argCount=0

pLOAD0="${pLOADA[0]}"
cpu_ALL0="${pLOADA[1]}"
cpu_LOAD0="${pLOADA[2]}"
tALL0="${pLOADA[3]}"


while (( ${#} > 0 )); do
case "${1}" in
-i|--init)
'-i'|'--init')
initFlag=true
;;
-m|--max|--max-load)
'-e'|'--echo')
echoFlag=true
;;
'-m'|'--max'|'--max-load')
[[ "${2}" == [0-9]* ]] && {
loadMaxVal="${2}"
(( ${loadMaxVal} > 0 )) || loadMaxVal=10000
shift 1
}
;;
[0-9]*)
if [[ ${pLOAD0} == 0 ]]; then
[[ ${1} == 0 ]] && pLOAD0=1 || pLOAD0="${1}"
elif [[ ${cpu_ALL0} == 0 ]]; then
cpu_ALL0="${1}"
elif [[ ${cpu_LOAD0} == 0 ]]; then
cpu_LOAD0="${1}"
fi
case "${argCount}" in
0) [[ ${1} == 0 ]] && pLOAD0=1 || pLOAD0="${1}" ;;
1) cpu_ALL0="${1}" ;;
2) cpu_LOAD0="${1}" ;;
3) tALL0="${1}" ;;
esac
((argCount++))
;;
esac
shift 1
done

if ${initFlag} || [[ ${pLOAD0} == 0 ]] || [[ ${cpu_ALL0} == 0 ]] || [[ ${cpu_LOAD0} == 0 ]] || [[ -z ${pLOAD0} ]] || [[ -z ${cpu_ALL0} ]] || [[ -z ${cpu_LOAD0} ]] ; then
pLOAD0=''
cpu_LOAD0=''
cpu_ALL0=''
initFlag=true
fi
# if [[ ${pLOAD0} == 0 ]] || [[ ${cpu_ALL0} == 0 ]] || [[ ${cpu_LOAD0} == 0 ]] || [[ ${tALL0} == 0 ]] || [[ -z ${pLOAD0} ]] || [[ -z ${cpu_ALL0} ]] || [[ -z ${cpu_LOAD0} ]] || [[ -z ${tALL0} ]]; then
# initFlag=true
# fi

read -r _ cpu_user cpu_nice cpu_system cpu_idle cpu_IOwait cpu_irq cpu_softirq cpu_steal cpu_guest cpu_guestnice </proc/stat

cpu_LOAD=$(( cpu_user + cpu_nice + cpu_system + cpu_irq + cpu_softirq + cpu_steal + cpu_guest + cpu_guestnice ))
cpu_ALL=$(( cpu_LOAD + cpu_idle + cpu_IOwait ))

${initFlag} && {
cpu_ALL0="${cpu_ALL}"
cpu_LOAD0="${cpu_LOAD}"

( read -r -u $fd_sleep -t 0.01; ) {fd_sleep}<><(:)

read -r _ cpu_user cpu_nice cpu_system cpu_idle cpu_IOwait cpu_irq cpu_softirq cpu_steal cpu_guest cpu_guestnice </proc/stat;
read -r _ cpu_user cpu_nice cpu_system cpu_idle cpu_IOwait cpu_irq cpu_softirq cpu_steal cpu_guest cpu_guestnice </proc/stat

cpu_LOAD=$(( cpu_user + cpu_nice + cpu_system + cpu_irq + cpu_softirq + cpu_steal + cpu_guest + cpu_guestnice ));
cpu_ALL=$(( cpu_LOAD + cpu_idle + cpu_IOwait ));
cpu_LOAD=$(( cpu_user + cpu_nice + cpu_system + cpu_irq + cpu_softirq + cpu_steal + cpu_guest + cpu_guestnice ))
cpu_ALL=$(( cpu_LOAD + cpu_idle + cpu_IOwait ))
}

tALL=$(( cpu_ALL - cpu_ALL0 ))

pLOAD=$(( ( loadMaxVal * ( cpu_LOAD - ${cpu_LOAD0:-${cpu_LOAD}} ) ) / ( 1 + cpu_ALL - ${cpu_ALL0:-${cpu_ALL}} ) ))
pLOAD=$(( ( loadMaxVal * ( cpu_LOAD - cpu_LOAD0 ) ) / ( 1 + cpu_ALL - cpu_ALL0 ) ))

${initFlag} || {

tLOAD=$(( cpu_LOAD - ${cpu_LOAD0:-${cpu_LOAD}} ))
tALL=$(( cpu_ALL - ${cpu_ALL0:-${cpu_ALL}} ))
tLOAD=$(( cpu_LOAD - cpu_LOAD0 ))

(( ${tALL0:-${tALL}} > ( 10 * ${tALL} ) )) && tALL0=$(( 10 * tALL ))
(( tALL0 > ( 10 * tALL ) )) && tALL0=$(( 10 * tALL ))

pLOAD=$(( ( loadMaxVal * tLOAD ) / ( 1 + tALL ) ))
pLOAD=$(( ( ( ( ${tALL0:-${tALL}} + tALL ) * pLOAD ) + ( ${tALL0:-${tALL}} * ${pLOAD0:-${pLOAD}} ) ) / ( 1 + tALL + ( 2 * ${tALL0:-${tALL}} ) ) ))
pLOAD=$(( ( ( ( 1 + tALL + tALL0 ) * pLOAD ) + ( tALL0 * pLOAD0 ) ) / ( 1 + tALL + ( 2 * tALL0 ) ) ))

}

printf '%s\n%s\n%s\n\n' "${pLOAD}" "${cpu_ALL}" "${cpu_LOAD}"

pLOADA=("${pLOAD}" "${cpu_ALL}" "${cpu_LOAD}" "${tALL}")
printf '%s\n' "${pLOADA[@]}"
)

export -f _forkrun_get_load
export -f _forkrun_get_load

0 comments on commit a45b678

Please sign in to comment.