Skip to content

Commit

Permalink
Viz EC2 Services to Lambda - Part 2 - Rapid Onset Flooding Probability (
Browse files Browse the repository at this point in the history
#558)

This PR migrates the ensemble-based Rapid Onset Flooding Probability
products from the Viz EC2 to the Viz Max Values lambda function (now
called Python Preprocessing). It broadly encapsulates the following
changes:

**Changes**
- New rapid_onset_flooding product script in the python_preprocessing
lambda function that supports both 12 hour SRF and 5 day GFS MRF
configurations.
- Removal of rapid_onset_flooding pipeline and product files in the
source-aws_loosa library

**Deployment Considerations:**
- Not sure if we should include this in the 2.1.4 release or not? I'm
good to test/fix quickly and thoroughly next week if you want to include
it. Otherwise, fine to go in the next one (wait to merge for now)
- We will need to include a ingest schema db dump when deploying to UAT.

---------

Co-authored-by: CoreyKrewson-NOAA <[email protected]>
  • Loading branch information
TylerSchrag-NOAA and CoreyKrewson-NOAA authored Dec 12, 2023
1 parent 578e439 commit 493f5f9
Show file tree
Hide file tree
Showing 32 changed files with 229 additions and 282 deletions.
Binary file added Core/LAMBDA/layers/dask.zip
Binary file not shown.
18 changes: 18 additions & 0 deletions Core/LAMBDA/layers/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,20 @@ resource "aws_lambda_layer_version" "yaml" {
description = "Python yaml module"
}

################
## Dask Layer ##
################

resource "aws_lambda_layer_version" "dask" {
filename = "${path.module}/dask.zip"
source_code_hash = filebase64sha256("${path.module}/dask.zip")

layer_name = "hv-vpp-${var.environment}-dask"

compatible_runtimes = ["python3.9"]
description = "Python dask module"
}

#############
## Outputs ##
#############
Expand Down Expand Up @@ -316,3 +330,7 @@ output "requests" {
output "yaml" {
value = resource.aws_lambda_layer_version.yaml
}

output "dask" {
value = resource.aws_lambda_layer_version.yaml
}
Original file line number Diff line number Diff line change
Expand Up @@ -760,3 +760,10 @@ def gen_dict_extract(key, var):
for d in v:
for result in gen_dict_extract(key, d):
yield result

def organize_input_files(fileset_bucket, fileset, download_subfolder):
local_files = []
for file in fileset:
download_path = check_if_file_exists(fileset_bucket, file, download=True, download_subfolder=download_subfolder)
local_files.append(download_path)
return local_files
5 changes: 5 additions & 0 deletions Core/LAMBDA/viz_functions/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ variable "yaml_layer" {
type = string
}

variable "dask_layer" {
type = string
}

variable "viz_lambda_shared_funcs_layer" {
type = string
}
Expand Down Expand Up @@ -428,6 +432,7 @@ resource "aws_lambda_function" "viz_python_preprocessing" {
var.psycopg2_sqlalchemy_layer,
var.viz_lambda_shared_funcs_layer,
var.requests_layer,
var.dask_layer,
]

tags = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DROP TABLE IF EXISTS publish.mrf_gfs_5day_max_high_water_probability;
DROP TABLE IF EXISTS publish.mrf_gfs_5day_max_high_water_prob;
SELECT
channels.feature_id,
channels.feature_id::TEXT AS feature_id_str,
Expand All @@ -16,6 +16,6 @@ SELECT
hwp.high_water_threshold,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
channels.geom
INTO publish.mrf_gfs_5day_max_high_water_probability
FROM ingest.mrf_gfs_5day_max_high_water_probability as hwp
INTO publish.mrf_gfs_5day_max_high_water_prob
FROM ingest.mrf_gfs_5day_max_high_water_prob as hwp
JOIN derived.channels_conus channels ON hwp.feature_id = channels.feature_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
DROP TABLE IF EXISTS publish.mrf_gfs_5day_rof_prob;
SELECT
channels.feature_id,
channels.feature_id::TEXT AS feature_id_str,
channels.name,
channels.strm_order,
channels.huc6,
channels.state,
rofp.nwm_vers,
rofp.reference_time,
rofp.rapid_onset_prob_day1,
rofp.rapid_onset_prob_day2,
rofp.rapid_onset_prob_day3,
rofp.rapid_onset_prob_day4_5,
rofp.rapid_onset_prob_all,
rf.high_water_threshold,
ST_LENGTH(channels.geom)*0.000621371 AS reach_length_miles,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
channels.geom
INTO publish.mrf_gfs_5day_rof_prob
FROM ingest.mrf_gfs_5day_rof_prob as rofp
JOIN derived.channels_conus channels ON rofp.feature_id = channels.feature_id
JOIN derived.recurrence_flows_conus rf ON rofp.feature_id = rf.feature_id;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DROP TABLE IF EXISTS publish.srf_12hr_max_high_water_probability;
DROP TABLE IF EXISTS publish.srf_12hr_max_high_water_prob;
SELECT
channels.feature_id,
channels.feature_id::TEXT AS feature_id_str,
Expand All @@ -12,6 +12,6 @@ SELECT
hwp.high_water_threshold,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
channels.geom
INTO publish.srf_12hr_max_high_water_probability
FROM ingest.srf_12hr_max_high_water_probability as hwp
INTO publish.srf_12hr_max_high_water_prob
FROM ingest.srf_12hr_max_high_water_prob as hwp
JOIN derived.channels_conus channels ON hwp.feature_id = channels.feature_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
DROP TABLE IF EXISTS publish.srf_12hr_rof_prob;
SELECT
channels.feature_id,
channels.feature_id::TEXT AS feature_id_str,
channels.name,
channels.strm_order,
channels.huc6,
channels.state,
rofp.nwm_vers,
rofp.reference_time,
rofp.rapid_onset_prob_1_6,
rofp.rapid_onset_prob_7_12,
rofp.rapid_onset_prob_all,
rf.high_water_threshold,
ST_LENGTH(channels.geom)*0.000621371 AS reach_length_miles,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
channels.geom
INTO publish.srf_12hr_rof_prob
FROM ingest.srf_12hr_rof_prob as rofp
JOIN derived.channels_conus channels ON rofp.feature_id = channels.feature_id
JOIN derived.recurrence_flows_conus rf ON rofp.feature_id = rf.feature_id;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- HUC8 Hotpsot Layer
DROP TABLE IF EXISTS publish.mrf_gfs_5day_max_high_water_probability_hucs;
DROP TABLE IF EXISTS publish.mrf_gfs_5day_max_high_water_prob_hucs;
SELECT
hucs.huc8,
TO_CHAR(hucs.huc8, 'fm00000000') AS huc8_str,
Expand All @@ -9,8 +9,8 @@ SELECT
to_char('1900-01-01 00:00:00'::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS reference_time,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
hucs.geom
INTO publish.mrf_gfs_5day_max_high_water_probability_hucs
INTO publish.mrf_gfs_5day_max_high_water_prob_hucs
FROM derived.huc8s_conus AS hucs
JOIN derived.featureid_huc_crosswalk AS crosswalk ON hucs.huc8 = crosswalk.huc8
JOIN publish.mrf_gfs_5day_max_high_water_probability AS hwp ON crosswalk.feature_id = hwp.feature_id
GROUP BY hucs.huc8, total_nwm_features, hucs.geom
JOIN publish.mrf_gfs_5day_max_high_water_prob AS hwp ON crosswalk.feature_id = hwp.feature_id
GROUP BY hucs.huc8, hucs.total_nwm_features, hucs.geom;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DROP TABLE IF EXISTS publish.mrf_gfs_5day_rapid_onset_flooding_probability_hucs;
DROP TABLE IF EXISTS publish.mrf_gfs_5day_rof_prob_hucs;
SELECT
hucs.huc8,
TO_CHAR(hucs.huc8, 'fm00000000') AS huc8_str,
Expand All @@ -13,8 +13,8 @@ SELECT
to_char(CAST(max(rofp.reference_time) AS timestamp) , 'YYYY-MM-DD HH24:MI:SS UTC') AS reference_time,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
hucs.geom
INTO publish.mrf_gfs_5day_rapid_onset_flooding_probability_hucs
INTO publish.mrf_gfs_5day_rof_prob_hucs
FROM derived.huc8s_conus AS hucs
JOIN derived.featureid_huc_crosswalk AS crosswalk ON hucs.huc8 = crosswalk.huc8
JOIN publish.mrf_gfs_5day_rapid_onset_flooding_probability AS rofp ON crosswalk.feature_id = rofp.feature_id
JOIN publish.mrf_gfs_5day_rof_prob AS rofp ON crosswalk.feature_id = rofp.feature_id
GROUP BY hucs.huc8, hucs.low_order_reach_count, hucs.total_low_order_reach_length, hucs.total_low_order_reach_miles, hucs.geom
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- HUC10 Hotpsot Layer
DROP TABLE IF EXISTS publish.srf_12hr_max_high_water_probability_hucs;
DROP TABLE IF EXISTS publish.srf_12hr_max_high_water_prob_hucs;
SELECT
hucs.huc10,
TO_CHAR(hucs.huc10, 'fm0000000000') AS huc10_str,
Expand All @@ -9,8 +9,8 @@ SELECT
to_char('1900-01-01 00:00:00'::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS reference_time,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
hucs.geom
INTO publish.srf_12hr_max_high_water_probability_hucs
INTO publish.srf_12hr_max_high_water_prob_hucs
FROM derived.huc10s_conus AS hucs
JOIN derived.featureid_huc_crosswalk AS crosswalk ON hucs.huc10 = crosswalk.huc10
JOIN publish.srf_12hr_max_high_water_probability AS hwp ON crosswalk.feature_id = hwp.feature_id
GROUP BY hucs.huc10, total_nwm_features, hucs.geom
JOIN publish.srf_12hr_max_high_water_prob AS hwp ON crosswalk.feature_id = hwp.feature_id
GROUP BY hucs.huc10, hucs.total_nwm_features, hucs.geom;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DROP TABLE IF EXISTS publish.srf_12hr_rapid_onset_flooding_probability_hucs;
DROP TABLE IF EXISTS publish.srf_12hr_rof_prob_hucs;
SELECT
hucs.huc10,
TO_CHAR(hucs.huc10, 'fm0000000000') AS huc10_str,
Expand All @@ -13,8 +13,8 @@ SELECT
to_char(CAST(max(rofp.reference_time) AS timestamp) , 'YYYY-MM-DD HH24:MI:SS UTC') AS reference_time,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
hucs.geom
INTO publish.srf_12hr_rapid_onset_flooding_probability_hucs
INTO publish.srf_12hr_rof_prob_hucs
FROM derived.huc10s_conus AS hucs
JOIN derived.featureid_huc_crosswalk AS crosswalk ON hucs.huc10 = crosswalk.huc10
JOIN publish.srf_12hr_rapid_onset_flooding_probability AS rofp ON crosswalk.feature_id = rofp.feature_id
JOIN publish.srf_12hr_rof_prob AS rofp ON crosswalk.feature_id = rofp.feature_id
GROUP BY hucs.huc10, hucs.low_order_reach_count, hucs.total_low_order_reach_length, hucs.total_low_order_reach_miles, hucs.geom
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def __init__(self, start_event, print_init=True):
self.organize_rename_dict() #This method organizes input table metadata based on the admin.pipeline_data_flows db table, and updates the sql_rename_dict dictionary if/when needed for past events.
for word, replacement in self.sql_rename_dict.items():
self.configuration.configuration_data_flow = json.loads(json.dumps(self.configuration.configuration_data_flow).replace(word, replacement))
self.configuration.db_ingest_groups = json.loads(json.dumps(self.configuration.db_ingest_groups).replace(word, replacement))
self.pipeline_products = json.loads(json.dumps(self.pipeline_products).replace(word, replacement))
self.sql_rename_dict.update({'1900-01-01 00:00:00': self.reference_time.strftime("%Y-%m-%d %H:%M:%S")}) #Add a reference time for placeholders in sql files

Expand Down Expand Up @@ -464,7 +465,7 @@ def generate_ingest_groups_file_list(self, file_groups, data_origin="raw"):
ingest_file = target_table_metadata["s3_keys"][0]
if "rnr" in ingest_file:
bucket=os.environ['RNR_DATA_BUCKET']
elif "max" in ingest_file:
elif "viz_ingest" in ingest_file:
bucket=os.environ['PYTHON_PREPROCESSING_BUCKET']
else:
bucket = self.input_bucket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ python_preprocessing:
file_window: None
product: high_water_probability
output_file: viz_ingest/medium_range_mem1/{{datetime:%Y%m%d}}/{{datetime:%H}}_mrf_gfs_5day_max_high_water_probability.csv
target_table: ingest.mrf_gfs_5day_max_high_water_probability
target_table: ingest.mrf_gfs_5day_max_high_water_prob
target_keys: (feature_id)

postprocess_sql:
- sql_file: mrf_gfs_5day_max_high_water_probability
target_table: publish.mrf_gfs_5day_max_high_water_probability
target_table: publish.mrf_gfs_5day_max_high_water_prob

product_summaries:
- sql_file: hucs
target_table:
- publish.mrf_gfs_5day_max_high_water_probability_hucs
- publish.mrf_gfs_5day_max_high_water_prob_hucs

services:
- mrf_gfs_5day_max_high_water_probability
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
product: mrf_gfs_5day_rapid_onset_flooding_probability
configuration: medium_range_mem1
product_type: "vector"
run: true

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/medium_range_mem{{range:1,7,1,%01d}}/nwm.t{{datetime:%H}}z.medium_range.channel_rt_{{range:1,7,1,%01d}}.f{{range:3,121,3,%03d}}.conus.nc
file_step: None
file_window: None
product: rapid_onset_flooding_probability
output_file: viz_ingest/medium_range_mem1/{{datetime:%Y%m%d}}/{{datetime:%H}}_mrf_gfs_5day_rapid_onset_flooding_probability.csv
target_table: ingest.mrf_gfs_5day_rof_prob
target_keys: (feature_id)

postprocess_sql:
- sql_file: mrf_gfs_5day_rapid_onset_flooding_probability
target_table: publish.mrf_gfs_5day_rof_prob

product_summaries:
- sql_file: hucs
target_table:
- publish.mrf_gfs_5day_rof_prob_hucs

services:
- mrf_gfs_5day_rapid_onset_flooding_probability
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ python_preprocessing:
file_window: PT7H
product: high_water_probability
output_file: viz_ingest/short_range/{{datetime:%Y%m%d}}/{{datetime:%H}}_srf_12hr_max_high_water_probability.csv
target_table: ingest.srf_12hr_max_high_water_probability
target_table: ingest.srf_12hr_max_high_water_prob
target_keys: (feature_id)

postprocess_sql:
- sql_file: srf_12hr_max_high_water_probability
target_table: publish.srf_12hr_max_high_water_probability
target_table: publish.srf_12hr_max_high_water_prob

product_summaries:
- sql_file: hucs
target_table:
- publish.srf_12hr_max_high_water_probability_hucs
- publish.srf_12hr_max_high_water_prob_hucs

services:
- srf_12hr_max_high_water_probability
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
product: srf_12hr_rapid_onset_flooding_probability
configuration: short_range
product_type: "vector"
run: true

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/short_range/nwm.t{{datetime:%H}}z.short_range.channel_rt.f{{range:1,19,1,%03d}}.conus.nc
file_step: 1H
file_window: PT7H
product: rapid_onset_flooding_probability
output_file: viz_ingest/short_range/{{datetime:%Y%m%d}}/{{datetime:%H}}_srf_12hr_rapid_onset_flooding_probability.csv
target_table: ingest.srf_12hr_rof_prob
target_keys: (feature_id)

postprocess_sql:
- sql_file: srf_12hr_rapid_onset_flooding_probability
target_table: publish.srf_12hr_rof_prob

product_summaries:
- sql_file: hucs
target_table:
- publish.srf_12hr_rof_prob_hucs

services:
- srf_12hr_rapid_onset_flooding_probability
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@
"workspaceFactory" : "SDE",
"dataset" : "hydrovis.services.%Day 1 Bankfull Probability (%)_6",
"datasetType" : "esriDTFeatureClass",
"sqlQuery" : "select strm_order,name,huc6,nwm_vers,geom,hours_3_to_24,high_water_threshold,reference_time,feature_id_str AS feature_id,update_time,oid from hydrovis.services.mrf_gfs_5day_max_high_water_probability",
"sqlQuery" : "select strm_order,name,huc6,nwm_vers,geom,hours_3_to_24,high_water_threshold,reference_time,feature_id_str AS feature_id,update_time,oid from hydrovis.services.mrf_gfs_5day_max_high_water_prob",
"srid" : "3857",
"spatialReference" : {
"wkid" : 102100,
Expand Down Expand Up @@ -1174,7 +1174,7 @@
"workspaceFactory" : "SDE",
"dataset" : "hydrovis.services.%Day 1 Bankfull Probability (%)_1_5",
"datasetType" : "esriDTFeatureClass",
"sqlQuery" : "select strm_order,name,huc6,nwm_vers,geom,hours_27_to_48,high_water_threshold,reference_time,feature_id_str AS feature_id,update_time,oid from hydrovis.services.mrf_gfs_5day_max_high_water_probability",
"sqlQuery" : "select strm_order,name,huc6,nwm_vers,geom,hours_27_to_48,high_water_threshold,reference_time,feature_id_str AS feature_id,update_time,oid from hydrovis.services.mrf_gfs_5day_max_high_water_prob",
"srid" : "3857",
"spatialReference" : {
"wkid" : 102100,
Expand Down Expand Up @@ -1935,7 +1935,7 @@
"workspaceFactory" : "SDE",
"dataset" : "hydrovis.services.%Day 1 Bankfull Probability (%)_1_2_4",
"datasetType" : "esriDTFeatureClass",
"sqlQuery" : "select strm_order,name,huc6,nwm_vers,geom,hours_51_to_72,high_water_threshold,reference_time,feature_id_str AS feature_id,update_time,oid from hydrovis.services.mrf_gfs_5day_max_high_water_probability",
"sqlQuery" : "select strm_order,name,huc6,nwm_vers,geom,hours_51_to_72,high_water_threshold,reference_time,feature_id_str AS feature_id,update_time,oid from hydrovis.services.mrf_gfs_5day_max_high_water_prob",
"srid" : "3857",
"spatialReference" : {
"wkid" : 102100,
Expand Down Expand Up @@ -2696,7 +2696,7 @@
"workspaceFactory" : "SDE",
"dataset" : "hydrovis.services.%Day 1 Bankfull Probability (%)_1_2_3_3",
"datasetType" : "esriDTFeatureClass",
"sqlQuery" : "select strm_order,name,huc6,nwm_vers,geom,hours_75_to_120, high_water_threshold,reference_time,feature_id_str AS feature_id,update_time,oid from hydrovis.services.mrf_gfs_5day_max_high_water_probability",
"sqlQuery" : "select strm_order,name,huc6,nwm_vers,geom,hours_75_to_120, high_water_threshold,reference_time,feature_id_str AS feature_id,update_time,oid from hydrovis.services.mrf_gfs_5day_max_high_water_prob",
"srid" : "3857",
"spatialReference" : {
"wkid" : 102100,
Expand Down Expand Up @@ -3447,7 +3447,7 @@
"workspaceFactory" : "SDE",
"dataset" : "hydrovis.services.%Day 1 Bankfull Probability (%)_1_2_3_1_1_2",
"datasetType" : "esriDTFeatureClass",
"sqlQuery" : "select strm_order,name,huc6,nwm_vers,geom,hours_3_to_120,high_water_threshold,reference_time,feature_id_str AS feature_id,update_time,oid from hydrovis.services.mrf_gfs_5day_max_high_water_probability",
"sqlQuery" : "select strm_order,name,huc6,nwm_vers,geom,hours_3_to_120,high_water_threshold,reference_time,feature_id_str AS feature_id,update_time,oid from hydrovis.services.mrf_gfs_5day_max_high_water_prob",
"srid" : "3857",
"spatialReference" : {
"wkid" : 102100,
Expand Down Expand Up @@ -4182,7 +4182,7 @@
"workspaceFactory" : "SDE",
"dataset" : "hydrovis.hydrovis.%hucs",
"datasetType" : "esriDTFeatureClass",
"sqlQuery" : "select huc8_str, total_nwm_features, nwm_features_flooded_percent, avg_prob, reference_time, update_time, geom, oid from hydrovis.services.mrf_gfs_5day_max_high_water_probability_hucs",
"sqlQuery" : "select huc8_str, total_nwm_features, nwm_features_flooded_percent, avg_prob, reference_time, update_time, geom, oid from hydrovis.services.mrf_gfs_5day_max_high_water_prob_hucs",
"srid" : "3857",
"spatialReference" : {
"wkid" : 102100,
Expand Down
Loading

0 comments on commit 493f5f9

Please sign in to comment.