Viz EC2 Services to Lambda - Part 3a - Anomaly (#582)
This PR migrates the Anomaly product from the Viz EC2 to the Viz Python Preprocessing lambda function. It broadly encapsulates the following changes:

- New anomaly product script in the python_preprocessing lambda function that supports both 7-day and 14-day configurations.
- Removal of the anomaly pipeline and product files in the source-aws_loosa library.
- A second python_preprocessing lambda function that uses the same source code but more RAM. (I still need to add logic to choose the correct function based on the anomaly config; that will be much easier once we deploy this to TI next week, so I will do it then. A rough sketch of what that routing could look like follows this list.)
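
For context, here is a minimal sketch of how that 2GB/10GB routing could work, assuming the pipeline picks the target Lambda from the ingest set's product and config. The function names, environment suffix, and event shape below are assumptions for illustration, not part of this commit:

def choose_python_preprocessing_function(ingest_set):
    """Pick the 2GB or 10GB preprocessing Lambda based on the anomaly config (hypothetical)."""
    product = ingest_set.get("product")
    config = ingest_set.get("config")

    # Assumption: the 14-day anomaly window needs the larger-memory function;
    # everything else fits in the 2GB version.
    if product == "anomaly" and config == 14:
        return "hv-vpp-ti-viz-python-preprocessing-10GB"
    return "hv-vpp-ti-viz-python-preprocessing-2GB"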

Deployment Considerations:
- I may need to resolve merge conflicts after we merge Part 2 - Rapid
Onset Flooding
- We will need to include an ingest schema db dump when deploying to UAT.

---------

Co-authored-by: CoreyKrewson-NOAA <[email protected]>
Co-authored-by: NickChadwick-NOAA <[email protected]>
3 people authored Dec 12, 2023
1 parent 493f5f9 commit f0c67c8
Showing 15 changed files with 311 additions and 306 deletions.
69 changes: 64 additions & 5 deletions Core/LAMBDA/viz_functions/main.tf
@@ -393,7 +393,10 @@ resource "aws_s3_object" "python_preprocessing_zip_upload" {
source_hash = filemd5(data.archive_file.python_preprocessing_zip.output_path)
}

resource "aws_lambda_function" "viz_python_preprocessing" {
#########################
#### 2GB RAM Version ####
#########################
resource "aws_lambda_function" "viz_python_preprocessing_2GB" {
function_name = "hv-vpp-${var.environment}-viz-python-preprocessing"
description = "Lambda function to create max streamflow files for NWM data"
memory_size = 2048
@@ -410,6 +413,7 @@ resource "aws_lambda_function" "viz_python_preprocessing" {
environment {
variables = {
CACHE_DAYS = 1
AUTH_DATA_BUCKET = var.viz_authoritative_bucket
DATA_BUCKET_UPLOAD = var.fim_output_bucket
VIZ_DB_DATABASE = var.viz_db_name
VIZ_DB_HOST = var.viz_db_host
@@ -432,11 +436,62 @@ resource "aws_lambda_function" "viz_python_preprocessing" {
var.psycopg2_sqlalchemy_layer,
var.viz_lambda_shared_funcs_layer,
var.requests_layer,
var.dask_layer,
var.dask_layer
]

tags = {
"Name" = "hv-vpp-${var.environment}-viz-python-preprocessing"
"Name" = "hv-vpp-${var.environment}-viz-python-preprocessing-2GB"
}
}

#########################
#### 10GB RAM Version ####
#########################
resource "aws_lambda_function" "viz_python_preprocessing_10GB" {
function_name = "hv-vpp-${var.environment}-viz-python-preprocessing-10GB"
description = "Lambda function to create max streamflow files for NWM data"
memory_size = 10240
ephemeral_storage {
size = 6656
}
timeout = 900

vpc_config {
security_group_ids = var.db_lambda_security_groups
subnet_ids = var.db_lambda_subnets
}

environment {
variables = {
CACHE_DAYS = 1
AUTH_DATA_BUCKET = var.viz_authoritative_bucket
DATA_BUCKET_UPLOAD = var.fim_output_bucket
VIZ_DB_DATABASE = var.viz_db_name
VIZ_DB_HOST = var.viz_db_host
VIZ_DB_USERNAME = jsondecode(var.viz_db_user_secret_string)["username"]
VIZ_DB_PASSWORD = jsondecode(var.viz_db_user_secret_string)["password"]
NWM_DATAFLOW_VERSION = var.nwm_dataflow_version
}
}
s3_bucket = aws_s3_object.python_preprocessing_zip_upload.bucket
s3_key = aws_s3_object.python_preprocessing_zip_upload.key
source_code_hash = filebase64sha256(data.archive_file.python_preprocessing_zip.output_path)

runtime = "python3.9"
handler = "lambda_function.lambda_handler"

role = var.lambda_role

layers = [
var.xarray_layer,
var.psycopg2_sqlalchemy_layer,
var.viz_lambda_shared_funcs_layer,
var.requests_layer,
var.dask_layer
]

tags = {
"Name" = "hv-vpp-${var.environment}-viz-python-preprocessing-2GB"
}
}

@@ -945,8 +1000,12 @@ module "image-based-lambdas" {
########################################################################################################################################
########################################################################################################################################

output "python_preprocessing" {
value = aws_lambda_function.viz_python_preprocessing
output "python_preprocessing_2GB" {
value = aws_lambda_function.viz_python_preprocessing_2GB
}

output "python_preprocessing_10GB" {
value = aws_lambda_function.viz_python_preprocessing_10GB
}

output "initialize_pipeline" {
@@ -0,0 +1,35 @@
DROP TABLE IF EXISTS publish.ana_anomaly;
SELECT
channels.feature_id,
channels.feature_id::TEXT AS feature_id_str,
channels.strm_order,
channels.name,
channels.huc6,
anom_7d.nwm_vers,
anom_7d.reference_time,
anom_7d.reference_time AS valid_time,
anom_7d.average_flow_7day,
anom_7d.prcntle_5 as pctl_5_7d,
anom_7d.prcntle_10 as pctl_10_7d,
anom_7d.prcntle_25 as pctl_25_7d,
anom_7d.prcntle_75 as pctl_75_7d,
anom_7d.prcntle_90 as pctl_90_7d,
anom_7d.prcntle_95 as pctl_95_7d,
anom_7d.anom_cat_7day,
anom_14d.average_flow_14day,
anom_14d.prcntle_5 as pctl_5_14d,
anom_14d.prcntle_10 as pctl_10_14d,
anom_14d.prcntle_25 as pctl_25_14d,
anom_14d.prcntle_75 as pctl_75_14d,
anom_14d.prcntle_90 as pctl_90_14d,
anom_14d.prcntle_95 as pctl_95_14d,
anom_14d.anom_cat_14day,
ana.streamflow AS latest_flow,
to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time,
channels.geom
INTO publish.ana_anomaly
FROM derived.channels_conus AS channels
LEFT OUTER JOIN ingest.ana_7day_anomaly AS anom_7d ON channels.feature_id = anom_7d.feature_id
LEFT OUTER JOIN ingest.ana_14day_anomaly AS anom_14d ON channels.feature_id = anom_14d.feature_id
LEFT OUTER JOIN ingest.nwm_channel_rt_ana AS ana ON channels.feature_id = ana.feature_id
WHERE average_flow_7day IS NOT NULL OR average_flow_14day IS NOT NULL;
@@ -497,7 +497,8 @@ def generate_python_preprocessing_file_list(self, file_groups):
python_preprocesing_ingest_sets = []
db_ingest_sets = []
for file_group in file_groups:
product = file_group['product']
product = file_group['product']
config = file_group['config'] if file_group.get('config') else None
output_file = file_group['output_file']

token_dict = get_file_tokens(output_file)
@@ -508,6 +509,7 @@ def generate_python_preprocessing_file_list(self, file_groups):
"fileset": python_preprocesing_file_set[0]['ingest_datasets'],
"fileset_bucket": python_preprocesing_file_set[0]['bucket'],
"product": product,
"config": config,
"output_file": formatted_output_file,
"output_file_bucket": os.environ['PYTHON_PREPROCESSING_BUCKET'],
})
@@ -0,0 +1,31 @@
product: ana_anomaly
configuration: analysis_assim
product_type: "vector"
run: true
run_times:
- '00:00'

python_preprocessing:
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_window: P7D
product: anomaly
config: 7
output_file: viz_ingest/analysis_assim/{{datetime:%Y%m%d}}/{{datetime:%H}}_ana_7day_anomaly.csv
target_table: ingest.ana_7day_anomaly
target_keys: (feature_id)
- file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc
file_step: 1H
file_window: P14D
product: anomaly
config: 14
output_file: viz_ingest/analysis_assim/{{datetime:%Y%m%d}}/{{datetime:%H}}_ana_14day_anomaly.csv
target_table: ingest.ana_14day_anomaly
target_keys: (feature_id)

postprocess_sql:
- sql_file: ana_anomaly
target_table: publish.ana_anomaly

services:
- ana_anomaly
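
To illustrate how the config key above flows through the pipeline, here is roughly what one 7-day anomaly ingest set looks like by the time it reaches the python_preprocessing lambda_handler. Bucket names and the fileset contents are placeholders, not values from this commit:

example_event = {
    "args": {
        "python_preprocessing": {
            "fileset": [
                # one channel_rt file per hour over the P7D window (placeholder path)
                "common/data/model/com/nwm/prod/nwm.20231212/analysis_assim/nwm.t00z.analysis_assim.channel_rt.tm00.conus.nc",
            ],
            "fileset_bucket": "example-nwm-data-bucket",
            "product": "anomaly",
            "config": 7,
            "output_file": "viz_ingest/analysis_assim/20231212/00_ana_7day_anomaly.csv",
            "output_file_bucket": "example-python-preprocessing-bucket",
        }
    }
}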
@@ -446,7 +446,7 @@
"workspaceFactory" : "SDE",
"dataset" : "hydrovis.services.%7-Day Average Streamflow Percentile_1_1_1",
"datasetType" : "esriDTFeatureClass",
"sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, average_flow_7day, prcntle_5, prcntle_10, prcntle_25, prcntle_75, prcntle_90, prcntle_95, anom_cat_7day, latest_flow,reference_time,valid_time,update_time, oid from hydrovis.services.ana_anomaly",
"sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, average_flow_7day, pctl_5_7d AS prcntle_5, pctl_10_7d AS prcntle_10, pctl_25_7d AS prcntle_25, pctl_75_7d AS prcntle_75, pctl_90_7d AS prcntle_90, pctl_95_7d AS prcntle_95, anom_cat_7day, latest_flow, reference_time, valid_time, update_time, oid from hydrovis.services.ana_anomaly WHERE anom_cat_7day IS NOT NULL",
"srid" : "3857",
"spatialReference" : {
"wkid" : 102100,
@@ -1404,7 +1404,7 @@
"workspaceFactory" : "SDE",
"dataset" : "hydrovis.services.%14-Day Average Streamflow Percentile_2_2_2",
"datasetType" : "esriDTFeatureClass",
"sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, prcntle_5, prcntle_10, prcntle_25, prcntle_75, prcntle_90, prcntle_95, average_flow_14day, anom_cat_14day, latest_flow, reference_time, valid_time, update_time, oid from hydrovis.services.ana_anomaly",
"sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, average_flow_14day, pctl_5_14d AS prcntle_5, pctl_10_14d AS prcntle_10, pctl_25_14d AS prcntle_25, pctl_75_14d AS prcntle_75, pctl_90_14d AS prcntle_90, pctl_95_14d AS prcntle_95, anom_cat_14day, latest_flow, reference_time, valid_time, update_time, oid from hydrovis.services.ana_anomaly WHERE anom_cat_14day IS NOT NULL",
"srid" : "3857",
"spatialReference" : {
"wkid" : 102100,
@@ -1,10 +1,11 @@
from datetime import datetime
import os

from viz_lambda_shared_funcs import generate_file_list
from products.max_values import aggregate_max_to_file
from products.high_water_probability import run_high_water_probability
from products.rapid_onset_flooding_probability import run_rapid_onset_flooding_probability

from products.anomaly import run_anomaly

def lambda_handler(event, context):
"""
@@ -53,13 +54,17 @@ def lambda_handler(event, context):
output_file_bucket = event['args']['python_preprocessing']['output_file_bucket']


print(f"Creating {output_file}")
print(f"Running {product} code and creating {output_file}")
if product == "max_values":
aggregate_max_to_file(fileset_bucket, fileset, output_file_bucket, output_file)
elif product == "high_water_probability":
run_high_water_probability(reference_date, fileset_bucket, fileset, output_file_bucket, output_file)
elif product == "rapid_onset_flooding_probability":
run_rapid_onset_flooding_probability(reference_date, fileset_bucket, fileset, output_file_bucket, output_file)
elif product == "anomaly":
anomaly_config = event['args']['python_preprocessing']['config']
auth_data_bucket = os.environ['AUTH_DATA_BUCKET']
run_anomaly(reference_date, fileset_bucket, fileset, output_file_bucket, output_file, auth_data_bucket, anomaly_config=anomaly_config)

print(f"Successfully created {output_file} in {output_file_bucket}")

141 changes: 141 additions & 0 deletions Core/LAMBDA/viz_functions/viz_python_preprocessing/products/anomaly.py
@@ -0,0 +1,141 @@
import numpy as np
import pandas as pd
import xarray as xr
import tempfile
import boto3
import os
from viz_lambda_shared_funcs import check_if_file_exists, organize_input_files

INSUFFICIENT_DATA_ERROR_CODE = -9998
PERCENTILE_TABLE_5TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_5th_perc.nc"
PERCENTILE_TABLE_10TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_10th_perc.nc"
PERCENTILE_TABLE_25TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_25th_perc.nc"
PERCENTILE_TABLE_75TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_75th_perc.nc"
PERCENTILE_TABLE_90TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_90th_perc.nc"
PERCENTILE_TABLE_95TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_95th_perc.nc"
PERCENTILE_14_TABLE_5TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_5th_perc.nc"
PERCENTILE_14_TABLE_10TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_10th_perc.nc"
PERCENTILE_14_TABLE_25TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_25th_perc.nc"
PERCENTILE_14_TABLE_75TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_75th_perc.nc"
PERCENTILE_14_TABLE_90TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_90th_perc.nc"
PERCENTILE_14_TABLE_95TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_95th_perc.nc"

def run_anomaly(reference_time, fileset_bucket, fileset, output_file_bucket, output_file, auth_data_bucket, anomaly_config=7):
average_flow_col = f'average_flow_{anomaly_config}day'
anom_col = f'anom_cat_{anomaly_config}day'

##### Data Prep #####
if anomaly_config == 7:
download_subfolder = "7_day"
percentile_5 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_5TH, download=True, download_subfolder=download_subfolder)
percentile_10 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_10TH, download=True, download_subfolder=download_subfolder)
percentile_25 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_25TH, download=True, download_subfolder=download_subfolder)
percentile_75 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_75TH, download=True, download_subfolder=download_subfolder)
percentile_90 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_90TH, download=True, download_subfolder=download_subfolder)
percentile_95 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_95TH, download=True, download_subfolder=download_subfolder)
elif anomaly_config == 14:
download_subfolder = "14_day"
percentile_5 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_5TH, download=True, download_subfolder=download_subfolder)
percentile_10 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_10TH, download=True, download_subfolder=download_subfolder)
percentile_25 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_25TH, download=True, download_subfolder=download_subfolder)
percentile_75 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_75TH, download=True, download_subfolder=download_subfolder)
percentile_90 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_90TH, download=True, download_subfolder=download_subfolder)
percentile_95 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_95TH, download=True, download_subfolder=download_subfolder)
else:
raise Exception("Anomaly config must be 7 or 14 for the appropriate percentile files")

print("Downloading NWM Data")
input_files = organize_input_files(fileset_bucket, fileset, download_subfolder=reference_time.strftime('%Y%m%d'))

#Get NWM version from first file
with xr.open_dataset(input_files[0]) as first_file:
nwm_vers = first_file.NWM_version_number.replace("v","")

# Import Feature IDs
print("-->Looping through files to get streamflow sum")
df = pd.DataFrame()
for file in input_files:
ds_file = xr.open_dataset(file)
df_file = ds_file['streamflow'].to_dataframe()
df_file['streamflow'] = df_file['streamflow'] * 35.3147 # convert streamflow from cms to cfs

if df.empty:
df = df_file
df = df.rename(columns={"streamflow": "streamflow_sum"})
else:
df['streamflow_sum'] += df_file['streamflow']
os.remove(file)

df[average_flow_col] = df['streamflow_sum'] / len(input_files)
df = df.drop(columns=['streamflow_sum'])
df[average_flow_col] = df[average_flow_col].round(2)

# Import Percentile Data
print("-->Importing percentile data:")

date = int(reference_time.strftime("%j")) - 1 # retrieves the date in integer form from reference_time

print(f"---->Retrieving {anomaly_config} day 5th percentiles...")
ds_perc = xr.open_dataset(percentile_5)
df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
df_perc = df_perc.rename(columns={"streamflow": "prcntle_5"})
df_perc['prcntle_5'] = (df_perc['prcntle_5'] * 35.3147).round(2) # convert streamflow from cms to cfs
df = df.join(df_perc)

print(f"---->Retrieving {anomaly_config} day 10th percentiles...")
ds_perc = xr.open_dataset(percentile_10)
df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
df_perc = df_perc.rename(columns={"streamflow": "prcntle_10"})
df_perc['prcntle_10'] = (df_perc['prcntle_10'] * 35.3147).round(2) # convert streamflow from cms to cfs
df = df.join(df_perc)

print(f"---->Retrieving {anomaly_config} day 25th percentiles...")
ds_perc = xr.open_dataset(percentile_25)
df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
df_perc = df_perc.rename(columns={"streamflow": "prcntle_25"})
df_perc['prcntle_25'] = (df_perc['prcntle_25'] * 35.3147).round(2) # convert streamflow from cms to cfs
df = df.join(df_perc)

print(f"---->Retrieving {anomaly_config} day 75th percentiles...")
ds_perc = xr.open_dataset(percentile_75)
df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
df_perc = df_perc.rename(columns={"streamflow": "prcntle_75"})
df_perc['prcntle_75'] = (df_perc['prcntle_75'] * 35.3147).round(2) # convert streamflow from cms to cfs
df = df.join(df_perc)

print(f"---->Retrieving {anomaly_config} day 90th percentiles...")
ds_perc = xr.open_dataset(percentile_90)
df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
df_perc = df_perc.rename(columns={"streamflow": "prcntle_90"})
df_perc['prcntle_90'] = (df_perc['prcntle_90'] * 35.3147).round(2) # convert streamflow from cms to cfs
df = df.join(df_perc)

print(f"---->Retrieving {anomaly_config} day 95th percentiles...")
ds_perc = xr.open_dataset(percentile_95)
df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
df_perc = df_perc.rename(columns={"streamflow": "prcntle_95"})
df_perc['prcntle_95'] = (df_perc['prcntle_95'] * 35.3147).round(2) # convert streamflow from cms to cfs
df = df.join(df_perc)

print("---->Creating percentile dictionary...")
df[anom_col] = np.nan
df.loc[(df[average_flow_col] >= df['prcntle_95']) & df[anom_col].isna(), anom_col] = "High (> 95th)"
df.loc[(df[average_flow_col] >= df['prcntle_90']) & df[anom_col].isna(), anom_col] = "Much Above Normal (91st - 95th)" # noqa: E501
df.loc[(df[average_flow_col] >= df['prcntle_75']) & df[anom_col].isna(), anom_col] = "Above Normal (76th - 90th)"
df.loc[(df[average_flow_col] >= df['prcntle_25']) & df[anom_col].isna(), anom_col] = "Normal (26th - 75th)"
df.loc[(df[average_flow_col] >= df['prcntle_10']) & df[anom_col].isna(), anom_col] = "Below Normal (11th - 25th)"
df.loc[(df[average_flow_col] >= df['prcntle_5']) & df[anom_col].isna(), anom_col] = "Much Below Normal (6th - 10th)"
df.loc[(df[average_flow_col] < df['prcntle_5']) & df[anom_col].isna(), anom_col] = "Low (<= 5th)"
df.loc[df[anom_col].isna(), anom_col] = "Insufficient Data Available"
df = df.replace(round(INSUFFICIENT_DATA_ERROR_CODE * 35.3147, 2), None)
df['nwm_vers'] = nwm_vers

print("Uploading output CSV file to S3")
s3 = boto3.client('s3')
tempdir = tempfile.mkdtemp()
tmp_output_path = os.path.join(tempdir, "temp_output.csv")
df = df.reset_index()
df.to_csv(tmp_output_path, index=False)
s3.upload_file(tmp_output_path, output_file_bucket, output_file)
print(f"--- Uploaded to {output_file_bucket}:{output_file}")
os.remove(tmp_output_path)
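
For reference, a minimal local-invocation sketch of the new module (not part of the commit): the bucket names, file list, and reference time below are placeholders, and run_anomaly still needs S3 access to download the NWM files and the authoritative percentile tables.

from datetime import datetime
from products.anomaly import run_anomaly

run_anomaly(
    reference_time=datetime(2023, 12, 12, 0),
    fileset_bucket="example-nwm-data-bucket",
    fileset=["common/data/model/com/nwm/prod/nwm.20231212/analysis_assim/nwm.t00z.analysis_assim.channel_rt.tm00.conus.nc"],
    output_file_bucket="example-python-preprocessing-bucket",
    output_file="viz_ingest/analysis_assim/20231212/00_ana_7day_anomaly.csv",
    auth_data_bucket="example-authoritative-data-bucket",
    anomaly_config=7,
)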