Skip to content

Commit

Permalink
modified routine to get CPU usage for automtic determination of numbe…
Browse files Browse the repository at this point in the history
…r of coprocs to spawn
  • Loading branch information
jkool702 committed Sep 4, 2024
1 parent b35fef9 commit ed5d10a
Showing 1 changed file with 169 additions and 144 deletions.
313 changes: 169 additions & 144 deletions forkrun.bash
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ forkrun() {

# make all variables local
local tmpDir fPath outStr delimiterVal delimiterReadStr delimiterRemoveStr exitTrapStr exitTrapStr_kill nLines0 nOrder nProcs nProcsMax nBytes tTimeout coprocSrcCode outCur tmpDirRoot returnVal tmpVar t0 readBytesProg nullDelimiterProg ddQuietStr trailingNullFlag inotifyFlag lseekFlag fallocateFlag nLinesAutoFlag nQueueFlag substituteStringFlag substituteStringIDFlag nOrderFlag readBytesFlag readBytesExactFlag nullDelimiterFlag subshellRunFlag stdinRunFlag pipeReadFlag rmTmpDirFlag exportOrderFlag noFuncFlag unescapeFlag optParseFlag continueFlag doneIndicatorFlag FORCE_allowCarriageReturnsFlag ddAvailableFlag fd_continue fd_inotify fd_inotify0 fd_nAuto fd_nAuto0 fd_nOrder fd_nOrder0 fd_read fd_read0 fd_write fd_stdout fd_stdin fd_stderr pWrite pOrder pAuto pQueue pWrite_PID pNotify_PID pOrder_PID pAuto_PID pQueue_PID fd_read_pos fd_read_pos_old fd_write_pos DEBUG_FORKRUN
local -i PID0 nLines nLinesCur nLinesNew nLinesMax nRead nWait nOrder0 nBytesRead nQueue nQueueLast nQueueMin nCPU v9 kkMax kkCur kk kkProcs kkProcs0 verboseLevel cpu_LOAD0 cpu_ALL0 pLOAD pLOAD0 pLOAD_max pAdd
local -i PID0 nLines nLinesCur nLinesNew nLinesMax nRead nWait nOrder0 nBytesRead nQueue nQueueLast nQueueMin nQueueLastCount nCPU v9 kkMax kkCur kk kkProcs verboseLevel pLOAD_max pAdd
local -a A p_PID runCmd outHave

# # # # # PARSE OPTIONS # # # # #
Expand Down Expand Up @@ -262,8 +262,6 @@ forkrun() {

shopt -s nullglob

_forkrun_get_load -i

# dynamically set defaults for a few flags
: "${noFuncFlag:=false}" "${FORCE_allowCarriageReturnsFlag:=false}" "${readBytesFlag:=false}" "${readBytesExactFlag:=false}" "${nullDelimiterFlag:=false}"

Expand Down Expand Up @@ -502,11 +500,6 @@ forkrun() {

# # # # # FORK "HELPER" PROCESSES # # # # #

# get baseline cpuload
${nQueueFlag} && {
_forkrun_get_load
}

# start building exit trap string
exitTrapStr=': >"'"${tmpDir}"'"/.done;
: >"'"${tmpDir}"'"/.quit;
Expand Down Expand Up @@ -988,90 +981,6 @@ done
} 2>&${fd_stderr} {fd_nAuto0}>&${fd_nAuto}
} 2>/dev/null
p_PID+=(\${p{<#>}_PID})""" )"

# setup dynamically coproc to spawn new workers based on read queue length
${nQueueFlag} && ! [[ -f "${tmpDir}"/.quit ]] && {
{ coproc pQueue {

export LC_ALL=C LANG=C IFS=

trap '[[ -f "'"${tmpDir}"'"/.run/pQueue ]] && \rm -f "'"${tmpDir}"'"/.run/pQueue' EXIT
trap 'trap - TERM INT HUP USR1; kill -USR1 "${p_PID[@]}"; kill -INT '"${PID0}"' ${BASHPID} "${p_PID[@]}"' INT
trap 'trap - TERM INT HUP USR1; kill -USR1 "${p_PID[@]}"; kill -TERM '"${PID0}"' ${BASHPID} "${p_PID[@]}"' TERM
trap 'trap - TERM INT HUP USR1; kill -USR1 "${p_PID[@]}"; kill -HUP '"${PID0}"' ${BASHPID} "${p_PID[@]}"' HUP
trap 'trap - TERM INT HUP USR1' USR1

# start spawning after nProcs workers already forked
kkProcs=${nProcs}

p_PID=()

nQueue=0

(( "${nQueueMin}" <= 0 )) && nQueueMin=1

: "${pLOAD_max:=900}"

pLOAD=${pLOAD}
pLOAD0=${pLOAD0}
cpu_LOAD0=${cpu_LOAD0}
cpu_ALL0=${cpu_ALL0}

_forkrun_get_load
(( ${nProcsMax} > ( ${nProcs} + ( ${nProcs} * ( ${pLOAD_max} - ${pLOAD} ) ) / ( 1 + ${pLOAD} - ${pLOAD0} ) ) )) && nProcsMax=$(( ${nProcs} + ( ${nProcs} * ( ${pLOAD_max} - ${pLOAD} ) ) / ( 1 + ${pLOAD} - ${pLOAD0} ) ))
kkProcs0=${nProcs}

until [[ -f "${tmpDir}"/.quit ]] || (( ${kkProcs} >= ${nProcsMax} )); do
nQueueLast=${nQueue}

# read from fd_queue pipe. =${
# '+' --> increase queue depth by 1.
# '-' --> decrease queue depth by 1.
# '0' --> quit
read -r -u ${fd_nQueue} -N 1

case "${REPLY}" in
'+') ((nQueue++)) ;;
'-') ((nQueue--)) ;;
0) break ;;
*) continue ;;
esac

#(( ${verboseLevel} > 3 )) && { printf '\nnQueue = %s (nProcs = %s)\n' "${nQueue}" "${kkProcs}"; cat /proc/self/schedstat; } >&${fd_stderr}

# dont trigger spawning more workers until the main thread is done spawning the initial $nProcs workers

[[ -f "${tmpDir}"/.spawned ]] && (( ( ( 2 * ${nQueue} ) + ${nQueueLast} + 1 ) < ( 3 * ${nQueueMin:=1} ) )) && {
_forkrun_get_load
(( ${pLOAD} >= ${pLOAD_max} )) && break
pAdd=$(( 1 + ( ( ${kkProcs} - ${kkProcs0} ) * ( ${pLOAD_max} - ${pLOAD} ) ) / ( 1 + ${pLOAD} - ${pLOAD0} ) ))
kkProcs0=${kkProcs}

(( ${pAdd} > ( ( ${nProcsMax} - ${kkProcs} ) - ( ( ${nProcsMax} - ${kkProcs} ) / ( 1 + ( 3 * ${nQueueMin} ) - ( 2 * ${nQueue} ) - ${nQueueLast} ) ) ) )) && pAdd=$(( ( ${nProcsMax} - ${kkProcs} ) - ( ( ${nProcsMax} - ${kkProcs} ) / ( 1 + ( 3 * ${nQueueMin} ) - ( 2 * ${nQueue} ) - ${nQueueLast} ) ) ))
(( ${verboseLevel} > 3 )) && printf '(pLOAD=%s, pLOAD0=%s -- initial pAdd: %s -- ' "${pLOAD}" "${pLOAD0}" "${pAdd}" >&${fd_stderr}
(( ${pAdd} > ( 1 + ( ${nCPU} / 8 ) ) )) && pAdd=$(( 1 + ( ${nCPU} / 8 ) ))
(( ${pAdd} <= 0 )) && pAdd=1
(( ${verboseLevel} > 3 )) && printf 'final pAdd: %s \n' "${pAdd}" >&${fd_stderr}

for (( kk=0; kk<${pAdd}; kk++ )); do
source /proc/self/fd/0 <<<"${coprocSrcCode//'{<#>}'/"${kkProcs}"}"
(( ${verboseLevel} > 2 )) && printf '\nSPAWNING A NEW WORKER COPROC (%s/%s). There are now %s coprocs. (read queue depth = %s)\n' "${kk}" "${pAdd}" "${kkProcs}" "${nQueue}" >&${fd_stderr}
((kkProcs++))
done
echo "${kkProcs}" >"${tmpDir}"/.nWorkers
}

done

[[ ${#p_PID[@]} == 0 ]] || wait "${p_PID[@]}"

} 2>&${fd_stderr}
} 2>/dev/null

exitTrapStr+='echo "0" >&'"${fd_nQueue}"'; '$'\n'
printf '%s\n' "${pQueue_PID}" > "${tmpDir}"/.run/pQueue

}

# set traps (dynamically determined based on which option flags were active)

Expand Down Expand Up @@ -1106,7 +1015,7 @@ p_PID+=(\${p{<#>}_PID})""" )"
kill -USR1 $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null);
kill -HUP $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null) '"${PID0}" HUP

#trap 'source /proc/self/fd/0 <<<"${coprocSrcCode//'"'"'{<#>}'"'"'/"${kkProcs}"}"' USR2
#trap 'source /proc/self/fd/0 <<<"${coprocSrcCode//'"'"'{<#>}'"'"'/"${kkProcs}"}"' USR2

(( ${verboseLevel} > 1 )) && printf '\n\nALL HELPER COPROCS FORKED\n\n' >&${fd_stderr}
(( ${verboseLevel} > 3 )) && { printf '\nSET TRAPS:\n\n'; trap -p; } >&${fd_stderr}
Expand Down Expand Up @@ -1134,6 +1043,105 @@ p_PID+=(\${p{<#>}_PID})""" )"
declare -p fd_continue fd_inotify fd_nAuto fd_nOrder fd_nOrder0 fd_nQueue fd_read fd_write fd_stdin fd_stdout fd_stderr
} >&${fd_stderr}

# setup dynamically coproc to spawn new workers based on read queue length
${nQueueFlag} && ! [[ -f "${tmpDir}"/.quit ]] && {
{ coproc pQueue {

export LC_ALL=C LANG=C IFS=

trap '[[ -f "'"${tmpDir}"'"/.run/pQueue ]] && \rm -f "'"${tmpDir}"'"/.run/pQueue' EXIT
trap 'trap - TERM INT HUP USR1; kill -USR1 "${p_PID[@]}"; kill -INT '"${PID0}"' ${BASHPID} "${p_PID[@]}"' INT
trap 'trap - TERM INT HUP USR1; kill -USR1 "${p_PID[@]}"; kill -TERM '"${PID0}"' ${BASHPID} "${p_PID[@]}"' TERM
trap 'trap - TERM INT HUP USR1; kill -USR1 "${p_PID[@]}"; kill -HUP '"${PID0}"' ${BASHPID} "${p_PID[@]}"' HUP
trap 'trap - TERM INT HUP USR1' USR1

# start spawning after nProcs workers already forked
kkProcs=${nProcs}

p_PID=()

nQueue=0
nQueueLastCount=0

(( "${nQueueMin}" <= 0 )) && nQueueMin=1

: "${pLOAD_max:=9500}" "${nProcsMax:=$((2*${nCPU}))}"

{ read -r pLOAD; read -r cpu_ALL; read -r cpu_LOAD; } < <(_forkrun_get_load -i)

# (( ${nProcsMax} > ( ${nProcs} + ( ${nProcs} * ( ${pLOAD_max} - ${pLOAD} ) ) / ( 1 + ${pLOAD} ) ) )) && nProcsMax=$(( ${nProcs} + ( ${nProcs} * ( ${pLOAD_max} - ${pLOAD} ) ) / ( 1 + ${pLOAD} ) ))

until [[ -f "${tmpDir}"/.quit ]] || (( ${kkProcs} >= ${nProcsMax} )) || (( ${pLOAD} >= ${pLOAD_max} )); do
nQueueLast=${nQueue}

# read from fd_queue pipe. =${
# '+' --> increase queue depth by 1.
# '-' --> decrease queue depth by 1.
# '0' --> quit
read -r -u ${fd_nQueue} -N 1

case "${REPLY}" in
'+') ((nQueue++)) ;;
'-') ((nQueue--)) ;;
0) break ;;
*) continue ;;
esac

#(( ${verboseLevel} > 3 )) && { printf '\nnQueue = %s (nProcs = %s)\n' "${nQueue}" "${kkProcs}"; cat /proc/self/schedstat; } >&${fd_stderr}

# dont trigger spawning more workers until the main thread is done spawning the initial $nProcs workers

if [[ -f "${tmpDir}"/.spawned ]] && (( ( ( 2 * ${nQueue} ) + ${nQueueLast} + 1 ) < ( 3 * ${nQueueMin} ) )); then

if (( ${nQueueLastCount} < 3 )); then
((nQueueLastCount++))
else
nQueueLastCount=0

{ read -r pLOAD; read -r cpu_ALL; read -r cpu_LOAD; } < <(_forkrun_get_load "${pLOAD}" "${cpu_ALL}" "${cpu_LOAD}")

(( ${verboseLevel} > 2 )) && printf 'pLOAD=%s\n' "${pLOAD}" >&${fd_stderr}

(( ${pLOAD} >= ${pLOAD_max} )) || {

if (( ${nCPU} > ${kkProcs} )); then
pAdd=$(( 1 + ( ( ${nCPU} - ${kkProcs} ) * ( ${pLOAD_max} - ${pLOAD} ) ) / ( 1 + ${pLOAD} ) ))
(( ${verboseLevel} > 3 )) && printf '(pLOAD=%s -- initial pAdd: %s ' "${pLOAD}" "${pAdd}" >&${fd_stderr}


(( ${pAdd} > ( ( ${nProcsMax} - ${kkProcs} ) - ( ( ${nProcsMax} - ${kkProcs} ) / ( 1 + ( 3 * ${nQueueMin} ) - ( 2 * ${nQueue} ) - ${nQueueLast} ) ) ) )) && pAdd=$(( ( ${nProcsMax} - ${kkProcs} ) - ( ( ${nProcsMax} - ${kkProcs} ) / ( 1 + ( 3 * ${nQueueMin} ) - ( 2 * ${nQueue} ) - ${nQueueLast} ) ) ))
(( ${pAdd} > ( 1 + ( ${nCPU} / 16 ) ) )) && pAdd=$(( 1 + ( ${nCPU} / 16 ) ))

(( ${pAdd} < 1 )) && pAdd=1
else
pAdd=1
fi

(( ${verboseLevel} > 3 )) && printf 'final pAdd: %s \n' "${pAdd}" >&${fd_stderr}

for (( kk=0; kk<${pAdd}; kk++ )); do
source /proc/self/fd/0 <<<"${coprocSrcCode//'{<#>}'/"${kkProcs}"}"
(( ${verboseLevel} > 2 )) && printf '\nSPAWNING A NEW WORKER COPROC (%s/%s). There are now %s coprocs. (read queue depth = %s)\n' "${kk}" "${pAdd}" "${kkProcs}" "${nQueue}" >&${fd_stderr}
((kkProcs++))
done
echo "${kkProcs}" >"${tmpDir}"/.nWorkers
}
fi
[[ -f "${tmpDir}"/.done ]] && (( ${kkProcs} >= ${nCPU} )) && break
fi

done

[[ ${#p_PID[@]} == 0 ]] || wait "${p_PID[@]}"

} 2>&${fd_stderr}
} 2>/dev/null

exitTrapStr+='echo "0" >&'"${fd_nQueue}"'; '$'\n'
printf '%s\n' "${pQueue_PID}" > "${tmpDir}"/.run/pQueue

}

# # # # # WAIT FOR THEM TO FINISH # # # # #
# # PRINT OUTPUT IF ORDERED # #

Expand Down Expand Up @@ -1715,71 +1723,88 @@ _forkrun_lseek_setup() {

_forkrun_lseek_setup

_forkrun_get_load() {
# prints average load gathered from /proc/stat
export -p | grep -F '_forkrun_get_load' && export -nf _forkrun_get_load

_forkrun_get_load() (
## computes a "smoothed average system CPU load" using info gathered from /proc/stat
#
# USAGE: _forkrun_get_load [-i|--init] [maxLoadNum]
# USAGE: { read -r pLOAD; read -r cpu_ALL; read -r cpu_LOAD; } < <(_forkrun_get_load [-i|--init] [-m|--max|--max-load maxLoadNum] pLOAD cpu_ALL cpu_LOAD)
#
# sets variable `pLOAD` to a number between 0 - 1 million that represents the average load between all logical CPU cores
# 0 --> no load 1000000 --> 100% load
# FLAGS:
# '-i'|'--init': initialize/reset load calculation.
# '-m'|'--max'|'--max-load' maxloadNum: positive integer (maxLoadNum) that replaces 10000 as the number that repesents 100% load.
#
# FLAGS:
# '-i' or '--init': initialize/reset load calculation
# '-e' or '--echo': in addition to setting pLOAD variable, echo the load level (to stdout)
# maxloadNum (#) : positive integer that replaces 1000000 as the number that repesents 100% load.
# OUTPUTS: pLOAD cpu_ALL cpu_LOAD
# --> pLOAD: represents the current average load level estimate between all logical CPU cores ( scaled between 0 - 10000, or (if set) between 0 - $maxLoadNum )
# --> cpu_ALL: total sum of ALL components from /proc/stats when the last pLOAD was computed
# --> cpu_LOAD: total sum of the components that represent CPU load (everything except idle time and IOwait time) when the last pLOAD was computed
#
# NOTE: requires the following variables be stored without modification by the caller between calls to _forkrun_get_load:
# cpu_LOAD1 cpu_ALL1 tLOAD1 tALL1 pLOAD cpu_LOAD0 cpu_ALL0 tALL0 cpu_LOAD0

local -i loadMaxVal cpu_user1 cpu_nice1 cpu_system1 cpu_idle1 cpu_IOwait1 cpu_irq1 cpu_softirq1 cpu_steal1 cpu_guest1 cpu_guestnice1 cpu_LOAD1 cpu_ALL1
local nn echoFlag
# declare -ig pLOAD pLOAD0 cpu_LOAD0 cpu_ALL0

echoFlag=false
loadMaxVal=1000
for nn in "${@}"; do
case "${nn}" in
'-i'|'--init')
# unset pLOAD pLOAD0 cpu_LOAD0 cpu_ALL0
# declare pLOAD pLOAD0 cpu_LOAD0 cpu_ALL0
pLOAD=''
pLOAD0=''
cpu_LOAD0=''
cpu_ALL0=''
# INPUTS: pLOAD cpu_ALL cpu_LOAD
# --> Input the 3 values that were output last time _forkrun_get_load was called.
# --> Not required if using -i flag. If any of these 3 values are not given then `-i` flag is implied

local -i loadMaxVal cpu_user cpu_nice cpu_system cpu_idle cpu_IOwait cpu_irq cpu_softirq cpu_steal cpu_guest cpu_guestnice cpu_LOAD cpu_ALL cpu_LOAD cpu_ALL tLOAD tALL pLOAD cpu_LOAD0 cpu_ALL0 tALL0 cpu_LOAD0 pLOAD pLOAD0
local nn initFlag

loadMaxVal=10000
initFlag=false
pLOAD0=''
cpu_LOAD0=''
cpu_ALL0=''

while (( ${#} > 0 )); do
case "${1}" in
-i|--init)
initFlag=true
;;
'-e'|'--echo')
echoFlag=true
-m|--max|--max-load)
[[ "${2}" == [0-9]* ]] && {
loadMaxVal="${2}"
(( ${loadMaxVal} > 0 )) || loadMaxVal=10000
shift 1
}
;;
[0-9]*)
loadMaxVal="${1}"
(( ${loadMaxVal} > 0 )) || loadMaxVal=1000
if [[ ${pLOAD0} == 0 ]]; then
[[ ${1} == 0 ]] && pLOAD0=1 || pLOAD0="${1}"
elif [[ ${cpu_ALL0} == 0 ]]; then
cpu_ALL0="${1}"
elif [[ ${cpu_LOAD0} == 0 ]]; then
cpu_LOAD0="${1}"
fi
;;
esac
shift 1
done

# pLOAD0=${pLOAD:-0}
if ${initFlag} || [[ ${pLOAD0} == 0 ]] || [[ ${cpu_ALL0} == 0 ]] || [[ ${cpu_LOAD0} == 0 ]] || [[ -z ${pLOAD0} ]] || [[ -z ${cpu_ALL0} ]] || [[ -z ${cpu_LOAD0} ]] ; then
pLOAD0=''
cpu_LOAD0=''
cpu_ALL0=''
initFlag=true
fi

read -r _ cpu_user1 cpu_nice1 cpu_system1 cpu_idle1 cpu_IOwait1 cpu_irq1 cpu_softirq1 cpu_steal1 cpu_guest1 cpu_guestnice1 </proc/stat;
read -r _ cpu_user cpu_nice cpu_system cpu_idle cpu_IOwait cpu_irq cpu_softirq cpu_steal cpu_guest cpu_guestnice </proc/stat;

cpu_LOAD1=$(( cpu_user1 + cpu_nice1 + cpu_system1 + cpu_irq1 + cpu_softirq1 + cpu_steal1 + cpu_guest1 + cpu_guestnice1 ));
cpu_ALL1=$(( cpu_LOAD1 + cpu_idle1 + cpu_IOwait1 ));
cpu_LOAD=$(( cpu_user + cpu_nice + cpu_system + cpu_irq + cpu_softirq + cpu_steal + cpu_guest + cpu_guestnice ));
cpu_ALL=$(( cpu_LOAD + cpu_idle + cpu_IOwait ));

pLOAD=$(( ( loadMaxVal * ( cpu_LOAD1 - ${cpu_LOAD0:-${cpu_LOAD1}} ) ) / ( 1 + cpu_ALL1 - ${cpu_ALL0:-${cpu_ALL1}} ) ))

# tLOAD1=$(( cpu_LOAD1 - ${cpu_LOAD0:-${cpu_LOAD1}} ))
# tALL1=$(( cpu_ALL1 - ${cpu_ALL0:-${cpu_ALL1}} ))
#
# (( ${tALL0:-${tALL1}} > ( 10 * ${tALL1} ) )) && tALL0=$(( 10 * tALL1 ))
#
# pLOAD=$(( ( loadMaxVal * tLOAD1 ) / ( 1 + tALL1 ) ))
# pLOAD=$(( ( ( ( ${tALL0:-${tALL1}} + tALL1 ) * pLOAD ) + ( ${tALL0:-${tALL1}} * ${pLOAD0:-${pLOAD}} ) ) / ( 1 + tALL1 + ( 2 * ${tALL0:-${tALL1}} ) ) ))
pLOAD=$(( ( loadMaxVal * ( cpu_LOAD - ${cpu_LOAD0:-${cpu_LOAD}} ) ) / ( 1 + cpu_ALL - ${cpu_ALL0:-${cpu_ALL}} ) ))

cpu_LOAD0=${cpu_LOAD1}
cpu_ALL0=${cpu_ALL1}
# tALL0+=${tALL1}
pLOAD0=${pLOAD}
${initFlag} || {

tLOAD=$(( cpu_LOAD - ${cpu_LOAD0:-${cpu_LOAD}} ))
tALL=$(( cpu_ALL - ${cpu_ALL0:-${cpu_ALL}} ))

(( ${tALL0:-${tALL}} > ( 10 * ${tALL} ) )) && tALL0=$(( 10 * tALL ))


${echoFlag} && echo "${pLOAD}"
}
pLOAD=$(( ( loadMaxVal * tLOAD ) / ( 1 + tALL ) ))
pLOAD=$(( ( ( ( ${tALL0:-${tALL}} + tALL ) * pLOAD ) + ( ${tALL0:-${tALL}} * ${pLOAD0:-${pLOAD}} ) ) / ( 1 + tALL + ( 2 * ${tALL0:-${tALL}} ) ) ))

}

printf '%s\n%s\n%s\n\n' "${pLOAD}" "${cpu_ALL}" "${cpu_LOAD}"

)

export -f _forkrun_get_load

0 comments on commit ed5d10a

Please sign in to comment.