version for subsetting/weight generation
JordanLaserGit committed Feb 12, 2024
1 parent 7c44910 commit 5e2d2e8
Showing 2 changed files with 32 additions and 15 deletions.
17 changes: 13 additions & 4 deletions forcingprocessor/src/forcingprocessor/weights_parq2json.py
@@ -1,7 +1,7 @@
import pandas as pd
import concurrent.futures as cf
import json
import os, time, argparse
import os, argparse

import pyarrow as pa
import pyarrow.compute as pc
@@ -13,7 +13,7 @@ def get_weight_json(catchments,jproc):
ncatch = len(catchments)

w = pa.dataset.dataset(
's3://lynker-spatial/v20.1/forcing_weights.parquet', format='parquet'
f's3://lynker-spatial/{version}/forcing_weights.parquet', format='parquet'
).filter(
pc.field('divide_id').isin(catchments)
).to_batches()
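
The hunk above leans on pyarrow's predicate pushdown: the isin filter is applied while scanning the hosted parquet, so only rows for the requested divide_ids are materialized. A minimal local sketch of the same pattern (the file path and ids are illustrative, and Dataset.filter requires pyarrow >= 12):

    import pyarrow as pa
    import pyarrow.compute as pc
    import pyarrow.dataset  # registers the pa.dataset namespace

    catchments = ["cat-1", "cat-2"]  # illustrative divide ids
    # The real script points at s3://lynker-spatial/{version}/forcing_weights.parquet
    ds = pa.dataset.dataset("forcing_weights.parquet", format="parquet")
    batches = ds.filter(pc.field("divide_id").isin(catchments)).to_batches()
    for batch in batches:
        df = batch.to_pandas()  # each batch holds only the filtered rows
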
@@ -37,16 +37,25 @@ def get_weight_json(catchments,jproc):
parser = argparse.ArgumentParser()
parser.add_argument('--gpkg', dest="geopackage", type=str, help="Path to geopackage file",default = None)
parser.add_argument('--outname', dest="weights_filename", type=str, help="Filename for the weight file")
parser.add_argument('--version', dest="version", type=str, help="Hydrofabric version e.g. \"v20.1\"")
args = parser.parse_args()

global version
version = args.version

weight_versions = ["v20.1"]
if version not in weight_versions:
raise Exception(f'version must be one of: {weight_versions}')

if args.geopackage is None:
# go for conus
uri = "s3://lynker-spatial/v20.1/forcing_weights.parquet"
uri = f"s3://lynker-spatial/{version}/forcing_weights.parquet"
weights_df = pd.read_parquet(uri)
catchment_list = list(weights_df.divide_id.unique())
del weights_df
else:
import geopandas as gpd
gpd.options.io_engine = "pyogrio"
catchments = gpd.read_file(args.geopackage, layer='divides')
catchment_list = sorted(list(catchments['divide_id']))

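For a quick sanity check of what a given hydrofabric version exposes, the CONUS branch above can be exercised on its own. A sketch, assuming s3fs is installed and the bucket is readable from your environment:

    import pandas as pd

    version = "v20.1"  # the only value the guard above accepts
    uri = f"s3://lynker-spatial/{version}/forcing_weights.parquet"

    weights_df = pd.read_parquet(uri)  # pandas dispatches s3:// reads to s3fs
    catchment_list = list(weights_df.divide_id.unique())
    print(len(catchment_list), catchment_list[:3])
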
@@ -63,7 +72,7 @@ def get_weight_json(catchments,jproc):
i = k

with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
results = pool.map(get_weight_json, catchment_list_list,[x for x in range(nprocs)])
results = pool.map(get_weight_json,catchment_list_list,[x for x in range(nprocs)])

weights = {}
for jweights in results:
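The truncated __main__ logic above splits the catchment list into one chunk per process, maps get_weight_json over the chunks, and merges the per-process results (the tail of the merge loop is cut off here, but it presumably updates a single dict). A self-contained sketch of that fan-out/merge pattern, with a placeholder worker standing in for the real parquet read:

    import concurrent.futures as cf

    def get_weight_json(catchments, jproc):
        # Placeholder: the real worker reads the filtered parquet and
        # returns {divide_id: weights} for its chunk.
        return {c: [] for c in catchments}

    if __name__ == "__main__":
        catchment_list = [f"cat-{i}" for i in range(10)]
        nprocs = 4

        # One contiguous chunk per process (ceil division).
        k = -(-len(catchment_list) // nprocs)
        catchment_list_list = [catchment_list[i:i + k] for i in range(0, len(catchment_list), k)]

        with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
            results = pool.map(get_weight_json, catchment_list_list, range(len(catchment_list_list)))

        weights = {}
        for jweights in results:
            weights.update(jweights)
        print(len(weights))  # 10: every catchment accounted for exactly once
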
30 changes: 19 additions & 11 deletions scripts/stream.sh
@@ -42,7 +42,6 @@ SUBSET_ID=""
HYDROFABRIC_VERSION=""
CONF_FILE=""


while [ "$#" -gt 0 ]; do
case "$1" in
-s|--start-date) START_DATE="$2"; shift 2;;
@@ -74,6 +73,15 @@ else
echo "No configuration file detected, using cli args"
fi

if [ -n "$SUBSET_ID" ]; then
if [ "$HYDROFABRIC_VERSION" == "v20.1" ]; then
:
else
echo "Subsetting and weight generation are only supported for hydrofabric v20.1; set HYDROFABRIC_VERSION to v20.1"
exit 1
fi
fi

DATE=$(env TZ=US/Eastern date +'%Y%m%d')
if [ $START_DATE == "DAILY" ]; then
if [ -z $END_DATE ]; then
@@ -82,7 +90,7 @@ if [ $START_DATE == "DAILY" ]; then
DATA_PATH="${PACAKGE_DIR%/}/data/$END_DATE"
fi
if [ -n $S3_MOUNT ]; then
S3_OUT="$S3_MOUNT/daily"
S3_OUT="$S3_MOUNT/daily/$DATE"
fi
else
DATA_PATH="${PACAKGE_DIR%/}/data/$START_DATE-$END_DATE"
@@ -150,9 +158,9 @@ fi

WEIGHTS_PATH=$(find "$DATASTREAM_RESOURCES" -type f -name "*weights*")
GEOPACKAGE_RESOURCES_PATH=$(find "$DATASTREAM_RESOURCES" -type f -name "*.gpkg")
PARTITON="partitions_$(grep -c ^processor /proc/cpuinfo).json"
PARTITION_RESOURCES_PATH=$(find "$DATASTREAM_RESOURCES" -type f -name "*ngen.yaml")
if [ -z $PARTITION_RESOURCES_PATH ]; then
PARTITION_RESOURCES_PATH=$(find "$DATASTREAM_RESOURCES" -type f -name "partitions")
if [ -e "$PARTITION_RESOURCES_PATH" ]; then
echo "Found $PARTITION_RESOURCES_PATH, copying to $$PARTITION_NGENRUN_PATH"
cp $PARTITION_RESOURCES_PATH $PARTITION_NGENRUN_PATH
fi

@@ -206,7 +214,6 @@ DOCKER_RESOURCES="${DOCKER_MOUNT%/}/datastream-resources"
DOCKER_CONFIGS="${DOCKER_MOUNT%/}/datastream-configs"
DOCKER_FP_PATH="/ngen-datastream/forcingprocessor/src/forcingprocessor/"

# forcingprocessor
DOCKER_TAG="forcingprocessor"
FP_DOCKER="${DOCKER_DIR%/}/forcingprocessor"
build_docker_container "$DOCKER_TAG" "$FP_DOCKER"
@@ -229,6 +236,7 @@ else
--gpkg $GEO_PATH_DOCKER --outname $WEIGHTS_DOCKER

WEIGHTS_FILE="${DATA%/}/${GEOPACKAGE#/}"

fi

CONF_GENERATOR="$PACAKGE_DIR/python/configure-datastream.py"
@@ -265,19 +273,19 @@ docker run --rm -v "$NGEN_RUN_PATH":"$DOCKER_MOUNT" \
validator python $VALIDATOR \
--data_dir $DOCKER_MOUNT

# ngen run
echo "Running NextGen in AUTO MODE from CIROH-UA/NGIAB-CloudInfra"
docker run --rm -v "$NGEN_RUN_PATH":"$DOCKER_MOUNT" awiciroh/ciroh-ngen-image:latest-local "$DOCKER_MOUNT" auto

echo "$NGEN_RUN_PATH"/*.csv | xargs mv -t $NGEN_OUTPUT_PATH --

# hashing
docker run --rm -v "$DATA_PATH":"$DOCKER_MOUNT" zwills/merkdir /merkdir/merkdir gen -o $DOCKER_MOUNT/merkdir.file $DOCKER_MOUNT

TAR_NAME="ngen-run.tar.gz"
TAR_PATH="${DATA_PATH%/}/$TAR_NAME"
tar -cf - $NGEN_RUN_PATH | pigz > $TAR_PATH

mv $DATASTREAM_RESOURCES "../datastream-resources-$DATE"
cp $TAR_PATH $S3_OUT

if [ -n $S3_MOUNT ]; then
cp -r $DATA_PATH $S3_OUT
echo "ngen-datastream run complete! Data exists here: $S3_OUT"
echo "and here: $DATA_PATH"
