From f0c67c830a586e8677f99945ce0323844c9eef05 Mon Sep 17 00:00:00 2001 From: TylerSchrag-NOAA <97621365+TylerSchrag-NOAA@users.noreply.github.com> Date: Tue, 12 Dec 2023 11:13:40 -0800 Subject: [PATCH] Viz EC2 Services to Lambda - Part 3a - Anomaly (#582) This PR migrates the Anomaly product from the Viz EC2 to the Viz Python Preprocessing lambda function. It broadly encapsulates the following changes: - New anomaly product script in the python_preprocessing lambda function that supports both 7day and 14day configurations. - Removal of anomaly pipeline and product files in the source-aws_loosa library. - A second python_preprocessing lambda function, which uses the same source code but with more RAM (I still need to add logic to choose the correct function based on the anomaly config. This will be much easier to do once we deploy this to TI next week, so I will do it then). Deployment Considerations: - I may need to resolve merge conflicts after we merge Part 2 - Rapid Onset Flooding. - We will need to include an ingest schema db dump when deploying to UAT. --------- Co-authored-by: CoreyKrewson-NOAA <69687817+CoreyKrewson-NOAA@users.noreply.github.com> Co-authored-by: NickChadwick-NOAA --- Core/LAMBDA/viz_functions/main.tf | 69 ++++++++- .../products/analysis_assim/ana_anomaly.sql | 35 +++++ .../lambda_function.py | 4 +- .../analysis_assim/ana_anomaly.yml | 31 ++++ .../services/analysis_assim/ana_anomaly.mapx | 4 +- .../lambda_function.py | 9 +- .../products/anomaly.py | 141 ++++++++++++++++++ Core/StepFunctions/main.tf | 9 +- .../viz_processing_pipeline.json.tftpl | 4 +- Core/main.tf | 37 ++--- .../aws_loosa/deploy/pipelines_config.yml | 1 - .../pipelines/ana_anomaly/ana_anomaly.sql | 14 -- .../pipelines/ana_anomaly/pipeline.yml | 17 --- .../pipelines/ana_anomaly/process.py | 107 ------------- .../aws_loosa/products/anomaly.py | 135 ----------------- 15 files changed, 311 insertions(+), 306 deletions(-) create mode 100644 Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/analysis_assim/ana_anomaly.sql create mode 100644 Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml create mode 100644 Core/LAMBDA/viz_functions/viz_python_preprocessing/products/anomaly.py delete mode 100644 Source/Visualizations/aws_loosa/pipelines/ana_anomaly/ana_anomaly.sql delete mode 100644 Source/Visualizations/aws_loosa/pipelines/ana_anomaly/pipeline.yml delete mode 100644 Source/Visualizations/aws_loosa/pipelines/ana_anomaly/process.py delete mode 100644 Source/Visualizations/aws_loosa/products/anomaly.py diff --git a/Core/LAMBDA/viz_functions/main.tf b/Core/LAMBDA/viz_functions/main.tf index 616da481..fba1b123 100644 --- a/Core/LAMBDA/viz_functions/main.tf +++ b/Core/LAMBDA/viz_functions/main.tf @@ -393,7 +393,10 @@ resource "aws_s3_object" "python_preprocessing_zip_upload" { source_hash = filemd5(data.archive_file.python_preprocessing_zip.output_path) } -resource "aws_lambda_function" "viz_python_preprocessing" { +######################### +#### 2GB RAM Version #### +######################### +resource "aws_lambda_function" "viz_python_preprocessing_2GB" { function_name = "hv-vpp-${var.environment}-viz-python-preprocessing" description = "Lambda function to create max streamflow files for NWM data" memory_size = 2048 @@ -410,6 +413,7 @@ resource "aws_lambda_function" "viz_python_preprocessing" { environment { variables = { CACHE_DAYS = 1 + AUTH_DATA_BUCKET = var.viz_authoritative_bucket DATA_BUCKET_UPLOAD = var.fim_output_bucket VIZ_DB_DATABASE = 
var.viz_db_name VIZ_DB_HOST = var.viz_db_host @@ -432,11 +436,62 @@ resource "aws_lambda_function" "viz_python_preprocessing" { var.psycopg2_sqlalchemy_layer, var.viz_lambda_shared_funcs_layer, var.requests_layer, - var.dask_layer, + var.dask_layer ] tags = { - "Name" = "hv-vpp-${var.environment}-viz-python-preprocessing" + "Name" = "hv-vpp-${var.environment}-viz-python-preprocessing-2GB" + } +} + +########################## +#### 10GB RAM Version #### +########################## +resource "aws_lambda_function" "viz_python_preprocessing_10GB" { + function_name = "hv-vpp-${var.environment}-viz-python-preprocessing-10GB" + description = "Lambda function to create max streamflow files for NWM data" + memory_size = 10240 + ephemeral_storage { + size = 6656 + } + timeout = 900 + + vpc_config { + security_group_ids = var.db_lambda_security_groups + subnet_ids = var.db_lambda_subnets + } + + environment { + variables = { + CACHE_DAYS = 1 + AUTH_DATA_BUCKET = var.viz_authoritative_bucket + DATA_BUCKET_UPLOAD = var.fim_output_bucket + VIZ_DB_DATABASE = var.viz_db_name + VIZ_DB_HOST = var.viz_db_host + VIZ_DB_USERNAME = jsondecode(var.viz_db_user_secret_string)["username"] + VIZ_DB_PASSWORD = jsondecode(var.viz_db_user_secret_string)["password"] + NWM_DATAFLOW_VERSION = var.nwm_dataflow_version + } + } + s3_bucket = aws_s3_object.python_preprocessing_zip_upload.bucket + s3_key = aws_s3_object.python_preprocessing_zip_upload.key + source_code_hash = filebase64sha256(data.archive_file.python_preprocessing_zip.output_path) + + runtime = "python3.9" + handler = "lambda_function.lambda_handler" + + role = var.lambda_role + + layers = [ + var.xarray_layer, + var.psycopg2_sqlalchemy_layer, + var.viz_lambda_shared_funcs_layer, + var.requests_layer, + var.dask_layer + ] + + tags = { + "Name" = "hv-vpp-${var.environment}-viz-python-preprocessing-10GB" + } } @@ -945,8 +1000,12 @@ module "image-based-lambdas" { ######################################################################################################################################## ######################################################################################################################################## -output "python_preprocessing" { - value = aws_lambda_function.viz_python_preprocessing +output "python_preprocessing_2GB" { + value = aws_lambda_function.viz_python_preprocessing_2GB +} + +output "python_preprocessing_10GB" { + value = aws_lambda_function.viz_python_preprocessing_10GB } output "initialize_pipeline" { diff --git a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/analysis_assim/ana_anomaly.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/analysis_assim/ana_anomaly.sql new file mode 100644 index 00000000..89eb763f --- /dev/null +++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/analysis_assim/ana_anomaly.sql @@ -0,0 +1,35 @@ +DROP TABLE IF EXISTS publish.ana_anomaly; +SELECT + channels.feature_id, + channels.feature_id::TEXT AS feature_id_str, + channels.strm_order, + channels.name, + channels.huc6, + anom_7d.nwm_vers, + anom_7d.reference_time, + anom_7d.reference_time AS valid_time, + anom_7d.average_flow_7day, + anom_7d.prcntle_5 as pctl_5_7d, + anom_7d.prcntle_10 as pctl_10_7d, + anom_7d.prcntle_25 as pctl_25_7d, + anom_7d.prcntle_75 as pctl_75_7d, + anom_7d.prcntle_90 as pctl_90_7d, + anom_7d.prcntle_95 as pctl_95_7d, + anom_7d.anom_cat_7day, + anom_14d.average_flow_14day, + anom_14d.prcntle_5 as pctl_5_14d, + anom_14d.prcntle_10 as pctl_10_14d, + anom_14d.prcntle_25 as pctl_25_14d, 
anom_14d.prcntle_75 as pctl_75_14d, + anom_14d.prcntle_90 as pctl_90_14d, + anom_14d.prcntle_95 as pctl_95_14d, + anom_14d.anom_cat_14day, + ana.streamflow AS latest_flow, + to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time, + channels.geom +INTO publish.ana_anomaly +FROM derived.channels_conus AS channels +LEFT OUTER JOIN ingest.ana_7day_anomaly AS anom_7d ON channels.feature_id = anom_7d.feature_id +LEFT OUTER JOIN ingest.ana_14day_anomaly AS anom_14d ON channels.feature_id = anom_14d.feature_id +LEFT OUTER JOIN ingest.nwm_channel_rt_ana AS ana ON channels.feature_id = ana.feature_id +WHERE average_flow_7day IS NOT NULL OR average_flow_14day IS NOT NULL; \ No newline at end of file diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py index 8018bdd1..3e3029cb 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py @@ -497,7 +497,8 @@ def generate_python_preprocessing_file_list(self, file_groups): python_preprocesing_ingest_sets = [] db_ingest_sets = [] for file_group in file_groups: - product = file_group['product'] + product = file_group['product'] + config = file_group['config'] if file_group.get('config') else None output_file = file_group['output_file'] token_dict = get_file_tokens(output_file) @@ -508,6 +509,7 @@ def generate_python_preprocessing_file_list(self, file_groups): "fileset": python_preprocesing_file_set[0]['ingest_datasets'], "fileset_bucket": python_preprocesing_file_set[0]['bucket'], "product": product, + "config": config, "output_file": formatted_output_file, "output_file_bucket": os.environ['PYTHON_PREPROCESSING_BUCKET'], }) diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml new file mode 100644 index 00000000..458e74c9 --- /dev/null +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml @@ -0,0 +1,31 @@ +product: ana_anomaly +configuration: analysis_assim +product_type: "vector" +run: true +run_times: + - '00:00' + +python_preprocessing: + - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc + file_step: 1H + file_window: P7D + product: anomaly + config: 7 + output_file: viz_ingest/analysis_assim/{{datetime:%Y%m%d}}/{{datetime:%H}}_ana_7day_anomaly.csv + target_table: ingest.ana_7day_anomaly + target_keys: (feature_id) + - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc + file_step: 1H + file_window: P14D + product: anomaly + config: 14 + output_file: viz_ingest/analysis_assim/{{datetime:%Y%m%d}}/{{datetime:%H}}_ana_14day_anomaly.csv + target_table: ingest.ana_14day_anomaly + target_keys: (feature_id) + +postprocess_sql: + - sql_file: ana_anomaly + target_table: publish.ana_anomaly + +services: + - ana_anomaly \ No newline at end of file diff --git a/Core/LAMBDA/viz_functions/viz_publish_service/services/analysis_assim/ana_anomaly.mapx b/Core/LAMBDA/viz_functions/viz_publish_service/services/analysis_assim/ana_anomaly.mapx index 6d1a6956..5326755b 100644 --- 
a/Core/LAMBDA/viz_functions/viz_publish_service/services/analysis_assim/ana_anomaly.mapx +++ b/Core/LAMBDA/viz_functions/viz_publish_service/services/analysis_assim/ana_anomaly.mapx @@ -446,7 +446,7 @@ "workspaceFactory" : "SDE", "dataset" : "hydrovis.services.%7-Day Average Streamflow Percentile_1_1_1", "datasetType" : "esriDTFeatureClass", - "sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, average_flow_7day, prcntle_5, prcntle_10, prcntle_25, prcntle_75, prcntle_90, prcntle_95, anom_cat_7day, latest_flow,reference_time,valid_time,update_time, oid from hydrovis.services.ana_anomaly", + "sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, average_flow_7day, pctl_5_7d AS prcntle_5, pctl_10_7d AS prcntle_10, pctl_25_7d AS prcntle_25, pctl_75_7d AS prcntle_75, pctl_90_7d AS prcntle_90, pctl_95_7d AS prcntle_95, anom_cat_7day, latest_flow, reference_time, valid_time, update_time, oid from hydrovis.services.ana_anomaly WHERE anom_cat_7day IS NOT NULL", "srid" : "3857", "spatialReference" : { "wkid" : 102100, @@ -1404,7 +1404,7 @@ "workspaceFactory" : "SDE", "dataset" : "hydrovis.services.%14-Day Average Streamflow Percentile_2_2_2", "datasetType" : "esriDTFeatureClass", - "sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, prcntle_5, prcntle_10, prcntle_25, prcntle_75, prcntle_90, prcntle_95, average_flow_14day, anom_cat_14day, latest_flow, reference_time, valid_time, update_time, oid from hydrovis.services.ana_anomaly", + "sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, average_flow_14day, pctl_5_14d AS prcntle_5, pctl_10_14d AS prcntle_10, pctl_25_14d AS prcntle_25, pctl_75_14d AS prcntle_75, pctl_90_14d AS prcntle_90, pctl_95_14d AS prcntle_95, anom_cat_14day, latest_flow, reference_time, valid_time, update_time, oid from hydrovis.services.ana_anomaly WHERE anom_cat_14day IS NOT NULL", "srid" : "3857", "spatialReference" : { "wkid" : 102100, diff --git a/Core/LAMBDA/viz_functions/viz_python_preprocessing/lambda_function.py b/Core/LAMBDA/viz_functions/viz_python_preprocessing/lambda_function.py index 29e13c2a..4d594270 100644 --- a/Core/LAMBDA/viz_functions/viz_python_preprocessing/lambda_function.py +++ b/Core/LAMBDA/viz_functions/viz_python_preprocessing/lambda_function.py @@ -1,10 +1,11 @@ from datetime import datetime +import os from viz_lambda_shared_funcs import generate_file_list from products.max_values import aggregate_max_to_file from products.high_water_probability import run_high_water_probability from products.rapid_onset_flooding_probability import run_rapid_onset_flooding_probability - +from products.anomaly import run_anomaly def lambda_handler(event, context): """ @@ -53,13 +54,17 @@ def lambda_handler(event, context): output_file_bucket = event['args']['python_preprocessing']['output_file_bucket'] - print(f"Creating {output_file}") + print(f"Running {product} code and creating {output_file}") if product == "max_values": aggregate_max_to_file(fileset_bucket, fileset, output_file_bucket, output_file) elif product == "high_water_probability": run_high_water_probability(reference_date, fileset_bucket, fileset, output_file_bucket, output_file) elif product == "rapid_onset_flooding_probability": run_rapid_onset_flooding_probability(reference_date, fileset_bucket, fileset, output_file_bucket, output_file) + elif product == "anomaly": + anomaly_config = event['args']['python_preprocessing']['config'] + auth_data_bucket = 
os.environ['AUTH_DATA_BUCKET'] + run_anomaly(reference_date, fileset_bucket, fileset, output_file_bucket, output_file, auth_data_bucket, anomaly_config=anomaly_config) print(f"Successfully created {output_file} in {output_file_bucket}") diff --git a/Core/LAMBDA/viz_functions/viz_python_preprocessing/products/anomaly.py b/Core/LAMBDA/viz_functions/viz_python_preprocessing/products/anomaly.py new file mode 100644 index 00000000..6b7ca146 --- /dev/null +++ b/Core/LAMBDA/viz_functions/viz_python_preprocessing/products/anomaly.py @@ -0,0 +1,141 @@ +import numpy as np +import pandas as pd +import xarray as xr +import tempfile +import boto3 +import os +from viz_lambda_shared_funcs import check_if_file_exists, organize_input_files + +INSUFFICIENT_DATA_ERROR_CODE = -9998 +PERCENTILE_TABLE_5TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_5th_perc.nc" +PERCENTILE_TABLE_10TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_10th_perc.nc" +PERCENTILE_TABLE_25TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_25th_perc.nc" +PERCENTILE_TABLE_75TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_75th_perc.nc" +PERCENTILE_TABLE_90TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_90th_perc.nc" +PERCENTILE_TABLE_95TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_95th_perc.nc" +PERCENTILE_14_TABLE_5TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_5th_perc.nc" +PERCENTILE_14_TABLE_10TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_10th_perc.nc" +PERCENTILE_14_TABLE_25TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_25th_perc.nc" +PERCENTILE_14_TABLE_75TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_75th_perc.nc" +PERCENTILE_14_TABLE_90TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_90th_perc.nc" +PERCENTILE_14_TABLE_95TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_95th_perc.nc" + +def run_anomaly(reference_time, fileset_bucket, fileset, output_file_bucket, output_file, auth_data_bucket, anomaly_config=7): + average_flow_col = f'average_flow_{anomaly_config}day' + anom_col = f'anom_cat_{anomaly_config}day' + + ##### Data Prep ##### + if anomaly_config == 7: + download_subfolder = "7_day" + percentile_5 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_5TH, download=True, download_subfolder=download_subfolder) + percentile_10 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_10TH, download=True, download_subfolder=download_subfolder) + percentile_25 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_25TH, download=True, download_subfolder=download_subfolder) + percentile_75 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_75TH, download=True, download_subfolder=download_subfolder) + percentile_90 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_90TH, download=True, download_subfolder=download_subfolder) + percentile_95 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_95TH, download=True, download_subfolder=download_subfolder) + elif anomaly_config == 14: + download_subfolder = "14_day" + percentile_5 = check_if_file_exists(auth_data_bucket, 
PERCENTILE_14_TABLE_5TH, download=True, download_subfolder=download_subfolder) + percentile_10 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_10TH, download=True, download_subfolder=download_subfolder) + percentile_25 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_25TH, download=True, download_subfolder=download_subfolder) + percentile_75 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_75TH, download=True, download_subfolder=download_subfolder) + percentile_90 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_90TH, download=True, download_subfolder=download_subfolder) + percentile_95 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_95TH, download=True, download_subfolder=download_subfolder) + else: + raise Exception("Anomaly config must be 7 or 14 for the appropriate percentile files") + + print("Downloading NWM Data") + input_files = organize_input_files(fileset_bucket, fileset, download_subfolder=reference_time.strftime('%Y%m%d')) + + #Get NWM version from first file + with xr.open_dataset(input_files[0]) as first_file: + nwm_vers = first_file.NWM_version_number.replace("v","") + + # Import Feature IDs + print("-->Looping through files to get streamflow sum") + df = pd.DataFrame() + for file in input_files: + ds_file = xr.open_dataset(file) + df_file = ds_file['streamflow'].to_dataframe() + df_file['streamflow'] = df_file['streamflow'] * 35.3147 # convert streamflow from cms to cfs + + if df.empty: + df = df_file + df = df.rename(columns={"streamflow": "streamflow_sum"}) + else: + df['streamflow_sum'] += df_file['streamflow'] + os.remove(file) + + df[average_flow_col] = df['streamflow_sum'] / len(input_files) + df = df.drop(columns=['streamflow_sum']) + df[average_flow_col] = df[average_flow_col].round(2) + + # Import Percentile Data + print("-->Importing percentile data:") + + date = int(reference_time.strftime("%j")) - 1 # retrieves the date in integer form from reference_time + + print(f"---->Retrieving {anomaly_config} day 5th percentiles...") + ds_perc = xr.open_dataset(percentile_5) + df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() + df_perc = df_perc.rename(columns={"streamflow": "prcntle_5"}) + df_perc['prcntle_5'] = (df_perc['prcntle_5'] * 35.3147).round(2) # convert streamflow from cms to cfs + df = df.join(df_perc) + + print(f"---->Retrieving {anomaly_config} day 10th percentiles...") + ds_perc = xr.open_dataset(percentile_10) + df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() + df_perc = df_perc.rename(columns={"streamflow": "prcntle_10"}) + df_perc['prcntle_10'] = (df_perc['prcntle_10'] * 35.3147).round(2) # convert streamflow from cms to cfs + df = df.join(df_perc) + + print(f"---->Retrieving {anomaly_config} day 25th percentiles...") + ds_perc = xr.open_dataset(percentile_25) + df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() + df_perc = df_perc.rename(columns={"streamflow": "prcntle_25"}) + df_perc['prcntle_25'] = (df_perc['prcntle_25'] * 35.3147).round(2) # convert streamflow from cms to cfs + df = df.join(df_perc) + + print(f"---->Retrieving {anomaly_config} day 75th percentiles...") + ds_perc = xr.open_dataset(percentile_75) + df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() + df_perc = df_perc.rename(columns={"streamflow": "prcntle_75"}) + df_perc['prcntle_75'] = (df_perc['prcntle_75'] * 35.3147).round(2) # convert streamflow from cms to cfs + df = df.join(df_perc) + + print(f"---->Retrieving {anomaly_config} day 90th percentiles...") + ds_perc = 
xr.open_dataset(percentile_90) + df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() + df_perc = df_perc.rename(columns={"streamflow": "prcntle_90"}) + df_perc['prcntle_90'] = (df_perc['prcntle_90'] * 35.3147).round(2) # convert streamflow from cms to cfs + df = df.join(df_perc) + + print(f"---->Retrieving {anomaly_config} day 95th percentiles...") + ds_perc = xr.open_dataset(percentile_95) + df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() + df_perc = df_perc.rename(columns={"streamflow": "prcntle_95"}) + df_perc['prcntle_95'] = (df_perc['prcntle_95'] * 35.3147).round(2) # convert streamflow from cms to cfs + df = df.join(df_perc) + + print("---->Creating percentile dictionary...") + df[anom_col] = np.nan + df.loc[(df[average_flow_col] >= df['prcntle_95']) & df[anom_col].isna(), anom_col] = "High (> 95th)" + df.loc[(df[average_flow_col] >= df['prcntle_90']) & df[anom_col].isna(), anom_col] = "Much Above Normal (91st - 95th)" # noqa: E501 + df.loc[(df[average_flow_col] >= df['prcntle_75']) & df[anom_col].isna(), anom_col] = "Above Normal (76th - 90th)" + df.loc[(df[average_flow_col] >= df['prcntle_25']) & df[anom_col].isna(), anom_col] = "Normal (26th - 75th)" + df.loc[(df[average_flow_col] >= df['prcntle_10']) & df[anom_col].isna(), anom_col] = "Below Normal (11th - 25th))" + df.loc[(df[average_flow_col] >= df['prcntle_5']) & df[anom_col].isna(), anom_col] = "Much Below Normal (6th - 10th)" + df.loc[(df[average_flow_col] < df['prcntle_5']) & df[anom_col].isna(), anom_col] = "Low (<= 5th)" + df.loc[df[anom_col].isna(), anom_col] = "Insufficient Data Available" + df = df.replace(round(INSUFFICIENT_DATA_ERROR_CODE * 35.3147, 2), None) + df['nwm_vers'] = nwm_vers + + print("Uploading output CSV file to S3") + s3 = boto3.client('s3') + tempdir = tempfile.mkdtemp() + tmp_output_path = os.path.join(tempdir, "temp_output.csv") + df = df.reset_index() + df.to_csv(tmp_output_path, index=False) + s3.upload_file(tmp_output_path, output_file_bucket, output_file) + print(f"--- Uploaded to {output_file_bucket}:{output_file}") + os.remove(tmp_output_path) \ No newline at end of file diff --git a/Core/StepFunctions/main.tf b/Core/StepFunctions/main.tf index 1a5c35ce..ad6b0be7 100644 --- a/Core/StepFunctions/main.tf +++ b/Core/StepFunctions/main.tf @@ -10,7 +10,11 @@ variable "environment" { type = string } -variable "python_preprocessing_arn" { +variable "python_preprocessing_2GB_arn" { + type = string +} + +variable "python_preprocessing_10GB_arn" { type = string } @@ -148,7 +152,8 @@ resource "aws_sfn_state_machine" "viz_pipeline_step_function" { role_arn = var.viz_lambda_role definition = templatefile("${path.module}/viz_processing_pipeline.json.tftpl", { - python_preprocessing_arn = var.python_preprocessing_arn + python_preprocessing_2GB_arn = var.python_preprocessing_2GB_arn + python_preprocessing_10GB_arn = var.python_preprocessing_10GB_arn db_postprocess_sql_arn = var.db_postprocess_sql_arn db_ingest_arn = var.db_ingest_arn raster_processing_arn = var.raster_processing_arn diff --git a/Core/StepFunctions/viz_processing_pipeline.json.tftpl b/Core/StepFunctions/viz_processing_pipeline.json.tftpl index 3ba36246..af0ffdbf 100644 --- a/Core/StepFunctions/viz_processing_pipeline.json.tftpl +++ b/Core/StepFunctions/viz_processing_pipeline.json.tftpl @@ -41,7 +41,7 @@ "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { - "FunctionName": "${python_preprocessing_2GB_arn}", + "Payload": { "args.$": "$", 
"step": "python_preprocessing" @@ -683,7 +683,7 @@ "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { - "FunctionName": "${python_preprocessing_arn}", + "FunctionName": "${python_preprocessing_2GB_arn}", "Payload": { "args.$": "$", "step": "fim_config_max_file" diff --git a/Core/main.tf b/Core/main.tf index 339e4e92..6cddded5 100644 --- a/Core/main.tf +++ b/Core/main.tf @@ -625,24 +625,25 @@ module "viz-lambda-functions" { module "step-functions" { source = "./StepFunctions" - viz_lambda_role = module.iam-roles.role_viz_pipeline.arn - rnr_lambda_role = module.iam-roles.role_sync_wrds_location_db.arn - environment = local.env.environment - optimize_rasters_arn = module.viz-lambda-functions.optimize_rasters.arn - update_egis_data_arn = module.viz-lambda-functions.update_egis_data.arn - fim_data_prep_arn = module.viz-lambda-functions.fim_data_prep.arn - db_postprocess_sql_arn = module.viz-lambda-functions.db_postprocess_sql.arn - db_ingest_arn = module.viz-lambda-functions.db_ingest.arn - raster_processing_arn = module.viz-lambda-functions.raster_processing.arn - publish_service_arn = module.viz-lambda-functions.publish_service.arn - python_preprocessing_arn = module.viz-lambda-functions.python_preprocessing.arn - hand_fim_processing_arn = module.viz-lambda-functions.hand_fim_processing.arn - schism_fim_processing_arn = module.viz-lambda-functions.schism_fim_processing.arn - initialize_pipeline_arn = module.viz-lambda-functions.initialize_pipeline.arn - rnr_domain_generator_arn = module.rnr-lambda-functions.rnr_domain_generator.arn - email_sns_topics = module.sns.email_sns_topics - aws_instances_to_reboot = [module.rnr.ec2.id] - fifteen_minute_trigger = module.eventbridge.fifteen_minute_eventbridge + viz_lambda_role = module.iam-roles.role_viz_pipeline.arn + rnr_lambda_role = module.iam-roles.role_sync_wrds_location_db.arn + environment = local.env.environment + optimize_rasters_arn = module.viz-lambda-functions.optimize_rasters.arn + update_egis_data_arn = module.viz-lambda-functions.update_egis_data.arn + fim_data_prep_arn = module.viz-lambda-functions.fim_data_prep.arn + db_postprocess_sql_arn = module.viz-lambda-functions.db_postprocess_sql.arn + db_ingest_arn = module.viz-lambda-functions.db_ingest.arn + raster_processing_arn = module.viz-lambda-functions.raster_processing.arn + publish_service_arn = module.viz-lambda-functions.publish_service.arn + python_preprocessing_2GB_arn = module.viz-lambda-functions.python_preprocessing_2GB.arn + python_preprocessing_10GB_arn = module.viz-lambda-functions.python_preprocessing_10GB.arn + hand_fim_processing_arn = module.viz-lambda-functions.hand_fim_processing.arn + schism_fim_processing_arn = module.viz-lambda-functions.schism_fim_processing.arn + initialize_pipeline_arn = module.viz-lambda-functions.initialize_pipeline.arn + rnr_domain_generator_arn = module.rnr-lambda-functions.rnr_domain_generator.arn + email_sns_topics = module.sns.email_sns_topics + aws_instances_to_reboot = [module.rnr.ec2.id] + fifteen_minute_trigger = module.eventbridge.fifteen_minute_eventbridge viz_processing_pipeline_log_group = module.cloudwatch.viz_processing_pipeline_log_group.name } diff --git a/Source/Visualizations/aws_loosa/deploy/pipelines_config.yml b/Source/Visualizations/aws_loosa/deploy/pipelines_config.yml index 40085289..5eb9ff0f 100644 --- a/Source/Visualizations/aws_loosa/deploy/pipelines_config.yml +++ b/Source/Visualizations/aws_loosa/deploy/pipelines_config.yml @@ -1,2 +1 @@ ADDED: - - ana_anomaly diff --git 
a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/ana_anomaly.sql b/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/ana_anomaly.sql deleted file mode 100644 index 4c557421..00000000 --- a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/ana_anomaly.sql +++ /dev/null @@ -1,14 +0,0 @@ -DROP TABLE IF EXISTS publish.ana_anomaly; -SELECT - channels.strm_order, - channels.name, - channels.huc6, - channels.nwm_vers, - channels.geom, - anom.*, - anom.feature_id::TEXT AS feature_id_str, - to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time -INTO publish.ana_anomaly -FROM derived.channels_conus AS channels -JOIN cache.ana_anomaly AS anom ON channels.feature_id = anom.feature_id; -DROP TABLE IF EXISTS cache.ana_anomaly; \ No newline at end of file diff --git a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/pipeline.yml b/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/pipeline.yml deleted file mode 100644 index 98931a1f..00000000 --- a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/pipeline.yml +++ /dev/null @@ -1,17 +0,0 @@ -name: ana_anomaly -dataset: - uris: - - s3-https://$NWM_DATA_BUCKET$.s3.amazonaws.com/common/data/model/com/nwm/$NWM_DATAFLOW_VERSION$/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc - window: P14D - repeat: PT1H - repeat_ref_time: 0:00:00 - delay: PT55M - expect: PT1H35M - transfer: all - clean: True - acceptable_uris_missing: 5% -process: - script: .\process.py - interval_ref_time: 0:00:00 - interval: PT24H - timeout: PT2H diff --git a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/process.py b/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/process.py deleted file mode 100644 index a5f39cf3..00000000 --- a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/process.py +++ /dev/null @@ -1,107 +0,0 @@ -import sys -import time -import os -import re -import xarray -from datetime import datetime -import arcpy - -from aws_loosa.processes.base.aws_egis_process import AWSEgisPublishingProcess -from aws_loosa.consts.egis import PRIMARY_SERVER, NWM_FOLDER, VALID_TIME -from aws_loosa.products.anomaly import anomaly -from aws_loosa.utils.shared_funcs import create_service_db_tables - - -class AnaAnomaly(AWSEgisPublishingProcess): - service_name = 'ana_anomaly' - - def _process(self, a_event_time, a_input_files, a_output_location, *args, **kwargs): - """ - Override this method to execute the post processing steps. - - Args: - a_event_time(datetime.datetime): the datetime associated with the current processing run. - a_input_files(list): a list of absolute paths to the required input files. - a_output_location(str): the location to be used to write output from the process. - - Returns: - bool: True if processing finished successfully and False if it encountered an error. 
- """ - - # Execute script - start_time = time.time() - arcpy.AddMessage("Running Streamflow Anomaly v2.0:") - - day7_list = [] - day14_list = [] - day7_threshold = a_event_time - datetime.timedelta(days=7) - day14_threshold = a_event_time - datetime.timedelta(days=14) - latest_file = None - latest_date = None - - # With file threshold being used, we cant assume the first half of the files are for 7 days - for file in a_input_files: - re_str = r"(\d{4})(\d{2})(\d{2})-analysis_assim.nwm.t(\d{2})z.analysis_assim.channel_rt.tm00.conus.nc" - findings = re.findall(re_str, file.replace("\\", "."))[0] - file_date = datetime.datetime( - year=int(findings[0]), month=int(findings[1]), day=int(findings[2]), hour=int(findings[3]) - ) - - if day14_threshold <= file_date <= a_event_time: - day14_list.append(file) - - if day7_threshold <= file_date <= a_event_time: - day7_list.append(file) - - if not latest_file: - latest_file = file - latest_date = file_date - elif file_date > latest_date: - latest_file = file - latest_date = file_date - - ds_latest = xarray.open_dataset(latest_file) - - channel_rt_date_time = a_event_time - self._log.info("Calculating 7 day anomaly") - df_7_day_anomaly = anomaly(day7_list, channel_rt_date_time, anomaly_config=7) - - self._log.info("Calculating 14 day anomaly") - df_14_day_anomaly = anomaly(day14_list, channel_rt_date_time, anomaly_config=14) - - self._log.info("Joining 7day and 14day dataframes") - df = df_7_day_anomaly.join(df_14_day_anomaly[['average_flow_14day', 'anom_cat_14day']]) - - self._log.info("Adding latest data") - df['latest_flow'] = (ds_latest.streamflow * 35.3147).round(2) - df['reference_time'] = latest_date.strftime("%Y-%m-%d %H:%M:%S UTC") - df['valid_time'] = latest_date.strftime("%Y-%m-%d %H:%M:%S UTC") - - self._log.info("Removing unnecessary rows") - df = df.loc[~((df['average_flow_7day'] == 0) & (df['average_flow_14day'] == 0) & (df['prcntle_95'] == 0))] - df = df.loc[~((df['anom_cat_7day'] == 'Normal (26th - 75th)') & (df['anom_cat_14day'] == 'Normal (26th - 75th)'))] # noqa: E501 - df = df.reset_index() - - sql_files = [os.path.join(os.path.dirname(__file__), "ana_anomaly.sql")] - - service_table_names = ["ana_anomaly"] - - create_service_db_tables(df, self.service_name, sql_files, service_table_names, self.process_event_time, past_run=self.one_off) - - arcpy.AddMessage("All done!") - seconds = time.time() - start_time - minutes = str(round((seconds / 60.0), 2)) - arcpy.AddMessage("Run time: " + minutes + " minutes") - - -if __name__ == '__main__': - # Get Args - forecast_date = sys.argv[1] - input_files = sys.argv[2] - log_directory = sys.argv[3] - next_forecast_date = sys.argv[4] - - # Create process - p = AnaAnomaly(a_log_directory=log_directory, a_log_level='INFO') - success = p.execute(forecast_date, input_files, next_forecast_date) - exit(0 if success else 1) diff --git a/Source/Visualizations/aws_loosa/products/anomaly.py b/Source/Visualizations/aws_loosa/products/anomaly.py deleted file mode 100644 index 4296e193..00000000 --- a/Source/Visualizations/aws_loosa/products/anomaly.py +++ /dev/null @@ -1,135 +0,0 @@ -import arcpy -import numpy as np -import pandas as pd -import xarray as xr -from warnings import filterwarnings - -# Import Authoritative Datasets and Constants -from aws_loosa.consts import (INSUFFICIENT_DATA_ERROR_CODE) -from aws_loosa.consts.paths import (PERCENTILE_TABLE_5TH, PERCENTILE_TABLE_10TH, - PERCENTILE_TABLE_25TH, PERCENTILE_TABLE_75TH, - PERCENTILE_TABLE_90TH, PERCENTILE_TABLE_95TH, - PERCENTILE_14_TABLE_5TH, 
PERCENTILE_14_TABLE_10TH, - PERCENTILE_14_TABLE_25TH, PERCENTILE_14_TABLE_75TH, - PERCENTILE_14_TABLE_90TH, PERCENTILE_14_TABLE_95TH) - - -filterwarnings("ignore") - -arcpy.env.overwriteOutput = True - - -def anomaly(channel_rt_files_list, channel_rt_date_time, anomaly_config=7): - """ - This function calculates the 7-day streamflow average for all reaches and classifies these into anomaly categories. - - Args: - channel_rt_files_list (list): A list of all National Water Model (NWM) channel_rt files for each hour over the - past 7 days. - channel_rt_date_time (datetime object): The date and time of interest. - anomaly_config (int): Day configuration to use for percentiles (i.e. 7 day config, 14 day config, etc). - - Returns: - anomaly_array (array): An array of (Feature ID, Product Version, Valid Time, Anomaly Category, 95th 7-day - Streamflow Percentile, 90th 7-day Streamflow Percentile, 75th 7-day Streamflow Percentile, 25th 7-day - Streamflow Percentile, 10th 7-day Streamflow Percentile, 5th 7-day Streamflow Percentile) groups for the - date and time of interest. - """ - average_flow_col = f'average_flow_{anomaly_config}day' - anom_col = f'anom_cat_{anomaly_config}day' - - if anomaly_config == 7: - percentile_5 = PERCENTILE_TABLE_5TH - percentile_10 = PERCENTILE_TABLE_10TH - percentile_25 = PERCENTILE_TABLE_25TH - percentile_75 = PERCENTILE_TABLE_75TH - percentile_90 = PERCENTILE_TABLE_90TH - percentile_95 = PERCENTILE_TABLE_95TH - elif anomaly_config == 14: - percentile_5 = PERCENTILE_14_TABLE_5TH - percentile_10 = PERCENTILE_14_TABLE_10TH - percentile_25 = PERCENTILE_14_TABLE_25TH - percentile_75 = PERCENTILE_14_TABLE_75TH - percentile_90 = PERCENTILE_14_TABLE_90TH - percentile_95 = PERCENTILE_14_TABLE_95TH - else: - raise Exception("Anomaly config must be 7 or 14 for the appropriate percentile files") - - # Import Feature IDs - arcpy.AddMessage("-->Looping through files to get streamflow sum") - df = pd.DataFrame() - for file in channel_rt_files_list: - ds_file = xr.open_dataset(file) - df_file = ds_file['streamflow'].to_dataframe() - df_file['streamflow'] = df_file['streamflow'] * 35.3147 # convert streamflow from cms to cfs - - if df.empty: - df = df_file - df = df.rename(columns={"streamflow": "streamflow_sum"}) - else: - df['streamflow_sum'] += df_file['streamflow'] - - df[average_flow_col] = df['streamflow_sum'] / len(channel_rt_files_list) - df = df.drop(columns=['streamflow_sum']) - df[average_flow_col] = df[average_flow_col].round(2) - - # Import Percentile Data - arcpy.AddMessage("-->Importing percentile data:") - - date = int(channel_rt_date_time.strftime("%j")) - 1 # retrieves the date in integer form from channel_rt_date_time - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 5th percentiles...") - ds_perc = xr.open_dataset(percentile_5) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_5"}) - df_perc['prcntle_5'] = (df_perc['prcntle_5'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 10th percentiles...") - ds_perc = xr.open_dataset(percentile_10) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_10"}) - df_perc['prcntle_10'] = (df_perc['prcntle_10'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 25th percentiles...") - ds_perc = 
xr.open_dataset(percentile_25) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_25"}) - df_perc['prcntle_25'] = (df_perc['prcntle_25'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 75th percentiles...") - ds_perc = xr.open_dataset(percentile_75) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_75"}) - df_perc['prcntle_75'] = (df_perc['prcntle_75'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 90th percentiles...") - ds_perc = xr.open_dataset(percentile_90) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_90"}) - df_perc['prcntle_90'] = (df_perc['prcntle_90'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 95th percentiles...") - ds_perc = xr.open_dataset(percentile_95) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_95"}) - df_perc['prcntle_95'] = (df_perc['prcntle_95'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage("---->Creating percentile dictionary...") - df[anom_col] = np.nan - df.loc[(df[average_flow_col] >= df['prcntle_95']) & df[anom_col].isna(), anom_col] = "High (> 95th)" - df.loc[(df[average_flow_col] >= df['prcntle_90']) & df[anom_col].isna(), anom_col] = "Much Above Normal (91st - 95th)" # noqa: E501 - df.loc[(df[average_flow_col] >= df['prcntle_75']) & df[anom_col].isna(), anom_col] = "Above Normal (76th - 90th)" - df.loc[(df[average_flow_col] >= df['prcntle_25']) & df[anom_col].isna(), anom_col] = "Normal (26th - 75th)" - df.loc[(df[average_flow_col] >= df['prcntle_10']) & df[anom_col].isna(), anom_col] = "Below Normal (11th - 25th))" - df.loc[(df[average_flow_col] >= df['prcntle_5']) & df[anom_col].isna(), anom_col] = "Much Below Normal (6th - 10th)" - df.loc[(df[average_flow_col] < df['prcntle_5']) & df[anom_col].isna(), anom_col] = "Low (<= 5th)" - df.loc[df[anom_col].isna(), anom_col] = "Insufficient Data Available" - df = df.replace(round(INSUFFICIENT_DATA_ERROR_CODE * 35.3147, 2), "No Data") - - return df
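
To make the new data flow easier to review, here is a minimal sketch (not part of the patch) of the python_preprocessing event that the 7-day entry in ana_anomaly.yml is expected to produce, and of the dispatch that the updated viz_python_preprocessing/lambda_function.py performs on it. The bucket names, the NWM_DATAFLOW_VERSION value ("prod"), the single fileset entry, and the location of reference_time in the event are illustrative assumptions only; the run_anomaly call itself mirrors the branch added to lambda_handler above.

import os
from datetime import datetime

# Hypothetical event for the 7-day anomaly configuration (illustrative values only).
event = {
    "args": {
        "python_preprocessing": {
            "fileset": [
                # One channel_rt file per hour over the P7D file_window; only one entry shown here.
                "common/data/model/com/nwm/prod/nwm.20231212/analysis_assim/nwm.t00z.analysis_assim.channel_rt.tm00.conus.nc",
            ],
            "fileset_bucket": "example-nwm-data-bucket",  # assumed bucket name
            "product": "anomaly",
            "config": 7,  # 7 -> ingest.ana_7day_anomaly, 14 -> ingest.ana_14day_anomaly
            "output_file": "viz_ingest/analysis_assim/20231212/00_ana_7day_anomaly.csv",
            "output_file_bucket": "example-python-preprocessing-bucket",  # assumed bucket name
        },
        "reference_time": "2023-12-12 00:00:00",  # assumed location of the reference time in the event
    },
    "step": "python_preprocessing",
}

# Dispatch mirroring the new "anomaly" branch added to lambda_handler.
pp = event["args"]["python_preprocessing"]
reference_date = datetime.strptime(event["args"]["reference_time"], "%Y-%m-%d %H:%M:%S")
if pp["product"] == "anomaly":
    anomaly_config = pp["config"]
    auth_data_bucket = os.environ.get("AUTH_DATA_BUCKET", "example-authoritative-bucket")
    # run_anomaly(reference_date, pp["fileset_bucket"], pp["fileset"],
    #             pp["output_file_bucket"], pp["output_file"],
    #             auth_data_bucket, anomaly_config=anomaly_config)
    print(f"Would run the {anomaly_config}-day anomaly for reference time {reference_date}")

Note that run_anomaly then picks the 7-day or 14-day percentile NetCDF tables from AUTH_DATA_BUCKET based on anomaly_config, averages the hourly streamflow across the fileset, categorizes it against those percentiles, and writes the CSV to output_file_bucket/output_file for ingest into the target_table named in ana_anomaly.yml.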