Cached FIM - Part 1a - Initial VPP Workflow Implementation #604

Merged · 21 commits · Dec 22, 2023
Commits
8c74350
Lambda Shared Funcs - Update to Database class to support Redshift co…
TylerSchrag-NOAA Nov 21, 2023
bcae929
Updates to postprocess_sql to support new fim caching queries (still …
TylerSchrag-NOAA Nov 21, 2023
4c8db8b
FIM data prep lambda function - Updates to support caching (still nee…
TylerSchrag-NOAA Nov 21, 2023
74fb4aa
HAND FIM Processing Lambda - Updates to support new caching schemas.
TylerSchrag-NOAA Nov 21, 2023
039b142
BIG Update - Making columns in max flows tables generic, instead of l…
TylerSchrag-NOAA Nov 21, 2023
65e7d39
Additional Max Flows Changes
TylerSchrag-NOAA Nov 21, 2023
090bf19
Lambda Shared Funcs Layer - Case fix to support Redshift connections
TylerSchrag-NOAA Dec 5, 2023
9a87271
More viz_db_postprocess_sql logic changes / clean up to support step …
TylerSchrag-NOAA Dec 5, 2023
2377211
fim data prep - more tweaks / clean up for cached fim.
TylerSchrag-NOAA Dec 5, 2023
4869d13
Fixes to new generalized max_flows field name.
TylerSchrag-NOAA Dec 5, 2023
29fd4a2
viz initialize pipeline - inundation product configs - new domain flo…
TylerSchrag-NOAA Dec 5, 2023
ed91adf
Adding connection.close on db connections from viz_db_ingest lambda fu…
TylerSchrag-NOAA Dec 7, 2023
2f3d740
viz_postprocess_sql fixes, including moving fim_flows folder from fim…
TylerSchrag-NOAA Dec 7, 2023
8b6cf62
Big refactor of fim_data_prep lambda function, including deletion of …
TylerSchrag-NOAA Dec 7, 2023
e8095fd
Big refactor / simplification to fim_data_prep lambda function / remo…
TylerSchrag-NOAA Dec 12, 2023
87f104a
Some tweaks to postprocess_sql lambda function (still needs some work…
TylerSchrag-NOAA Dec 12, 2023
50a412c
Bug/typo fix to the lambda layers output.
TylerSchrag-NOAA Dec 13, 2023
c3cfcd1
Postprocess SQL Lambda clean-up and documentation for new FIM workflows
TylerSchrag-NOAA Dec 13, 2023
ca3308e
Bug fix: Properly cache reaches that have no valid rating curve step.
TylerSchrag-NOAA Dec 19, 2023
015c2fc
Adding some extra documentation to cached fim template sql files.
TylerSchrag-NOAA Dec 20, 2023
31a6a1c
Viz Pipeline Step Function Logic Update
TylerSchrag-NOAA Dec 20, 2023
2 changes: 1 addition & 1 deletion Core/LAMBDA/layers/main.tf
Review comment from the PR author: This is wrong.

@@ -314,5 +314,5 @@ output "requests" {
}

output "yaml" {
value = resource.aws_lambda_layer_version.yaml
value = resource.aws_lambda_layer_version.dask
}
@@ -51,7 +51,8 @@ def get_db_engine(self):
def get_db_connection(self, asynchronous=False):
import psycopg2
db_host, db_name, db_user, db_password = self.get_db_credentials()
connection = psycopg2.connect(f"host={db_host} dbname={db_name} user={db_user} password={db_password}", async_=asynchronous)
port = 5439 if self.type == "REDSHIFT" else 5432
connection = psycopg2.connect(f"host={db_host} dbname={db_name} user={db_user} password={db_password} port={port}", async_=asynchronous)
print(f"***> Established db connection to: {db_host} from {inspect.stack()[1].function}()")
return connection

@@ -387,6 +388,8 @@ def get_s3_prefix(configuration_name, date):
prefix = f"max_stage/ahps/{date}/"
else:
nwm_dataflow_version = os.environ.get("NWM_DATAFLOW_VERSION") if os.environ.get("NWM_DATAFLOW_VERSION") else "prod"
if configuration_name == 'medium_range_ensemble':
configuration_name = 'medium_range_mem6'
prefix = f"common/data/model/com/nwm/{nwm_dataflow_version}/nwm.{date}/{configuration_name}/"

return prefix
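For reference, a minimal standalone sketch of the port-selection logic added above, assuming a plain db_type string and keyword credentials rather than this repo's Database class and credential lookup:

import psycopg2

def open_connection(db_type, host, dbname, user, password, asynchronous=False):
    # Redshift listens on 5439 by default, while Postgres/RDS uses 5432,
    # so pick the port from the connection type before connecting.
    port = 5439 if db_type.upper() == "REDSHIFT" else 5432
    return psycopg2.connect(
        host=host, dbname=dbname, user=user, password=password,
        port=port, async_=asynchronous,
    )

The same helper then serves both database flavors; only the db_type string changes between the viz (RDS) and Redshift callers.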
@@ -51,6 +51,15 @@ def lambda_handler(event, context):
branch = huc8_branch.split("-")[1]
s3_path_piece = ''

# Get db table names and setup db connection
print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
db_schema = db_fim_table.split(".")[0]
db_table = db_fim_table.split(".")[-1]
if any(x in db_schema for x in ["aep", "fim_catchments", "catfim"]):
process_db = database(db_type="egis")
else:
process_db = database(db_type="viz")

if "catchments" in db_fim_table:
df_inundation = create_inundation_catchment_boundary(huc8, branch)
else:
@@ -76,12 +85,19 @@
rating_curve_exists = s3_file(FIM_BUCKET, rating_curve_key).check_existence()

stage_lookup = pd.DataFrame()
df_zero_stage_records = pd.DataFrame()
if catch_exists and hand_exists and rating_curve_exists:
print("->Calculating flood depth")
stage_lookup = calculate_stage_values(rating_curve_key, data_bucket, subsetted_data, huc8_branch) # get stages
stage_lookup, df_zero_stage_records = calculate_stage_values(rating_curve_key, data_bucket, subsetted_data, huc8_branch) # get stages
else:
print(f"catchment, hand, or rating curve are missing for huc {huc8} and branch {branch}:\nCatchment exists: {catch_exists} ({catchment_key})\nHand exists: {hand_exists} ({hand_key})\nRating curve exists: {rating_curve_exists} ({rating_curve_key})")


# Upload zero_stage reaches for tracking / FIM cache
print(f"Adding zero stage data to {db_table}_zero_stage")# Only process inundation configuration if available data
df_zero_stage_records['branch'] = int(branch)
df_zero_stage_records['huc8'] = int(huc8)
df_zero_stage_records.to_sql(f"{db_table}_zero_stage", con=process_db.engine, schema=db_schema, if_exists='append', index=True)

# If no features with above zero stages are present, then just copy an unflood raster instead of processing nothing
if stage_lookup.empty:
print("No reaches with valid stages")
@@ -90,17 +106,27 @@
# Run the desired configuration
df_inundation = create_inundation_output(huc8, branch, stage_lookup, reference_time, input_variable)

# Split geometry into separate table per new schema
df_inundation_geo = df_inundation[['hydro_id', 'feature_id', 'huc8', 'branch', 'rc_stage_ft', 'geom']]
df_inundation.drop(columns=['geom'], inplace=True)

# If records exist in stage_lookup that don't exist in df_inundation, add those to the zero_stage table.
df_no_inundation = stage_lookup.merge(df_inundation.drop_duplicates(), on=['feature_id','hydro_id'],how='left',indicator=True)
df_no_inundation = df_no_inundation.loc[df_no_inundation['_merge'] == 'left_only']
if df_no_inundation.empty == False:
df_no_inundation.drop(df_no_inundation.columns.difference(['hydro_id','feature_id','huc8','branch','rc_discharge_cms','note']), axis=1, inplace=True)
df_no_inundation['branch'] = int(branch)
df_no_inundation['huc8'] = int(huc8)
df_no_inundation['note'] = "Error - No inundation returned from hand processing."
df_no_inundation.to_sql(f"{db_table}_zero_stage", con=process_db.engine, schema=db_schema, if_exists='append', index=False)
# If no records exist for valid inundation, stop.
if df_inundation.empty:
return

print(f"Adding data to {db_fim_table}")# Only process inundation configuration if available data
db_schema = db_fim_table.split(".")[0]
db_table = db_fim_table.split(".")[-1]

try:
if any(x in db_schema for x in ["aep", "fim_catchments", "catfim"]):
process_db = database(db_type="egis")
else:
process_db = database(db_type="viz")

df_inundation.to_postgis(db_table, con=process_db.engine, schema=db_schema, if_exists='append')
df_inundation.to_sql(db_table, con=process_db.engine, schema=db_schema, if_exists='append', index=False)
df_inundation_geo.to_postgis(f"{db_table}_geo", con=process_db.engine, schema=db_schema, if_exists='append')
except Exception as e:
process_db.engine.dispose()
raise Exception(f"Failed to add inundation data to DB for {huc8}-{branch} - ({e})")
@@ -265,7 +291,9 @@ def create_inundation_output(huc8, branch, stage_lookup, reference_time, input_v
catchment_nodata = int(catchment_dataset.nodata) # get no_data value for catchment raster
valid_catchments = stage_lookup.index.tolist() # parse lookup to get features with >0 stages # noqa
hydroids = stage_lookup.index.tolist() # parse lookup to get all features
stages = stage_lookup['stage_m'].tolist() # parse lookup to get all stages

# Notable FIM Caching Change: Now using rc_stage_m (the upper value of the current hydrotable interval), instead of the interpolated stage, for inundation extent generation.
stages = stage_lookup['rc_stage_m'].tolist() # parse lookup to get all stages

k = np.array(hydroids) # Create a feature numpy array from the list
v = np.array(stages) # Create a stage numpy array from the list
@@ -412,25 +440,32 @@ def process(window):
print("dropping duplicates")
df_final = df_final.drop_duplicates()

print("Converting m columns to ft")
df_final['rc_stage_ft'] = (df_final['rc_stage_m'] * 3.28084).astype(int)
df_final['rc_previous_stage_ft'] = round(df_final['rc_previous_stage_m'] * 3.28084, 2)
df_final['rc_discharge_cfs'] = round(df_final['rc_discharge_cms'] * 35.315, 2)
df_final['rc_previous_discharge_cfs'] = round(df_final['rc_previous_discharge_cms'] * 35.315, 2)
df_final = df_final.drop(columns=["rc_stage_m", "rc_previous_stage_m", "rc_discharge_cms", "rc_previous_discharge_cms"])

print("Adding additional metadata columns")
df_final = df_final.reset_index()
df_final = df_final.rename(columns={"index": "hydro_id"})
df_final['fim_version'] = FIM_VERSION
df_final['reference_time'] = reference_time
df_final['huc8'] = huc8
df_final['branch'] = branch
df_final['fim_stage_ft'] = round(df_final['stage_m'] * 3.28084, 2)
df_final['hydro_id_str'] = df_final['hydro_id'].astype(str)
df_final['feature_id_str'] = df_final['feature_id'].astype(str)
df_final['forecast_stage_ft'] = round(df_final['stage_m'] * 3.28084, 2)
df_final['prc_method'] = 'HAND_Processing'

#TODO: Check with Shawn on the whole stage configuration / necessary changes
if input_variable == 'stage':
drop_columns = ['stage_m', 'huc8_branch', 'huc']
else:
df_final['max_rc_stage_ft'] = df_final['max_rc_stage_m'] * 3.28084
df_final['max_rc_stage_ft'] = df_final['max_rc_stage_ft'].astype(int)
df_final['streamflow_cfs'] = round(df_final['streamflow_cms'] * 35.315, 2)
df_final['forecast_discharge_cfs'] = round(df_final['discharge_cms'] * 35.315, 2)
df_final['max_rc_discharge_cfs'] = round(df_final['max_rc_discharge_cms'] * 35.315, 2)
drop_columns = ["stage_m", "max_rc_stage_m", "streamflow_cms", "max_rc_discharge_cms"]
drop_columns = ["stage_m", "max_rc_stage_m", "discharge_cms", "max_rc_discharge_cms", ]

df_final = df_final.drop(columns=drop_columns)

@@ -449,50 +484,74 @@ def s3_csv_to_df(bucket, key):

def calculate_stage_values(hydrotable_key, subsetted_streams_bucket, subsetted_streams, huc8_branch):
"""
Converts streamflow values to stage using the rating curve and linear interpolation because rating curve intervals
Converts discharge (streamflow) values to stage using the rating curve and linear interpolation between rating curve intervals

Arguments:
local_hydrotable (str): Path to local copy of the branch hydrotable
df_nwm (DataFrame): A pandas dataframe with columns for feature id and desired streamflow column
df_nwm (DataFrame): A pandas dataframe with columns for feature id and desired discharge column

Returns:
stage_lookup (DataFrame): forecast reaches with interpolated and rating-curve stage/discharge values, indexed by hydro_id
df_zero_stage (DataFrame): reaches with a NaN or zero interpolated stage, destined for the _zero_stage tracking table
"""
df_hydro = s3_csv_to_df(FIM_BUCKET, hydrotable_key)
df_hydro = df_hydro[['HydroID', 'feature_id', 'stage', 'discharge_cms', 'LakeID']]

df_hydro_max = df_hydro.sort_values('stage').groupby('HydroID').tail(1)
df_hydro_max = df_hydro_max.set_index('HydroID')
df_hydro_max = df_hydro_max[['stage', 'discharge_cms']].rename(columns={'stage': 'max_rc_stage_m', 'discharge_cms': 'max_rc_discharge_cms'})
df_hydro = df_hydro.rename(columns={'HydroID': 'hydro_id', 'stage': 'stage_m'})

df_hydro_max = df_hydro.sort_values('stage_m').groupby('hydro_id').tail(1)
df_hydro_max = df_hydro_max.set_index('hydro_id')
df_hydro_max = df_hydro_max[['stage_m', 'discharge_cms']].rename(columns={'stage_m': 'max_rc_stage_m', 'discharge_cms': 'max_rc_discharge_cms'})

df_forecast = s3_csv_to_df(subsetted_streams_bucket, subsetted_streams)
df_forecast = df_forecast.loc[df_forecast['huc8_branch']==huc8_branch]
df_forecast['stage_m'] = df_forecast.apply(lambda row : interpolate_stage(row, df_hydro), axis=1)
df_forecast = df_forecast.rename(columns={'streamflow_cms': 'discharge_cms'}) #TODO: Change the output CSV to list discharge instead of streamflow for consistency?
df_forecast[['stage_m', 'rc_stage_m', 'rc_previous_stage_m', 'rc_discharge_cms', 'rc_previous_discharge_cms']] = df_forecast.apply(lambda row : interpolate_stage(row, df_hydro), axis=1).apply(pd.Series)

df_forecast.drop(columns=['huc8_branch', 'huc'], inplace=True)
df_forecast = df_forecast.set_index('hydro_id')

print(f"Removing {len(df_forecast[df_forecast['stage_m'].isna()])} reaches with a NaN interpolated stage")
df_zero_stage = df_forecast[df_forecast['stage_m'].isna()].copy()
df_zero_stage['note'] = "NaN Stage After Hydrotable Lookup"
df_forecast = df_forecast[~df_forecast['stage_m'].isna()]

print(f"Removing {len(df_forecast[df_forecast['stage_m']==0])} reaches with a 0 interpolated stage")
df_zero_stage = pd.concat([df_zero_stage, df_forecast[df_forecast['stage_m']==0].copy()], axis=0)
df_zero_stage['note'] = np.where(df_zero_stage.note.isnull(), "0 Stage After Hydrotable Lookup", df_zero_stage.note)
df_forecast = df_forecast[df_forecast['stage_m']!=0]

df_forecast = df_forecast.drop(columns=['huc8_branch', 'huc'])
df_forecast = df_forecast.set_index('hydro_id')
df_zero_stage.drop(columns=['discharge_cms', 'stage_m', 'rc_stage_m', 'rc_previous_stage_m', 'rc_previous_discharge_cms'], inplace=True)

df_forecast = df_forecast.join(df_hydro_max)
print(f"{len(df_forecast)} reaches will be processed")

return df_forecast
return df_forecast, df_zero_stage

def interpolate_stage(df_row, df_hydro):
hydro_id = df_row['hydro_id']
forecast = df_row['streamflow_cms']
forecast = df_row['discharge_cms']

if hydro_id not in df_hydro['HydroID'].values:
if hydro_id not in df_hydro['hydro_id'].values:
return np.nan

subet_hydro = df_hydro.loc[df_hydro['HydroID']==hydro_id, ['discharge_cms', 'stage']]
discharge = subet_hydro['discharge_cms'].values
stage = subet_hydro['stage'].values
# Filter the hydrotable to this hydroid and pull out discharge and stages into arrays
subset_hydro = df_hydro.loc[df_hydro['hydro_id']==hydro_id, ['discharge_cms', 'stage_m']]
discharges = subset_hydro['discharge_cms'].values
stages = subset_hydro['stage_m'].values

interpolated_stage = round(np.interp(forecast, discharge, stage), 2)
# Get the interpolated stage by using the discharge forecast value against the arrays
interpolated_stage = round(np.interp(forecast, discharges, stages), 2)

# Get the upper and lower values of the 1-ft hydrotable array that the current forecast / interpolated stage is at
hydrotable_index = np.searchsorted(discharges, forecast, side='right')

# If streamflow exceeds the rating curve max, just use the max value
if hydrotable_index >= len(stages):
hydrotable_index = hydrotable_index - 1

hydrotable_previous_index = hydrotable_index-1
rc_stage = stages[hydrotable_index]
rc_previous_stage = stages[hydrotable_previous_index]
rc_discharge = discharges[hydrotable_index]
rc_previous_discharge = discharges[hydrotable_previous_index]

return interpolated_stage
return interpolated_stage, rc_stage, rc_previous_stage, rc_discharge, rc_previous_discharge
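For reference, a standalone sketch of the interpolation and interval-bracketing logic added to interpolate_stage above, run against a toy rating curve; the stage and discharge values are illustrative, not taken from a real hydrotable:

import numpy as np

# Toy rating curve for one hydro_id: stage (m) at 1-ft steps vs. discharge (cms).
stages = np.array([0.0, 0.3048, 0.6096, 0.9144])
discharges = np.array([0.0, 5.0, 12.0, 25.0])

forecast = 8.0  # forecast discharge in cms

# Interpolated stage between rating-curve rows (what the pipeline previously inundated with).
interpolated_stage = round(float(np.interp(forecast, discharges, stages)), 2)

# Index of the first rating-curve row whose discharge exceeds the forecast; its stage is the
# rc_stage now used to generate (and cache) extents, with the previous row as the lower bound.
idx = np.searchsorted(discharges, forecast, side='right')
if idx >= len(stages):  # forecast exceeds the curve, so clamp to the last row
    idx = len(stages) - 1

rc_stage, rc_previous_stage = stages[idx], stages[idx - 1]
rc_discharge, rc_previous_discharge = discharges[idx], discharges[idx - 1]

print(interpolated_stage, rc_stage, rc_previous_stage, rc_discharge, rc_previous_discharge)
# roughly: 0.44 0.6096 0.3048 12.0 5.0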
2 changes: 1 addition & 1 deletion Core/LAMBDA/viz_functions/viz_db_ingest/lambda_function.py
@@ -130,7 +130,7 @@ def lambda_handler(event, context):
except Exception as e:
print(f"Error: {e}")
raise e

connection.close()
dump_dict = {
"file": file,
"target_table": target_table,
@@ -0,0 +1,54 @@
-- This creates the four tables on a Redshift db needed for a cached fim pipeline run.
-- These four tables exist on both RDS and Redshift, so any changes here will need to be synced with the RDS version as well - 0b_rds_create_inundation_tables_if_not_exist.sql
CREATE TABLE IF NOT EXISTS {rs_fim_table}_flows
(
feature_id integer,
hydro_id integer,
huc8 INTEGER,
branch bigint,
reference_time text,
discharge_cms double precision,
discharge_cfs double precision,
prc_status text,
PRIMARY KEY("hydro_id", "feature_id", "huc8", "branch")
);

CREATE TABLE IF NOT EXISTS {rs_fim_table} (
hydro_id integer,
feature_id integer,
huc8 integer,
branch bigint,
forecast_discharge_cfs double precision,
forecast_stage_ft double precision,
rc_discharge_cfs double precision,
rc_previous_discharge_cfs double precision,
rc_stage_ft double precision,
rc_previous_stage_ft double precision,
max_rc_discharge_cfs double precision,
max_rc_stage_ft double precision,
fim_version text,
reference_time text,
prc_method text,
PRIMARY KEY("hydro_id", "feature_id", "huc8", "branch")
) DISTSTYLE AUTO;

CREATE TABLE IF NOT EXISTS {rs_fim_table}_geo (
hydro_id integer,
feature_id integer,
huc8 INTEGER,
branch bigint,
rc_stage_ft integer,
geom_part integer,
geom geometry
) DISTSTYLE AUTO;

CREATE TABLE IF NOT EXISTS {rs_fim_table}_zero_stage
(
feature_id integer,
hydro_id integer,
huc8 INTEGER,
branch bigint,
rc_discharge_cms double precision,
note text,
PRIMARY KEY("hydro_id", "feature_id", "huc8", "branch")
);
@@ -0,0 +1,70 @@
-- This creates the four tables on a RDS db needed for a cached fim pipeline run.
-- These four tables exist on both RDS and Redshift, so any changes here will need to be synced with the Redshift version as well - 0a_redshift_create_inundation_tables_if_not_exist.sql
CREATE TABLE IF NOT EXISTS {db_fim_table}_flows
(
hydro_id integer,
feature_id integer,
huc8 integer,
branch bigint,
reference_time text,
discharge_cms double precision,
discharge_cfs double precision,
prc_status text
);

CREATE TABLE IF NOT EXISTS {db_fim_table}
(
hydro_id integer,
feature_id integer,
huc8 integer,
branch bigint,
forecast_discharge_cfs double precision,
forecast_stage_ft double precision,
rc_discharge_cfs double precision,
rc_previous_discharge_cfs double precision,
rc_stage_ft integer,
rc_previous_stage_ft double precision,
max_rc_stage_ft double precision,
max_rc_discharge_cfs double precision,
fim_version text,
reference_time text,
prc_method text
);

CREATE TABLE IF NOT EXISTS {db_fim_table}_geo (
hydro_id integer,
feature_id integer,
huc8 integer,
branch bigint,
rc_stage_ft integer,
geom_part integer,
geom geometry(geometry, 3857)
);

CREATE TABLE IF NOT EXISTS {db_fim_table}_zero_stage (
hydro_id integer,
feature_id integer,
huc8 integer,
branch bigint,
rc_discharge_cms double precision,
note text
);

-- Create a view that contains subdivided polygons in WKT text, for import into Redshift
CREATE OR REPLACE VIEW {db_fim_table}_geo_view AS
SELECT fim_subdivide.hydro_id,
fim_subdivide.feature_id,
fim_subdivide.huc8,
fim_subdivide.branch,
fim_subdivide.rc_stage_ft,
0 AS geom_part,
st_astext(fim_subdivide.geom) AS geom_wkt
FROM ( SELECT fim.hydro_id,
fim.feature_id,
fim.huc8,
fim.branch,
fim.rc_stage_ft,
st_subdivide(fim_geo.geom) AS geom
FROM {db_fim_table} fim
JOIN {db_fim_table}_geo fim_geo ON fim.hydro_id = fim_geo.hydro_id AND fim.feature_id = fim_geo.feature_id AND fim.huc8 = fim_geo.huc8 AND fim.branch = fim_geo.branch AND fim.rc_stage_ft = fim_geo.rc_stage_ft
WHERE fim.prc_method = 'HAND_Processing'::text) fim_subdivide;
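For reference, a sketch of how the WKT view above might be read back when staging geometries for Redshift; the connection string, table name, and use of shapely here are illustrative assumptions, not part of this PR:

import psycopg2
from shapely import wkt  # only used here to show the WKT round-trip

# Hypothetical connection and templated table name.
conn = psycopg2.connect("host=localhost dbname=viz user=viz password=example")
fim_table = "ingest.example_inundation"

with conn, conn.cursor() as cur:
    cur.execute(
        f"SELECT hydro_id, feature_id, huc8, branch, rc_stage_ft, geom_part, geom_wkt "
        f"FROM {fim_table}_geo_view LIMIT 5"
    )
    for hydro_id, feature_id, huc8, branch, rc_stage_ft, geom_part, geom_wkt in cur.fetchall():
        geom = wkt.loads(geom_wkt)  # plain-text WKT converts back to a geometry object
        print(hydro_id, rc_stage_ft, geom.geom_type)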