From 352f55698aa0aa08e2f8424d1cb453e7044c3cbd Mon Sep 17 00:00:00 2001 From: Dhanushree Date: Tue, 21 Jan 2025 06:16:45 +0000 Subject: [PATCH 1/3] date column is added to output file --- .../us_fema/national_risk_index/manifest.json | 21 ++++ .../national_risk_index/process_data.py | 119 +++++++++++++----- 2 files changed, 112 insertions(+), 28 deletions(-) create mode 100644 scripts/us_fema/national_risk_index/manifest.json diff --git a/scripts/us_fema/national_risk_index/manifest.json b/scripts/us_fema/national_risk_index/manifest.json new file mode 100644 index 0000000000..32a4070174 --- /dev/null +++ b/scripts/us_fema/national_risk_index/manifest.json @@ -0,0 +1,21 @@ +{ + "import_specifications": [ + { + "import_name": "USFEMA_NationalRiskIndex", + "curator_emails": ["dhhhanushree@google.com"], + "provenance_url": "https://hazards.fema.gov/nri/data-resources", + "provenance_description": "USFEMA National RiskIndex", + "scripts": ["sh/e2e.sh"], + "import_inputs": [{ + "template_mcf": "output/fema_nri_counties.tmcf", + "cleaned_csv": "output/nri_tracts_table_March 2023.csv" + }, + { + "template_mcf": "output/fema_nri_counties.tmcf", + "cleaned_csv": "output/nri_counties_table_March 2023.csv" + }], + "cron_schedule": "0 4 2 * * *" + } + ] +} + diff --git a/scripts/us_fema/national_risk_index/process_data.py b/scripts/us_fema/national_risk_index/process_data.py index 9a5618c984..3a8c6aac45 100644 --- a/scripts/us_fema/national_risk_index/process_data.py +++ b/scripts/us_fema/national_risk_index/process_data.py @@ -1,10 +1,26 @@ import json +import os import pandas as pd import numpy as np - +import logging +from pathlib import Path +from absl import app +from absl import flags +# Configure logging +logging.basicConfig( + filename="file_errors.log", # Log file name + level=logging.ERROR, # Log level + format="%(asctime)s - %(levelname)s - %(message)s", # Log format +) + +FLAGS = flags.FLAGS +flags.DEFINE_string('output_path', 'output', 'The local path to download the files') + +# if not os.path.exists(FLAGS.output_path): +# pathlib.Path(FLAGS.output_path).mkdir(parents=True, exist_ok=True) INPUT_TO_OUTPUT_PATHS = { - "source_data/NRI_Table_Counties.csv": "output/nri_counties_table.csv", - "source_data/NRI_Table_CensusTracts.csv": "output/nri_tracts_table.csv", + "source_data/NRI_Table_Counties.csv": "nri_counties_table.csv", + "source_data/NRI_Table_CensusTracts.csv": "nri_tracts_table.csv", } @@ -38,31 +54,78 @@ def fips_to_geoid(row): return "geoId/" + str(row[field]).zfill(length) +def rename_file (file_path,nri_ver): + file = Path(file_path) + new_file_path = file.stem + "_"+ nri_ver + file.suffix # Modify filename + new_file_path = file.with_name(new_file_path) + return new_file_path -def process_csv(input_path, output_path, csv_structure_f): - data_table = pd.read_csv(input_path) - - # TODO: interpret empty values. semantics of empty values is described in - # Table 2 of the Technical Documentation available at: - # https://www.fema.gov/sites/default/files/documents/fema_national-risk-index_technical-documentation.pdf - - # the column structure should be the same between the county and tract tables - # so we normalize it with the list of fields "csv_structure" - with open(csv_structure_f, "r") as json_file: - csv_structure = json.load(json_file) - normalized_table = data_table[csv_structure] - - # - after the structure is normalized, add the DCID_GeoID field to the first location - # - the TMCF generated in generate_schema_and_tmcf.py expect to find the - # geoID in the field "DCID_GeoID" - normalized_table.insert(0, "DCID_GeoID", - data_table.apply(fips_to_geoid, axis=1)) - - normalized_table.to_csv(output_path) - - -if __name__ == "__main__": +def process_csv(input_path, output_path, csv_structure_f): + try: + data_table = pd.read_csv(input_path) + nri_ver =data_table["NRI_VER"].iloc[1] + + # TODO: interpret empty values. semantics of empty values is described in + # Table 2 of the Technical Documentation available at: + # https://www.fema.gov/sites/default/files/documents/fema_national-risk-index_technical-documentation.pdf + + # the column structure should be the same between the county and tract tables + # so we normalize it with the list of fields "csv_structure" + with open(csv_structure_f, "r") as json_file: + csv_structure = json.load(json_file) + normalized_table = data_table[csv_structure] + normalized_table["OBSER_DATE"] = nri_ver + + # - after the structure is normalized, add the DCID_GeoID field to the first location + # - the TMCF generated in generate_schema_and_tmcf.py expect to find the + # geoID in the field "DCID_GeoID" + normalized_table.insert(0, "DCID_GeoID", + data_table.apply(fips_to_geoid, axis=1)) + + new_output_path = rename_file(output_path, nri_ver) + normalized_table.to_csv(new_output_path) + new_input_file =rename_file(input_path, nri_ver) + os.rename(input_path, new_input_file) + + print("****************************",new_input_file) + # for filename in os.listdir(folder_path): + # if filename.endswith(".csv"): + # old_filepath = os.path.join(folder_path, filename) + # new_filename = filename + suffix + ".csv" # Add suffix before existing extension + # new_filepath = os.path.join(folder_path, new_filename) + # try: + # os.rename(old_filepath, new_filepath) + # print(f"Renamed {old_filepath} to {new_filepath}") + # except OSError as e: + # print(f"Error renaming {old_filepath}: {e}") + + + + except FileNotFoundError as e: + logging.error(f"FileNotFoundError: {e}") + print(f"Error: The file '{input_path}' was not found.") + except Exception as e: + logging.error(f"An unexpected error occurred: {e}") + print(f"An unexpected error occurred: {e}") + +def main(argv): + # path = "source_data/" + # input_files =os.listdir(path) + # file_name = "source_data/" + input_files[0] + # data_table = pd.read_csv(file_name) + # nri_ver =data_table["NRI_VER"].iloc[1] + # for files in input_files: + # file = Path(files) + # new_input_path = file.stem + "_"+ nri_ver + file.suffix # Modify filename + # new_input_path = file.with_name(new_input_path) + # print(new_input_path) + print(FLAGS.output_path) + for input_path in INPUT_TO_OUTPUT_PATHS: - output_path = INPUT_TO_OUTPUT_PATHS[input_path] - process_csv(input_path, output_path, "output/csv_columns.json") + + output_path_new = os.path.join(FLAGS.output_path, INPUT_TO_OUTPUT_PATHS[input_path]) + + process_csv(input_path, output_path_new, "output/csv_columns.json") +if __name__ == "__main__": + app.run(main) \ No newline at end of file From 2de6732c01f5bae99834e4c2f708ce2447c5a473 Mon Sep 17 00:00:00 2001 From: Dhanushree Date: Tue, 21 Jan 2025 06:56:33 +0000 Subject: [PATCH 2/3] removed commented code --- .../national_risk_index/process_data.py | 29 +------------------ 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/scripts/us_fema/national_risk_index/process_data.py b/scripts/us_fema/national_risk_index/process_data.py index 3a8c6aac45..2fb5069384 100644 --- a/scripts/us_fema/national_risk_index/process_data.py +++ b/scripts/us_fema/national_risk_index/process_data.py @@ -87,21 +87,7 @@ def process_csv(input_path, output_path, csv_structure_f): normalized_table.to_csv(new_output_path) new_input_file =rename_file(input_path, nri_ver) os.rename(input_path, new_input_file) - - print("****************************",new_input_file) - # for filename in os.listdir(folder_path): - # if filename.endswith(".csv"): - # old_filepath = os.path.join(folder_path, filename) - # new_filename = filename + suffix + ".csv" # Add suffix before existing extension - # new_filepath = os.path.join(folder_path, new_filename) - # try: - # os.rename(old_filepath, new_filepath) - # print(f"Renamed {old_filepath} to {new_filepath}") - # except OSError as e: - # print(f"Error renaming {old_filepath}: {e}") - - except FileNotFoundError as e: logging.error(f"FileNotFoundError: {e}") print(f"Error: The file '{input_path}' was not found.") @@ -110,22 +96,9 @@ def process_csv(input_path, output_path, csv_structure_f): print(f"An unexpected error occurred: {e}") def main(argv): - # path = "source_data/" - # input_files =os.listdir(path) - # file_name = "source_data/" + input_files[0] - # data_table = pd.read_csv(file_name) - # nri_ver =data_table["NRI_VER"].iloc[1] - # for files in input_files: - # file = Path(files) - # new_input_path = file.stem + "_"+ nri_ver + file.suffix # Modify filename - # new_input_path = file.with_name(new_input_path) - # print(new_input_path) - print(FLAGS.output_path) - for input_path in INPUT_TO_OUTPUT_PATHS: - output_path_new = os.path.join(FLAGS.output_path, INPUT_TO_OUTPUT_PATHS[input_path]) - process_csv(input_path, output_path_new, "output/csv_columns.json") + if __name__ == "__main__": app.run(main) \ No newline at end of file From ba79d7fcb3b8c57aa8cab6d87aa8e0ab54dfb02d Mon Sep 17 00:00:00 2001 From: Dhanushree Date: Fri, 31 Jan 2025 09:49:59 +0000 Subject: [PATCH 3/3] uploading files to gcs --- .../us_fema/national_risk_index/manifest.json | 4 +- .../national_risk_index/process_data.py | 77 +++++++++++++------ 2 files changed, 57 insertions(+), 24 deletions(-) diff --git a/scripts/us_fema/national_risk_index/manifest.json b/scripts/us_fema/national_risk_index/manifest.json index 32a4070174..19aa051cbd 100644 --- a/scripts/us_fema/national_risk_index/manifest.json +++ b/scripts/us_fema/national_risk_index/manifest.json @@ -8,11 +8,11 @@ "scripts": ["sh/e2e.sh"], "import_inputs": [{ "template_mcf": "output/fema_nri_counties.tmcf", - "cleaned_csv": "output/nri_tracts_table_March 2023.csv" + "cleaned_csv": "" }, { "template_mcf": "output/fema_nri_counties.tmcf", - "cleaned_csv": "output/nri_counties_table_March 2023.csv" + "cleaned_csv": "" }], "cron_schedule": "0 4 2 * * *" } diff --git a/scripts/us_fema/national_risk_index/process_data.py b/scripts/us_fema/national_risk_index/process_data.py index 2fb5069384..b6f1d7b231 100644 --- a/scripts/us_fema/national_risk_index/process_data.py +++ b/scripts/us_fema/national_risk_index/process_data.py @@ -3,9 +3,16 @@ import pandas as pd import numpy as np import logging +import sys from pathlib import Path from absl import app from absl import flags +from datetime import datetime +from google.cloud import storage +_MODULE_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(_MODULE_DIR, '../../../util/')) +import file_util +from io import StringIO # Configure logging logging.basicConfig( filename="file_errors.log", # Log file name @@ -14,10 +21,8 @@ ) FLAGS = flags.FLAGS -flags.DEFINE_string('output_path', 'output', 'The local path to download the files') +flags.DEFINE_string('output_path', 'gs://unresolved_mcf/us_fema/national_risk_index/latest/', 'The local path to download the files') -# if not os.path.exists(FLAGS.output_path): -# pathlib.Path(FLAGS.output_path).mkdir(parents=True, exist_ok=True) INPUT_TO_OUTPUT_PATHS = { "source_data/NRI_Table_Counties.csv": "nri_counties_table.csv", "source_data/NRI_Table_CensusTracts.csv": "nri_tracts_table.csv", @@ -54,40 +59,66 @@ def fips_to_geoid(row): return "geoId/" + str(row[field]).zfill(length) -def rename_file (file_path,nri_ver): +def convert_month_year(date_str): + """ + Converts a date string in the format "Month Year" to "YYYY_MM". + + Args: + date_str: The input date string (e.g., "March 2023"). + + Returns: + The converted date string in the format "YYYY_MM". + """ + try: + date_obj = datetime.strptime(date_str, '%B %Y') # Parse the input string + year = str(date_obj.year) + month = str(date_obj.month).zfill(2) # Pad single-digit months with zero + return f"{year}-{month}" + except ValueError: + print(f"Invalid date format: {date_str}") + return None + +def rename_file (file_path, nri_ver): file = Path(file_path) new_file_path = file.stem + "_"+ nri_ver + file.suffix # Modify filename new_file_path = file.with_name(new_file_path) return new_file_path -def process_csv(input_path, output_path, csv_structure_f): +def rename_gcs_file_path(gcs_path, nri_vers): + file_name_with_ext = gcs_path.split("/")[-1] + file_name, ext = file_name_with_ext.rsplit(".", 1) + new_file_name_with_ext = f"{file_name}_{nri_vers}.{ext}" + new_gcs_path = gcs_path.replace(file_name_with_ext, new_file_name_with_ext) + return new_gcs_path + +def process_csv(input_path, output_path, csv_structure_f, out_put_file_name): try: data_table = pd.read_csv(input_path) nri_ver =data_table["NRI_VER"].iloc[1] - - # TODO: interpret empty values. semantics of empty values is described in - # Table 2 of the Technical Documentation available at: - # https://www.fema.gov/sites/default/files/documents/fema_national-risk-index_technical-documentation.pdf + + # TODO: interpret empty values. semantics of empty values is described in + # Table 2 of the Technical Documentation available at: + # https://www.fema.gov/sites/default/files/documents/fema_national-risk-index_technical-documentation.pdf - # the column structure should be the same between the county and tract tables - # so we normalize it with the list of fields "csv_structure" + # the column structure should be the same between the county and tract tables + # so we normalize it with the list of fields "csv_structure" with open(csv_structure_f, "r") as json_file: csv_structure = json.load(json_file) normalized_table = data_table[csv_structure] - normalized_table["OBSER_DATE"] = nri_ver + nri_month_year = convert_month_year(nri_ver) + normalized_table["OBSER_DATE"] = nri_month_year - # - after the structure is normalized, add the DCID_GeoID field to the first location - # - the TMCF generated in generate_schema_and_tmcf.py expect to find the - # geoID in the field "DCID_GeoID" + # - after the structure is normalized, add the DCID_GeoID field to the first location + # - the TMCF generated in generate_schema_and_tmcf.py expect to find the + # geoID in the field "DCID_GeoID" normalized_table.insert(0, "DCID_GeoID", data_table.apply(fips_to_geoid, axis=1)) - - new_output_path = rename_file(output_path, nri_ver) - normalized_table.to_csv(new_output_path) - new_input_file =rename_file(input_path, nri_ver) + new_output_path = rename_gcs_file_path(output_path, nri_month_year) + normalized_table.to_csv(out_put_file_name) + file_util.file_copy(out_put_file_name, new_output_path) + new_input_file =rename_file(input_path, nri_month_year) os.rename(input_path, new_input_file) - except FileNotFoundError as e: logging.error(f"FileNotFoundError: {e}") print(f"Error: The file '{input_path}' was not found.") @@ -96,9 +127,11 @@ def process_csv(input_path, output_path, csv_structure_f): print(f"An unexpected error occurred: {e}") def main(argv): + for input_path in INPUT_TO_OUTPUT_PATHS: - output_path_new = os.path.join(FLAGS.output_path, INPUT_TO_OUTPUT_PATHS[input_path]) - process_csv(input_path, output_path_new, "output/csv_columns.json") + out_put_file_name = INPUT_TO_OUTPUT_PATHS[input_path] + output_path_new =FLAGS.output_path.rstrip("/") + "/" + out_put_file_name + process_csv(input_path, output_path_new, "output/csv_columns.json", str(out_put_file_name)) if __name__ == "__main__": app.run(main) \ No newline at end of file