Skip to content

Commit

Permalink
implement process limit
Browse files Browse the repository at this point in the history
  • Loading branch information
JordanLaserGit committed Feb 21, 2024
1 parent 87b43b5 commit dcb7eeb
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 29 deletions.
5 changes: 4 additions & 1 deletion configs/conf_datastream_small.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# CONFIGURATION FILE FOR NGEN-DATASTREAM

START_DATE="200301200100"
END_DATE="200301210100"
DATA_PATH="/home/ec2-user/datastream-small-run"
Expand All @@ -6,4 +8,5 @@ RESOURCE_PATH="s3://ngen-datastream/resources_small/"
# S3_MOUNT=""
# SUBSET_ID_TYPE=""
# SUBSET_ID=""
# HYDROFABRIC_VERSION=""
# HYDROFABRIC_VERSION=""
# NPROCS=""
5 changes: 4 additions & 1 deletion configs/conf_datastream_startNend.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# CONFIGURATION FILE FOR NGEN-DATASTREAM

START_DATE="202402010100"
END_DATE="202402100000"
DATA_PATH="/home/ec2-user/example-folder"
Expand All @@ -6,4 +8,5 @@ DATA_PATH="/home/ec2-user/example-folder"
# S3_MOUNT=""
# SUBSET_ID_TYPE=""
# SUBSET_ID=""
# HYDROFABRIC_VERSION=""
# HYDROFABRIC_VERSION=""
# NPROCS=""
2 changes: 2 additions & 0 deletions configs/conf_datastream_subset.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ START_DATE="DAILY"
# DATA_PATH=""
# RESOURCE_PATH=""
# RELATIVE_TO=""
# S3_MOUNT=""
SUBSET_ID_TYPE="hl_uri"
SUBSET_ID="Gages-04185000"
HYDROFABRIC_VERSION="v20"
NPROCS=""

3 changes: 2 additions & 1 deletion configs/conf_datastream_template.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ END_DATE="202402100000"
# S3_MOUNT=""
# SUBSET_ID_TYPE=""
# SUBSET_ID=""
# HYDROFABRIC_VERSION=""
# HYDROFABRIC_VERSION=""
# NPROCS=""
12 changes: 6 additions & 6 deletions forcingprocessor/configs/conf_nwmurl_operational.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{
"forcing_type" : "operational_archive",
"start_date" : "202310300000",
"end_date" : "202310300000",
"runinput" : 1,
"start_date" : "",
"end_date" : "",
"fcst_cycle" : [0],
"lead_time" : [],
"varinput" : 5,
"geoinput" : 1,
"meminput" : 0,
"runinput" : 2,
"urlbaseinput" : 7,
"fcst_cycle" : [0],
"lead_time" : [1,2,3,4,5,6]
"meminput" : 0
}
28 changes: 15 additions & 13 deletions python/configure-datastream.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@
def generate_config(args):
config = {
"globals": {
"start_date": args.start_date,
"end_date": args.end_date,
"data_dir": args.data_dir,
"relative_to": args.relative_to,
"resource_dir": args.resource_dir,
"nwmurl_file" : args.nwmurl_file
"start_date" : args.start_date,
"end_date" : args.end_date,
"data_dir" : args.data_dir,
"relative_to" : args.relative_to,
"resource_dir" : args.resource_dir,
"nwmurl_file" : args.nwmurl_file,
"nprocs" : args.nprocs
},
"subset": {
"id_type": args.subset_id_type,
"id": args.subset_id,
"version": args.hydrofabric_version
"id_type" : args.subset_id_type,
"id" : args.subset_id,
"version" : args.hydrofabric_version
}
}
return config
Expand All @@ -27,7 +28,7 @@ def write_json(conf, out_dir, name):
json.dump(conf, fp, indent=2)
return conf_path

def create_conf_fp(start,end,ii_retro):
def create_conf_fp(start,end,ii_retro,nprocs):
if ii_retro:
filename = "retro_filenamelist.txt"
else:
Expand All @@ -49,8 +50,8 @@ def create_conf_fp(start,end,ii_retro):
"run" : {
"verbose" : True,
"collect_stats" : True,
"proc_process" : int(os.cpu_count() * 0.8),
"write_process" : os.cpu_count()
"proc_process" : min(int(os.cpu_count() * 0.8),nprocs),
"write_process" : min(os.cpu_count(),nprocs)
}
}

Expand Down Expand Up @@ -106,7 +107,7 @@ def create_confs(conf):
nwm_conf['end_date'] = end

ii_retro = nwm_conf['forcing_type'] == 'retrospective'
fp_conf = create_conf_fp(start, end, ii_retro)
fp_conf = create_conf_fp(start, end, ii_retro,conf['globals']['nprocs'])
conf['nwmurl'] = nwm_conf
conf['forcingprocessor'] = nwm_conf

Expand Down Expand Up @@ -149,6 +150,7 @@ def create_confs(conf):
parser.add_argument("--subset-id", help="Set the subset ID")
parser.add_argument("--hydrofabric-version", help="Set the Hydrofabric version")
parser.add_argument("--nwmurl_file", help="Provide an optional nwmurl file")
parser.add_argument("--nprocs", type=int,help="Maximum number of processes to use")

args = parser.parse_args()

Expand Down
25 changes: 18 additions & 7 deletions scripts/stream.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ usage() {
echo " -i, --SUBSET_ID_TYPE <Hydrofabric id type> "
echo " -I, --SUBSET_ID <Hydrofabric id to subset> "
echo " -v, --HYDROFABRIC_VERSION <Hydrofabric version> "
echo " -n, --NPROCS <Process limit> "
exit 1
}

Expand All @@ -44,6 +45,7 @@ SUBSET_ID_TYPE=""
SUBSET_ID=""
HYDROFABRIC_VERSION=""
CONF_FILE=""
NPROCS=""

while [ "$#" -gt 0 ]; do
case "$1" in
Expand All @@ -57,16 +59,23 @@ while [ "$#" -gt 0 ]; do
-I|--SUBSET_ID) SUBSET_ID="$2"; shift 2;;
-v|--HYDROFABRIC_VERSION) HYDROFABRIC_VERSION="$2"; shift 2;;
-c|--CONF_FILE) CONF_FILE="$2"; shift 2;;
-n|--NPROCS) NPROCS="$2"; shift 2;;
*) usage;;
esac
done

if [ -n "$NPROCS" ]; then
NPROCS=$(nproc) - 2
fi

if [ -n "$CONF_FILE" ]; then
echo "Configuration option provided" $CONF_FILE
if [ -e "$CONF_FILE" ]; then
echo "Any variables defined in "$CONF_FILE" will override cli args"
echo "Using options:"
echo ""
cat $CONF_FILE
echo ""
echo ""
source "$CONF_FILE"
else
echo $CONF_FILE" not found!!"
Expand Down Expand Up @@ -108,17 +117,18 @@ else
fi
fi

echo "DATA_PATH: " $DATA_PATH

if [ ${#RELATIVE_TO} -gt 0 ] ; then
echo "Prepending ${RELATIVE_TO} to ${DATA_PATH#/}"
echo "Relative path provided. Prepending ${RELATIVE_TO} to ${DATA_PATH#/}"
DATA_PATH="${RELATIVE_TO%/}/${DATA_PATH%/}"
if [ -n "$RESOURCE_PATH" ]; then
echo "Prepending ${RELATIVE_TO} to ${RESOURCE_PATH#/}"
RESOURCE_PATH="${RELATIVE_TO%/}/${RESOURCE_PATH%/}"
fi
fi

echo "DATA_PATH: " $DATA_PATH
echo "RESOURCE_PATH: " $RESOURCE_PATH

if [ -e "$DATA_PATH" ]; then
echo "The path $DATA_PATH exists. Please delete it or set a different path."
exit 1
Expand Down Expand Up @@ -155,7 +165,7 @@ else
echo "Copying into current data path "$DATA_PATH
cp -r $RESOURCE_PATH $DATASTREAM_RESOURCES
else
echo $RESOURCE_PATH " provided doesn't exist!"
echo $RESOURCE_PATH " doesn't exist!"
fi
fi
fi
Expand Down Expand Up @@ -271,7 +281,7 @@ else
-u $(id -u):$(id -g) \
-w "$DOCKER_MOUNT" forcingprocessor \
python "$DOCKER_FP_PATH"weights_parq2json.py \
--gpkg $GEO_PATH_DOCKER --outname $WEIGHTS_DOCKER
--gpkg $GEO_PATH_DOCKER --outname $WEIGHTS_DOCKER --nprocs $NPROCS

WEIGHTS_FILE="${DATA%/}/${GEOPACKAGE#/}"

Expand All @@ -287,7 +297,8 @@ python3 $CONF_GENERATOR \
--subset-id-type "$SUBSET_ID_TYPE" \
--subset-id "$SUBSET_ID" \
--hydrofabric-version "$HYDROFABRIC_VERSION" \
--nwmurl_file "$NWMURL_CONF_PATH"
--nwmurl_file "$NWMURL_CONF_PATH" \
--nprocs "$NPROCS"

echo "Creating nwm filenames file"
docker run --rm -v "$DATA_PATH:"$DOCKER_MOUNT"" \
Expand Down

0 comments on commit dcb7eeb

Please sign in to comment.