diff --git a/Core/LAMBDA/viz_functions/main.tf b/Core/LAMBDA/viz_functions/main.tf
index 616da481..fba1b123 100644
--- a/Core/LAMBDA/viz_functions/main.tf
+++ b/Core/LAMBDA/viz_functions/main.tf
@@ -393,7 +393,10 @@ resource "aws_s3_object" "python_preprocessing_zip_upload" {
   source_hash = filemd5(data.archive_file.python_preprocessing_zip.output_path)
 }
 
-resource "aws_lambda_function" "viz_python_preprocessing" {
+#########################
+#### 2GB RAM Version ####
+#########################
+resource "aws_lambda_function" "viz_python_preprocessing_2GB" {
   function_name = "hv-vpp-${var.environment}-viz-python-preprocessing"
   description   = "Lambda function to create max streamflow files for NWM data"
   memory_size   = 2048
@@ -410,6 +413,7 @@ resource "aws_lambda_function" "viz_python_preprocessing" {
   environment {
     variables = {
       CACHE_DAYS           = 1
+      AUTH_DATA_BUCKET     = var.viz_authoritative_bucket
       DATA_BUCKET_UPLOAD   = var.fim_output_bucket
       VIZ_DB_DATABASE      = var.viz_db_name
       VIZ_DB_HOST          = var.viz_db_host
@@ -432,11 +436,62 @@ resource "aws_lambda_function" "viz_python_preprocessing" {
     var.psycopg2_sqlalchemy_layer,
     var.viz_lambda_shared_funcs_layer,
     var.requests_layer,
-    var.dask_layer,
+    var.dask_layer
   ]
 
   tags = {
-    "Name" = "hv-vpp-${var.environment}-viz-python-preprocessing"
+    "Name" = "hv-vpp-${var.environment}-viz-python-preprocessing-2GB"
+  }
+}
+
+##########################
+#### 10GB RAM Version ####
+##########################
+resource "aws_lambda_function" "viz_python_preprocessing_10GB" {
+  function_name = "hv-vpp-${var.environment}-viz-python-preprocessing-10GB"
+  description   = "Lambda function to create max streamflow files for NWM data"
+  memory_size   = 10240
+  ephemeral_storage {
+    size = 6656
+  }
+  timeout = 900
+
+  vpc_config {
+    security_group_ids = var.db_lambda_security_groups
+    subnet_ids         = var.db_lambda_subnets
+  }
+
+  environment {
+    variables = {
+      CACHE_DAYS           = 1
+      AUTH_DATA_BUCKET     = var.viz_authoritative_bucket
+      DATA_BUCKET_UPLOAD   = var.fim_output_bucket
+      VIZ_DB_DATABASE      = var.viz_db_name
+      VIZ_DB_HOST          = var.viz_db_host
+      VIZ_DB_USERNAME      = jsondecode(var.viz_db_user_secret_string)["username"]
+      VIZ_DB_PASSWORD      = jsondecode(var.viz_db_user_secret_string)["password"]
+      NWM_DATAFLOW_VERSION = var.nwm_dataflow_version
+    }
+  }
+  s3_bucket        = aws_s3_object.python_preprocessing_zip_upload.bucket
+  s3_key           = aws_s3_object.python_preprocessing_zip_upload.key
+  source_code_hash = filebase64sha256(data.archive_file.python_preprocessing_zip.output_path)
+
+  runtime = "python3.9"
+  handler = "lambda_function.lambda_handler"
+
+  role = var.lambda_role
+
+  layers = [
+    var.xarray_layer,
+    var.psycopg2_sqlalchemy_layer,
+    var.viz_lambda_shared_funcs_layer,
+    var.requests_layer,
+    var.dask_layer
+  ]
+
+  tags = {
+    "Name" = "hv-vpp-${var.environment}-viz-python-preprocessing-10GB"
   }
 }
 
@@ -945,8 +1000,12 @@ module "image-based-lambdas" {
 ########################################################################################################################################
 ########################################################################################################################################
 
-output "python_preprocessing" {
-  value = aws_lambda_function.viz_python_preprocessing
+output "python_preprocessing_2GB" {
+  value = aws_lambda_function.viz_python_preprocessing_2GB
+}
+
+output "python_preprocessing_10GB" {
+  value = aws_lambda_function.viz_python_preprocessing_10GB
 }
 
 output "initialize_pipeline" {
diff --git 
a/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/analysis_assim/ana_anomaly.sql b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/analysis_assim/ana_anomaly.sql new file mode 100644 index 00000000..89eb763f --- /dev/null +++ b/Core/LAMBDA/viz_functions/viz_db_postprocess_sql/products/analysis_assim/ana_anomaly.sql @@ -0,0 +1,35 @@ +DROP TABLE IF EXISTS publish.ana_anomaly; +SELECT + channels.feature_id, + channels.feature_id::TEXT AS feature_id_str, + channels.strm_order, + channels.name, + channels.huc6, + anom_7d.nwm_vers, + anom_7d.reference_time, + anom_7d.reference_time AS valid_time, + anom_7d.average_flow_7day, + anom_7d.prcntle_5 as pctl_5_7d, + anom_7d.prcntle_10 as pctl_10_7d, + anom_7d.prcntle_25 as pctl_25_7d, + anom_7d.prcntle_75 as pctl_75_7d, + anom_7d.prcntle_90 as pctl_90_7d, + anom_7d.prcntle_95 as pctl_95_7d, + anom_7d.anom_cat_7day, + anom_14d.average_flow_14day, + anom_14d.prcntle_5 as pctl_5_14d, + anom_14d.prcntle_10 as pctl_10_14d, + anom_14d.prcntle_25 as pctl_25_14d, + anom_14d.prcntle_75 as pctl_75_14d, + anom_14d.prcntle_90 as pctl_90_14d, + anom_14d.prcntle_95 as pctl_95_14d, + anom_14d.anom_cat_14day, + ana.streamflow AS latest_flow, + to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time, + channels.geom +INTO publish.ana_anomaly +FROM derived.channels_conus AS channels +LEFT OUTER JOIN ingest.ana_7day_anomaly AS anom_7d ON channels.feature_id = anom_7d.feature_id +LEFT OUTER JOIN ingest.ana_14day_anomaly AS anom_14d ON channels.feature_id = anom_14d.feature_id +LEFT OUTER JOIN ingest.nwm_channel_rt_ana AS ana ON channels.feature_id = ana.feature_id +WHERE average_flow_7day IS NOT NULL OR average_flow_14day IS NOT NULL; \ No newline at end of file diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py index 8018bdd1..3e3029cb 100644 --- a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py @@ -497,7 +497,8 @@ def generate_python_preprocessing_file_list(self, file_groups): python_preprocesing_ingest_sets = [] db_ingest_sets = [] for file_group in file_groups: - product = file_group['product'] + product = file_group['product'] + config = file_group['config'] if file_group.get('config') else None output_file = file_group['output_file'] token_dict = get_file_tokens(output_file) @@ -508,6 +509,7 @@ def generate_python_preprocessing_file_list(self, file_groups): "fileset": python_preprocesing_file_set[0]['ingest_datasets'], "fileset_bucket": python_preprocesing_file_set[0]['bucket'], "product": product, + "config": config, "output_file": formatted_output_file, "output_file_bucket": os.environ['PYTHON_PREPROCESSING_BUCKET'], }) diff --git a/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml new file mode 100644 index 00000000..458e74c9 --- /dev/null +++ b/Core/LAMBDA/viz_functions/viz_initialize_pipeline/product_configs/analysis_assim/ana_anomaly.yml @@ -0,0 +1,31 @@ +product: ana_anomaly +configuration: analysis_assim +product_type: "vector" +run: true +run_times: + - '00:00' + +python_preprocessing: + - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc + 
file_step: 1H + file_window: P7D + product: anomaly + config: 7 + output_file: viz_ingest/analysis_assim/{{datetime:%Y%m%d}}/{{datetime:%H}}_ana_7day_anomaly.csv + target_table: ingest.ana_7day_anomaly + target_keys: (feature_id) + - file_format: common/data/model/com/nwm/{{variable:NWM_DATAFLOW_VERSION}}/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc + file_step: 1H + file_window: P14D + product: anomaly + config: 14 + output_file: viz_ingest/analysis_assim/{{datetime:%Y%m%d}}/{{datetime:%H}}_ana_14day_anomaly.csv + target_table: ingest.ana_14day_anomaly + target_keys: (feature_id) + +postprocess_sql: + - sql_file: ana_anomaly + target_table: publish.ana_anomaly + +services: + - ana_anomaly \ No newline at end of file diff --git a/Core/LAMBDA/viz_functions/viz_publish_service/services/analysis_assim/ana_anomaly.mapx b/Core/LAMBDA/viz_functions/viz_publish_service/services/analysis_assim/ana_anomaly.mapx index 6d1a6956..5326755b 100644 --- a/Core/LAMBDA/viz_functions/viz_publish_service/services/analysis_assim/ana_anomaly.mapx +++ b/Core/LAMBDA/viz_functions/viz_publish_service/services/analysis_assim/ana_anomaly.mapx @@ -446,7 +446,7 @@ "workspaceFactory" : "SDE", "dataset" : "hydrovis.services.%7-Day Average Streamflow Percentile_1_1_1", "datasetType" : "esriDTFeatureClass", - "sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, average_flow_7day, prcntle_5, prcntle_10, prcntle_25, prcntle_75, prcntle_90, prcntle_95, anom_cat_7day, latest_flow,reference_time,valid_time,update_time, oid from hydrovis.services.ana_anomaly", + "sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, average_flow_7day, pctl_5_7d AS prcntle_5, pctl_10_7d AS prcntle_10, pctl_25_7d AS prcntle_25, pctl_75_7d AS prcntle_75, pctl_90_7d AS prcntle_90, pctl_95_7d AS prcntle_95, anom_cat_7day, latest_flow, reference_time, valid_time, update_time, oid from hydrovis.services.ana_anomaly WHERE anom_cat_7day IS NOT NULL", "srid" : "3857", "spatialReference" : { "wkid" : 102100, @@ -1404,7 +1404,7 @@ "workspaceFactory" : "SDE", "dataset" : "hydrovis.services.%14-Day Average Streamflow Percentile_2_2_2", "datasetType" : "esriDTFeatureClass", - "sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, prcntle_5, prcntle_10, prcntle_25, prcntle_75, prcntle_90, prcntle_95, average_flow_14day, anom_cat_14day, latest_flow, reference_time, valid_time, update_time, oid from hydrovis.services.ana_anomaly", + "sqlQuery" : "select strm_order, name, huc6, nwm_vers, geom, feature_id_str AS feature_id, average_flow_14day, pctl_5_14d AS prcntle_5, pctl_10_14d AS prcntle_10, pctl_25_14d AS prcntle_25, pctl_75_14d AS prcntle_75, pctl_90_14d AS prcntle_90, pctl_95_14d AS prcntle_95, anom_cat_14day, latest_flow, reference_time, valid_time, update_time, oid from hydrovis.services.ana_anomaly WHERE anom_cat_14day IS NOT NULL", "srid" : "3857", "spatialReference" : { "wkid" : 102100, diff --git a/Core/LAMBDA/viz_functions/viz_python_preprocessing/lambda_function.py b/Core/LAMBDA/viz_functions/viz_python_preprocessing/lambda_function.py index 29e13c2a..4d594270 100644 --- a/Core/LAMBDA/viz_functions/viz_python_preprocessing/lambda_function.py +++ b/Core/LAMBDA/viz_functions/viz_python_preprocessing/lambda_function.py @@ -1,10 +1,11 @@ from datetime import datetime +import os from viz_lambda_shared_funcs import generate_file_list from products.max_values import aggregate_max_to_file from 
products.high_water_probability import run_high_water_probability from products.rapid_onset_flooding_probability import run_rapid_onset_flooding_probability - +from products.anomaly import run_anomaly def lambda_handler(event, context): """ @@ -53,13 +54,17 @@ def lambda_handler(event, context): output_file_bucket = event['args']['python_preprocessing']['output_file_bucket'] - print(f"Creating {output_file}") + print(f"Running {product} code and creating {output_file}") if product == "max_values": aggregate_max_to_file(fileset_bucket, fileset, output_file_bucket, output_file) elif product == "high_water_probability": run_high_water_probability(reference_date, fileset_bucket, fileset, output_file_bucket, output_file) elif product == "rapid_onset_flooding_probability": run_rapid_onset_flooding_probability(reference_date, fileset_bucket, fileset, output_file_bucket, output_file) + elif product == "anomaly": + anomaly_config = event['args']['python_preprocessing']['config'] + auth_data_bucket = os.environ['AUTH_DATA_BUCKET'] + run_anomaly(reference_date, fileset_bucket, fileset, output_file_bucket, output_file, auth_data_bucket, anomaly_config=anomaly_config) print(f"Successfully created {output_file} in {output_file_bucket}") diff --git a/Core/LAMBDA/viz_functions/viz_python_preprocessing/products/anomaly.py b/Core/LAMBDA/viz_functions/viz_python_preprocessing/products/anomaly.py new file mode 100644 index 00000000..6b7ca146 --- /dev/null +++ b/Core/LAMBDA/viz_functions/viz_python_preprocessing/products/anomaly.py @@ -0,0 +1,141 @@ +import numpy as np +import pandas as pd +import xarray as xr +import tempfile +import boto3 +import os +from viz_lambda_shared_funcs import check_if_file_exists, organize_input_files + +INSUFFICIENT_DATA_ERROR_CODE = -9998 +PERCENTILE_TABLE_5TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_5th_perc.nc" +PERCENTILE_TABLE_10TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_10th_perc.nc" +PERCENTILE_TABLE_25TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_25th_perc.nc" +PERCENTILE_TABLE_75TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_75th_perc.nc" +PERCENTILE_TABLE_90TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_90th_perc.nc" +PERCENTILE_TABLE_95TH = "viz_authoritative_data/derived_data/nwm_v21_7_day_average_percentiles/final_7day_all_95th_perc.nc" +PERCENTILE_14_TABLE_5TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_5th_perc.nc" +PERCENTILE_14_TABLE_10TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_10th_perc.nc" +PERCENTILE_14_TABLE_25TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_25th_perc.nc" +PERCENTILE_14_TABLE_75TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_75th_perc.nc" +PERCENTILE_14_TABLE_90TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_90th_perc.nc" +PERCENTILE_14_TABLE_95TH = "viz_authoritative_data/derived_data/nwm_v21_14_day_average_percentiles/final_14day_all_95th_perc.nc" + +def run_anomaly(reference_time, fileset_bucket, fileset, output_file_bucket, output_file, auth_data_bucket, anomaly_config=7): + average_flow_col = f'average_flow_{anomaly_config}day' + anom_col = f'anom_cat_{anomaly_config}day' + + ##### Data 
Prep #####
+    if anomaly_config == 7:
+        download_subfolder = "7_day"
+        percentile_5 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_5TH, download=True, download_subfolder=download_subfolder)
+        percentile_10 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_10TH, download=True, download_subfolder=download_subfolder)
+        percentile_25 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_25TH, download=True, download_subfolder=download_subfolder)
+        percentile_75 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_75TH, download=True, download_subfolder=download_subfolder)
+        percentile_90 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_90TH, download=True, download_subfolder=download_subfolder)
+        percentile_95 = check_if_file_exists(auth_data_bucket, PERCENTILE_TABLE_95TH, download=True, download_subfolder=download_subfolder)
+    elif anomaly_config == 14:
+        download_subfolder = "14_day"
+        percentile_5 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_5TH, download=True, download_subfolder=download_subfolder)
+        percentile_10 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_10TH, download=True, download_subfolder=download_subfolder)
+        percentile_25 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_25TH, download=True, download_subfolder=download_subfolder)
+        percentile_75 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_75TH, download=True, download_subfolder=download_subfolder)
+        percentile_90 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_90TH, download=True, download_subfolder=download_subfolder)
+        percentile_95 = check_if_file_exists(auth_data_bucket, PERCENTILE_14_TABLE_95TH, download=True, download_subfolder=download_subfolder)
+    else:
+        raise Exception("Anomaly config must be 7 or 14 for the appropriate percentile files")
+
+    print("Downloading NWM Data")
+    input_files = organize_input_files(fileset_bucket, fileset, download_subfolder=reference_time.strftime('%Y%m%d'))
+
+    # Get NWM version from first file
+    with xr.open_dataset(input_files[0]) as first_file:
+        nwm_vers = first_file.NWM_version_number.replace("v","")
+
+    # Import Feature IDs
+    print("-->Looping through files to get streamflow sum")
+    df = pd.DataFrame()
+    for file in input_files:
+        ds_file = xr.open_dataset(file)
+        df_file = ds_file['streamflow'].to_dataframe()
+        df_file['streamflow'] = df_file['streamflow'] * 35.3147 # convert streamflow from cms to cfs
+
+        if df.empty:
+            df = df_file
+            df = df.rename(columns={"streamflow": "streamflow_sum"})
+        else:
+            df['streamflow_sum'] += df_file['streamflow']
+        os.remove(file)
+
+    df[average_flow_col] = df['streamflow_sum'] / len(input_files)
+    df = df.drop(columns=['streamflow_sum'])
+    df[average_flow_col] = df[average_flow_col].round(2)
+
+    # Import Percentile Data
+    print("-->Importing percentile data:")
+
+    date = int(reference_time.strftime("%j")) - 1 # retrieves the date in integer form from reference_time
+
+    print(f"---->Retrieving {anomaly_config} day 5th percentiles...")
+    ds_perc = xr.open_dataset(percentile_5)
+    df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
+    df_perc = df_perc.rename(columns={"streamflow": "prcntle_5"})
+    df_perc['prcntle_5'] = (df_perc['prcntle_5'] * 35.3147).round(2) # convert streamflow from cms to cfs
+    df = df.join(df_perc)
+
+    print(f"---->Retrieving {anomaly_config} day 10th percentiles...")
+    ds_perc = xr.open_dataset(percentile_10)
+    df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
+    df_perc = 
df_perc.rename(columns={"streamflow": "prcntle_10"})
+    df_perc['prcntle_10'] = (df_perc['prcntle_10'] * 35.3147).round(2) # convert streamflow from cms to cfs
+    df = df.join(df_perc)
+
+    print(f"---->Retrieving {anomaly_config} day 25th percentiles...")
+    ds_perc = xr.open_dataset(percentile_25)
+    df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
+    df_perc = df_perc.rename(columns={"streamflow": "prcntle_25"})
+    df_perc['prcntle_25'] = (df_perc['prcntle_25'] * 35.3147).round(2) # convert streamflow from cms to cfs
+    df = df.join(df_perc)
+
+    print(f"---->Retrieving {anomaly_config} day 75th percentiles...")
+    ds_perc = xr.open_dataset(percentile_75)
+    df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
+    df_perc = df_perc.rename(columns={"streamflow": "prcntle_75"})
+    df_perc['prcntle_75'] = (df_perc['prcntle_75'] * 35.3147).round(2) # convert streamflow from cms to cfs
+    df = df.join(df_perc)
+
+    print(f"---->Retrieving {anomaly_config} day 90th percentiles...")
+    ds_perc = xr.open_dataset(percentile_90)
+    df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
+    df_perc = df_perc.rename(columns={"streamflow": "prcntle_90"})
+    df_perc['prcntle_90'] = (df_perc['prcntle_90'] * 35.3147).round(2) # convert streamflow from cms to cfs
+    df = df.join(df_perc)
+
+    print(f"---->Retrieving {anomaly_config} day 95th percentiles...")
+    ds_perc = xr.open_dataset(percentile_95)
+    df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe()
+    df_perc = df_perc.rename(columns={"streamflow": "prcntle_95"})
+    df_perc['prcntle_95'] = (df_perc['prcntle_95'] * 35.3147).round(2) # convert streamflow from cms to cfs
+    df = df.join(df_perc)
+
+    print("---->Creating percentile dictionary...")
+    df[anom_col] = np.nan
+    df.loc[(df[average_flow_col] >= df['prcntle_95']) & df[anom_col].isna(), anom_col] = "High (> 95th)"
+    df.loc[(df[average_flow_col] >= df['prcntle_90']) & df[anom_col].isna(), anom_col] = "Much Above Normal (91st - 95th)" # noqa: E501
+    df.loc[(df[average_flow_col] >= df['prcntle_75']) & df[anom_col].isna(), anom_col] = "Above Normal (76th - 90th)"
+    df.loc[(df[average_flow_col] >= df['prcntle_25']) & df[anom_col].isna(), anom_col] = "Normal (26th - 75th)"
+    df.loc[(df[average_flow_col] >= df['prcntle_10']) & df[anom_col].isna(), anom_col] = "Below Normal (11th - 25th)"
+    df.loc[(df[average_flow_col] >= df['prcntle_5']) & df[anom_col].isna(), anom_col] = "Much Below Normal (6th - 10th)"
+    df.loc[(df[average_flow_col] < df['prcntle_5']) & df[anom_col].isna(), anom_col] = "Low (<= 5th)"
+    df.loc[df[anom_col].isna(), anom_col] = "Insufficient Data Available"
+    df = df.replace(round(INSUFFICIENT_DATA_ERROR_CODE * 35.3147, 2), None)
+    df['nwm_vers'] = nwm_vers
+
+    print("Uploading output CSV file to S3")
+    s3 = boto3.client('s3')
+    tempdir = tempfile.mkdtemp()
+    tmp_output_path = os.path.join(tempdir, "temp_output.csv")
+    df = df.reset_index()
+    df.to_csv(tmp_output_path, index=False)
+    s3.upload_file(tmp_output_path, output_file_bucket, output_file)
+    print(f"--- Uploaded to {output_file_bucket}:{output_file}")
+    os.remove(tmp_output_path)
\ No newline at end of file
diff --git a/Core/StepFunctions/main.tf b/Core/StepFunctions/main.tf
index 1a5c35ce..ad6b0be7 100644
--- a/Core/StepFunctions/main.tf
+++ b/Core/StepFunctions/main.tf
@@ -10,7 +10,11 @@ variable "environment" {
   type = string
 }
 
-variable "python_preprocessing_arn" {
+variable "python_preprocessing_2GB_arn" {
+  type = string
+}
+
+variable "python_preprocessing_10GB_arn" {
   type = string
 }
 
@@ -148,7 +152,8 @@ 
resource "aws_sfn_state_machine" "viz_pipeline_step_function" { role_arn = var.viz_lambda_role definition = templatefile("${path.module}/viz_processing_pipeline.json.tftpl", { - python_preprocessing_arn = var.python_preprocessing_arn + python_preprocessing_2GB_arn = var.python_preprocessing_2GB_arn + python_preprocessing_10GB_arn = var.python_preprocessing_10GB_arn db_postprocess_sql_arn = var.db_postprocess_sql_arn db_ingest_arn = var.db_ingest_arn raster_processing_arn = var.raster_processing_arn diff --git a/Core/StepFunctions/viz_processing_pipeline.json.tftpl b/Core/StepFunctions/viz_processing_pipeline.json.tftpl index 3ba36246..af0ffdbf 100644 --- a/Core/StepFunctions/viz_processing_pipeline.json.tftpl +++ b/Core/StepFunctions/viz_processing_pipeline.json.tftpl @@ -41,7 +41,7 @@ "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { - "FunctionName": "${python_preprocessing_arn}", + "FunctionName": "${python_preprocessing_2GB_arn}", "Payload": { "args.$": "$", "step": "python_preprocessing" @@ -683,7 +683,7 @@ "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { - "FunctionName": "${python_preprocessing_arn}", + "FunctionName": "${python_preprocessing_2GB_arn}", "Payload": { "args.$": "$", "step": "fim_config_max_file" diff --git a/Core/main.tf b/Core/main.tf index 339e4e92..6cddded5 100644 --- a/Core/main.tf +++ b/Core/main.tf @@ -625,24 +625,25 @@ module "viz-lambda-functions" { module "step-functions" { source = "./StepFunctions" - viz_lambda_role = module.iam-roles.role_viz_pipeline.arn - rnr_lambda_role = module.iam-roles.role_sync_wrds_location_db.arn - environment = local.env.environment - optimize_rasters_arn = module.viz-lambda-functions.optimize_rasters.arn - update_egis_data_arn = module.viz-lambda-functions.update_egis_data.arn - fim_data_prep_arn = module.viz-lambda-functions.fim_data_prep.arn - db_postprocess_sql_arn = module.viz-lambda-functions.db_postprocess_sql.arn - db_ingest_arn = module.viz-lambda-functions.db_ingest.arn - raster_processing_arn = module.viz-lambda-functions.raster_processing.arn - publish_service_arn = module.viz-lambda-functions.publish_service.arn - python_preprocessing_arn = module.viz-lambda-functions.python_preprocessing.arn - hand_fim_processing_arn = module.viz-lambda-functions.hand_fim_processing.arn - schism_fim_processing_arn = module.viz-lambda-functions.schism_fim_processing.arn - initialize_pipeline_arn = module.viz-lambda-functions.initialize_pipeline.arn - rnr_domain_generator_arn = module.rnr-lambda-functions.rnr_domain_generator.arn - email_sns_topics = module.sns.email_sns_topics - aws_instances_to_reboot = [module.rnr.ec2.id] - fifteen_minute_trigger = module.eventbridge.fifteen_minute_eventbridge + viz_lambda_role = module.iam-roles.role_viz_pipeline.arn + rnr_lambda_role = module.iam-roles.role_sync_wrds_location_db.arn + environment = local.env.environment + optimize_rasters_arn = module.viz-lambda-functions.optimize_rasters.arn + update_egis_data_arn = module.viz-lambda-functions.update_egis_data.arn + fim_data_prep_arn = module.viz-lambda-functions.fim_data_prep.arn + db_postprocess_sql_arn = module.viz-lambda-functions.db_postprocess_sql.arn + db_ingest_arn = module.viz-lambda-functions.db_ingest.arn + raster_processing_arn = module.viz-lambda-functions.raster_processing.arn + publish_service_arn = module.viz-lambda-functions.publish_service.arn + python_preprocessing_2GB_arn = module.viz-lambda-functions.python_preprocessing_2GB.arn + 
python_preprocessing_10GB_arn = module.viz-lambda-functions.python_preprocessing_10GB.arn + hand_fim_processing_arn = module.viz-lambda-functions.hand_fim_processing.arn + schism_fim_processing_arn = module.viz-lambda-functions.schism_fim_processing.arn + initialize_pipeline_arn = module.viz-lambda-functions.initialize_pipeline.arn + rnr_domain_generator_arn = module.rnr-lambda-functions.rnr_domain_generator.arn + email_sns_topics = module.sns.email_sns_topics + aws_instances_to_reboot = [module.rnr.ec2.id] + fifteen_minute_trigger = module.eventbridge.fifteen_minute_eventbridge viz_processing_pipeline_log_group = module.cloudwatch.viz_processing_pipeline_log_group.name } diff --git a/Source/Visualizations/aws_loosa/deploy/pipelines_config.yml b/Source/Visualizations/aws_loosa/deploy/pipelines_config.yml index 40085289..5eb9ff0f 100644 --- a/Source/Visualizations/aws_loosa/deploy/pipelines_config.yml +++ b/Source/Visualizations/aws_loosa/deploy/pipelines_config.yml @@ -1,2 +1 @@ ADDED: - - ana_anomaly diff --git a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/ana_anomaly.sql b/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/ana_anomaly.sql deleted file mode 100644 index 4c557421..00000000 --- a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/ana_anomaly.sql +++ /dev/null @@ -1,14 +0,0 @@ -DROP TABLE IF EXISTS publish.ana_anomaly; -SELECT - channels.strm_order, - channels.name, - channels.huc6, - channels.nwm_vers, - channels.geom, - anom.*, - anom.feature_id::TEXT AS feature_id_str, - to_char(now()::timestamp without time zone, 'YYYY-MM-DD HH24:MI:SS UTC') AS update_time -INTO publish.ana_anomaly -FROM derived.channels_conus AS channels -JOIN cache.ana_anomaly AS anom ON channels.feature_id = anom.feature_id; -DROP TABLE IF EXISTS cache.ana_anomaly; \ No newline at end of file diff --git a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/pipeline.yml b/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/pipeline.yml deleted file mode 100644 index 98931a1f..00000000 --- a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/pipeline.yml +++ /dev/null @@ -1,17 +0,0 @@ -name: ana_anomaly -dataset: - uris: - - s3-https://$NWM_DATA_BUCKET$.s3.amazonaws.com/common/data/model/com/nwm/$NWM_DATAFLOW_VERSION$/nwm.{{datetime:%Y%m%d}}/analysis_assim/nwm.t{{datetime:%H}}z.analysis_assim.channel_rt.tm00.conus.nc - window: P14D - repeat: PT1H - repeat_ref_time: 0:00:00 - delay: PT55M - expect: PT1H35M - transfer: all - clean: True - acceptable_uris_missing: 5% -process: - script: .\process.py - interval_ref_time: 0:00:00 - interval: PT24H - timeout: PT2H diff --git a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/process.py b/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/process.py deleted file mode 100644 index a5f39cf3..00000000 --- a/Source/Visualizations/aws_loosa/pipelines/ana_anomaly/process.py +++ /dev/null @@ -1,107 +0,0 @@ -import sys -import time -import os -import re -import xarray -from datetime import datetime -import arcpy - -from aws_loosa.processes.base.aws_egis_process import AWSEgisPublishingProcess -from aws_loosa.consts.egis import PRIMARY_SERVER, NWM_FOLDER, VALID_TIME -from aws_loosa.products.anomaly import anomaly -from aws_loosa.utils.shared_funcs import create_service_db_tables - - -class AnaAnomaly(AWSEgisPublishingProcess): - service_name = 'ana_anomaly' - - def _process(self, a_event_time, a_input_files, a_output_location, *args, **kwargs): - """ - Override this method to execute the post processing steps. 
- - Args: - a_event_time(datetime.datetime): the datetime associated with the current processing run. - a_input_files(list): a list of absolute paths to the required input files. - a_output_location(str): the location to be used to write output from the process. - - Returns: - bool: True if processing finished successfully and False if it encountered an error. - """ - - # Execute script - start_time = time.time() - arcpy.AddMessage("Running Streamflow Anomaly v2.0:") - - day7_list = [] - day14_list = [] - day7_threshold = a_event_time - datetime.timedelta(days=7) - day14_threshold = a_event_time - datetime.timedelta(days=14) - latest_file = None - latest_date = None - - # With file threshold being used, we cant assume the first half of the files are for 7 days - for file in a_input_files: - re_str = r"(\d{4})(\d{2})(\d{2})-analysis_assim.nwm.t(\d{2})z.analysis_assim.channel_rt.tm00.conus.nc" - findings = re.findall(re_str, file.replace("\\", "."))[0] - file_date = datetime.datetime( - year=int(findings[0]), month=int(findings[1]), day=int(findings[2]), hour=int(findings[3]) - ) - - if day14_threshold <= file_date <= a_event_time: - day14_list.append(file) - - if day7_threshold <= file_date <= a_event_time: - day7_list.append(file) - - if not latest_file: - latest_file = file - latest_date = file_date - elif file_date > latest_date: - latest_file = file - latest_date = file_date - - ds_latest = xarray.open_dataset(latest_file) - - channel_rt_date_time = a_event_time - self._log.info("Calculating 7 day anomaly") - df_7_day_anomaly = anomaly(day7_list, channel_rt_date_time, anomaly_config=7) - - self._log.info("Calculating 14 day anomaly") - df_14_day_anomaly = anomaly(day14_list, channel_rt_date_time, anomaly_config=14) - - self._log.info("Joining 7day and 14day dataframes") - df = df_7_day_anomaly.join(df_14_day_anomaly[['average_flow_14day', 'anom_cat_14day']]) - - self._log.info("Adding latest data") - df['latest_flow'] = (ds_latest.streamflow * 35.3147).round(2) - df['reference_time'] = latest_date.strftime("%Y-%m-%d %H:%M:%S UTC") - df['valid_time'] = latest_date.strftime("%Y-%m-%d %H:%M:%S UTC") - - self._log.info("Removing unnecessary rows") - df = df.loc[~((df['average_flow_7day'] == 0) & (df['average_flow_14day'] == 0) & (df['prcntle_95'] == 0))] - df = df.loc[~((df['anom_cat_7day'] == 'Normal (26th - 75th)') & (df['anom_cat_14day'] == 'Normal (26th - 75th)'))] # noqa: E501 - df = df.reset_index() - - sql_files = [os.path.join(os.path.dirname(__file__), "ana_anomaly.sql")] - - service_table_names = ["ana_anomaly"] - - create_service_db_tables(df, self.service_name, sql_files, service_table_names, self.process_event_time, past_run=self.one_off) - - arcpy.AddMessage("All done!") - seconds = time.time() - start_time - minutes = str(round((seconds / 60.0), 2)) - arcpy.AddMessage("Run time: " + minutes + " minutes") - - -if __name__ == '__main__': - # Get Args - forecast_date = sys.argv[1] - input_files = sys.argv[2] - log_directory = sys.argv[3] - next_forecast_date = sys.argv[4] - - # Create process - p = AnaAnomaly(a_log_directory=log_directory, a_log_level='INFO') - success = p.execute(forecast_date, input_files, next_forecast_date) - exit(0 if success else 1) diff --git a/Source/Visualizations/aws_loosa/products/anomaly.py b/Source/Visualizations/aws_loosa/products/anomaly.py deleted file mode 100644 index 4296e193..00000000 --- a/Source/Visualizations/aws_loosa/products/anomaly.py +++ /dev/null @@ -1,135 +0,0 @@ -import arcpy -import numpy as np -import pandas as pd -import xarray 
as xr -from warnings import filterwarnings - -# Import Authoritative Datasets and Constants -from aws_loosa.consts import (INSUFFICIENT_DATA_ERROR_CODE) -from aws_loosa.consts.paths import (PERCENTILE_TABLE_5TH, PERCENTILE_TABLE_10TH, - PERCENTILE_TABLE_25TH, PERCENTILE_TABLE_75TH, - PERCENTILE_TABLE_90TH, PERCENTILE_TABLE_95TH, - PERCENTILE_14_TABLE_5TH, PERCENTILE_14_TABLE_10TH, - PERCENTILE_14_TABLE_25TH, PERCENTILE_14_TABLE_75TH, - PERCENTILE_14_TABLE_90TH, PERCENTILE_14_TABLE_95TH) - - -filterwarnings("ignore") - -arcpy.env.overwriteOutput = True - - -def anomaly(channel_rt_files_list, channel_rt_date_time, anomaly_config=7): - """ - This function calculates the 7-day streamflow average for all reaches and classifies these into anomaly categories. - - Args: - channel_rt_files_list (list): A list of all National Water Model (NWM) channel_rt files for each hour over the - past 7 days. - channel_rt_date_time (datetime object): The date and time of interest. - anomaly_config (int): Day configuration to use for percentiles (i.e. 7 day config, 14 day config, etc). - - Returns: - anomaly_array (array): An array of (Feature ID, Product Version, Valid Time, Anomaly Category, 95th 7-day - Streamflow Percentile, 90th 7-day Streamflow Percentile, 75th 7-day Streamflow Percentile, 25th 7-day - Streamflow Percentile, 10th 7-day Streamflow Percentile, 5th 7-day Streamflow Percentile) groups for the - date and time of interest. - """ - average_flow_col = f'average_flow_{anomaly_config}day' - anom_col = f'anom_cat_{anomaly_config}day' - - if anomaly_config == 7: - percentile_5 = PERCENTILE_TABLE_5TH - percentile_10 = PERCENTILE_TABLE_10TH - percentile_25 = PERCENTILE_TABLE_25TH - percentile_75 = PERCENTILE_TABLE_75TH - percentile_90 = PERCENTILE_TABLE_90TH - percentile_95 = PERCENTILE_TABLE_95TH - elif anomaly_config == 14: - percentile_5 = PERCENTILE_14_TABLE_5TH - percentile_10 = PERCENTILE_14_TABLE_10TH - percentile_25 = PERCENTILE_14_TABLE_25TH - percentile_75 = PERCENTILE_14_TABLE_75TH - percentile_90 = PERCENTILE_14_TABLE_90TH - percentile_95 = PERCENTILE_14_TABLE_95TH - else: - raise Exception("Anomaly config must be 7 or 14 for the appropriate percentile files") - - # Import Feature IDs - arcpy.AddMessage("-->Looping through files to get streamflow sum") - df = pd.DataFrame() - for file in channel_rt_files_list: - ds_file = xr.open_dataset(file) - df_file = ds_file['streamflow'].to_dataframe() - df_file['streamflow'] = df_file['streamflow'] * 35.3147 # convert streamflow from cms to cfs - - if df.empty: - df = df_file - df = df.rename(columns={"streamflow": "streamflow_sum"}) - else: - df['streamflow_sum'] += df_file['streamflow'] - - df[average_flow_col] = df['streamflow_sum'] / len(channel_rt_files_list) - df = df.drop(columns=['streamflow_sum']) - df[average_flow_col] = df[average_flow_col].round(2) - - # Import Percentile Data - arcpy.AddMessage("-->Importing percentile data:") - - date = int(channel_rt_date_time.strftime("%j")) - 1 # retrieves the date in integer form from channel_rt_date_time - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 5th percentiles...") - ds_perc = xr.open_dataset(percentile_5) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_5"}) - df_perc['prcntle_5'] = (df_perc['prcntle_5'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 10th percentiles...") - ds_perc = xr.open_dataset(percentile_10) 
- df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_10"}) - df_perc['prcntle_10'] = (df_perc['prcntle_10'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 25th percentiles...") - ds_perc = xr.open_dataset(percentile_25) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_25"}) - df_perc['prcntle_25'] = (df_perc['prcntle_25'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 75th percentiles...") - ds_perc = xr.open_dataset(percentile_75) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_75"}) - df_perc['prcntle_75'] = (df_perc['prcntle_75'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 90th percentiles...") - ds_perc = xr.open_dataset(percentile_90) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_90"}) - df_perc['prcntle_90'] = (df_perc['prcntle_90'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage(f"---->Retrieving {anomaly_config} day 95th percentiles...") - ds_perc = xr.open_dataset(percentile_95) - df_perc = ds_perc.sel(time=date)['streamflow'].to_dataframe() - df_perc = df_perc.rename(columns={"streamflow": "prcntle_95"}) - df_perc['prcntle_95'] = (df_perc['prcntle_95'] * 35.3147).round(2) # convert streamflow from cms to cfs - df = df.join(df_perc) - - arcpy.AddMessage("---->Creating percentile dictionary...") - df[anom_col] = np.nan - df.loc[(df[average_flow_col] >= df['prcntle_95']) & df[anom_col].isna(), anom_col] = "High (> 95th)" - df.loc[(df[average_flow_col] >= df['prcntle_90']) & df[anom_col].isna(), anom_col] = "Much Above Normal (91st - 95th)" # noqa: E501 - df.loc[(df[average_flow_col] >= df['prcntle_75']) & df[anom_col].isna(), anom_col] = "Above Normal (76th - 90th)" - df.loc[(df[average_flow_col] >= df['prcntle_25']) & df[anom_col].isna(), anom_col] = "Normal (26th - 75th)" - df.loc[(df[average_flow_col] >= df['prcntle_10']) & df[anom_col].isna(), anom_col] = "Below Normal (11th - 25th))" - df.loc[(df[average_flow_col] >= df['prcntle_5']) & df[anom_col].isna(), anom_col] = "Much Below Normal (6th - 10th)" - df.loc[(df[average_flow_col] < df['prcntle_5']) & df[anom_col].isna(), anom_col] = "Low (<= 5th)" - df.loc[df[anom_col].isna(), anom_col] = "Insufficient Data Available" - df = df.replace(round(INSUFFICIENT_DATA_ERROR_CODE * 35.3147, 2), "No Data") - - return df