From 470acc7cd479346f28d586c9002d0150c63059ea Mon Sep 17 00:00:00 2001 From: alexeh Date: Sat, 7 Dec 2024 07:41:30 +0100 Subject: [PATCH] a first test to open the path to glory --- .github/workflows/testing-data-import.yml | 29 ++++ data/__init__.py | 0 .../indicator_coefficient_importer.py | 131 +++++++++++++----- data/pytest.ini | 4 + data/requirements.txt | 8 ++ data/test/__init__.py | 0 data/test/test_indicator_coefficients.py | 25 ++++ 7 files changed, 161 insertions(+), 36 deletions(-) create mode 100644 .github/workflows/testing-data-import.yml create mode 100644 data/__init__.py create mode 100644 data/pytest.ini create mode 100644 data/test/__init__.py create mode 100644 data/test/test_indicator_coefficients.py diff --git a/.github/workflows/testing-data-import.yml b/.github/workflows/testing-data-import.yml new file mode 100644 index 000000000..bb3afc0dd --- /dev/null +++ b/.github/workflows/testing-data-import.yml @@ -0,0 +1,29 @@ +name: Data Import Tests + +on: + push: + paths: + - 'data/**' + + workflow_dispatch: + +jobs: + data-import-test: + name: Data Import Tests + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: "3.10" + - name: Install dependencies + working-directory: data + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + - name: Data Import tests + working-directory: data/test + run: | + pytest diff --git a/data/__init__.py b/data/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/data/indicator_coefficient_importer/indicator_coefficient_importer.py b/data/indicator_coefficient_importer/indicator_coefficient_importer.py index fa4bfab08..9f0c9c295 100644 --- a/data/indicator_coefficient_importer/indicator_coefficient_importer.py +++ b/data/indicator_coefficient_importer/indicator_coefficient_importer.py @@ -12,8 +12,11 @@ import json import logging import os +import 
time from io import StringIO from pathlib import Path +from typing import List, Dict, Any + from dotenv import load_dotenv import boto3 @@ -77,11 +80,15 @@ def copy_data_to_table(conn: connection, df: pd.DataFrame, indicator_id: str): buffer.seek(0) with conn: with conn.cursor() as cursor: + start_time = time.perf_counter() + logging.info(f"Deleting existing records for indicator with code {indicator_id}...") cursor.execute( 'delete from indicator_coefficient where "indicatorId" = %s', (indicator_id,), ) - log.info(f"Copying {len(df)} records into indicator_coefficient...") + end_time = time.perf_counter() + logging.info(f"Deleted existing records in {end_time - start_time:0.4f} seconds") + log.info(f"Copying {len(df)} records into indicator_coefficient for indicator with ID {indicator_id}...") cursor.copy_from( buffer, "indicator_coefficient", @@ -89,49 +96,101 @@ def copy_data_to_table(conn: connection, df: pd.DataFrame, indicator_id: str): columns=df.columns, null="NULL", ) - log.info("Done!") + log.info(f"Imported indicator coefficients for indicator with ID {indicator_id}") -# @click.command() -# @click.argument("file", type=click.Path(exists=True, path_type=Path)) -# @click.argument("indicator_code", type=str) -# @click.argument("year", type=int) -def main(): - """Process and ingest csv data with per country data into indicator_coefficient table.""" +# def remove_downloaded_files(downloaded_files: list[str]): + + +def download_indicator_coefficient_files(files_to_download: list[str]) -> list[str]: + log.info(f"Downloading files: {files_to_download}") aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID') aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY') - - indicator_config = json.loads(os.getenv('INDICATOR_COEFFICIENT_CONFIG')) - files_to_download = [{"name": key, **value} for key, value in indicator_config.items()] - print(files_to_download) s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key) - for 
file in files_to_download: - s3.download_file(Bucket='landgriffon-raw-data', Key=file['file'], Filename=file['file'].split('/')[-1]) + downloaded_files = [] + print(files_to_download) + try: + for file in files_to_download: + file_name = file.split('/')[-1] + log.info(f"Downloading file: {file_name}") + s3.download_file(Bucket='landgriffon-raw-data', Key=file, Filename=file_name) + if os.path.exists(file_name): + downloaded_files.append(file_name) + else: + raise Exception(f"Error downloading file: {file_name}") + return downloaded_files + except Exception as e: + log.error(f"Error downloading files: {e}") + raise Exception('There was some error downloading the files. Aborting import process') + finally: + if len(downloaded_files) != len(files_to_download): + log.info('Cleaning up downloaded files...') + for downloaded_file in downloaded_files: + os.remove(downloaded_file) + log.info(f"Deleted file: {downloaded_file}") + log.info('All files downloaded successfully') + + +def load_indicator_config() -> list[dict[str, Any]]: + try: + indicator_config_json = os.getenv('INDICATOR_COEFFICIENT_CONFIG') + print(indicator_config_json) + + if not indicator_config_json: + raise ValueError("Environment variable 'INDICATOR_COEFFICIENT_CONFIG' is missing or empty. 
Aborting.") + parsed_config = json.loads(indicator_config_json) + + if not isinstance(parsed_config, dict): + raise ValueError("Environment variable 'INDICATOR_COEFFICIENT_CONFIG' does not contain valid JSON data.") + indicator_config = [{"name": key, **value} for key, value in parsed_config.items()] + + return indicator_config + + except json.JSONDecodeError: + log.error("Invalid JSON format in 'INDICATOR_COEFFICIENT_CONFIG'.") + raise ValueError("Environment variable 'INDICATOR_COEFFICIENT_CONFIG' must be a valid JSON string.") + except Exception as e: + log.error(f"Error loading configuration: {e}") + raise - return 'hey' - data = load_data(file, year) - conn = postgres_thread_pool.getconn() +def main(): + """Process and ingest csv data with per country data into indicator_coefficient table.""" + indicator_config = load_indicator_config() + files_to_download = [file_config['file'] for file_config in indicator_config] + ## TODO: We might want to cleanup downloaded stuff regardless the success of the import. 
But since IRL + ## this runs on a pod that will be wiped out after the job has run, we can skip this for now, we are in a rush + downloaded_files = download_indicator_coefficient_files(files_to_download) - with conn: - with conn.cursor() as cursor: - cursor.execute( - """select id from indicator where indicator."nameCode" = %s;""", - (indicator_code,), - ) - indicator_id = cursor.fetchone()[0] - # add admin region ID to dataframe so we can insert all rows at once - # without having to query for the IDs every time - admin_region_ids = get_admin_region_ids_from_countries(conn, list(data.country.unique())) - # same with material ID - material_ids = get_material_ids_from_hs_codes(conn, data.hs_2017_code.astype(str).to_list()) - - data = pd.merge(data, admin_region_ids, on="country", how="left") - data = pd.merge(data, material_ids, on="hs_2017_code", how="left") - data["indicatorId"] = indicator_id - data_to_insert = data[["value", "year", "adminRegionId", "indicatorId", "materialId"]] - copy_data_to_table(conn, data_to_insert, indicator_id) - postgres_thread_pool.putconn(conn, close=True) + conn = postgres_thread_pool.getconn() + for indicator in indicator_config: + file = indicator['file'].split('/')[-1] + year = indicator['year'] + indicator_code = indicator['indicator_code'] + data = load_data(filename=file, year=year) + with conn: + with conn.cursor() as cursor: + cursor.execute( + """select id from indicator where indicator."nameCode" = %s;""", + (indicator_code,), + ) + indicator_id = cursor.fetchone()[0] + # add admin region ID to dataframe so we can insert all rows at once + # without having to query for the IDs every time + admin_region_ids = get_admin_region_ids_from_countries(conn, list(data.country.unique())) + # same with material ID + material_ids = get_material_ids_from_hs_codes(conn, data.hs_2017_code.astype(str).to_list()) + + data = pd.merge(data, admin_region_ids, on="country", how="left") + data = pd.merge(data, material_ids, on="hs_2017_code", 
how="left") + data["indicatorId"] = indicator_id + data_to_insert = data[["value", "year", "adminRegionId", "indicatorId", "materialId"]] + copy_data_to_table(conn, data_to_insert, indicator_id) + + postgres_thread_pool.putconn(conn) + log.info("Done!!") + + return if __name__ == "__main__": diff --git a/data/pytest.ini b/data/pytest.ini new file mode 100644 index 000000000..547de63be --- /dev/null +++ b/data/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +testpaths = test +python_files = test_*.py +python_functions = test_* diff --git a/data/requirements.txt b/data/requirements.txt index 16cf38d33..b35dfc2f2 100644 --- a/data/requirements.txt +++ b/data/requirements.txt @@ -116,3 +116,11 @@ urllib3==2.2.1 # via # botocore # requests +python-dotenv==1.0.1 +exceptiongroup==1.2.2 +iniconfig==2.0.0 +pluggy==1.5.0 +pytest==8.3.4 +tomli==2.2.1 + + diff --git a/data/test/__init__.py b/data/test/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/data/test/test_indicator_coefficients.py b/data/test/test_indicator_coefficients.py new file mode 100644 index 000000000..554743b24 --- /dev/null +++ b/data/test/test_indicator_coefficients.py @@ -0,0 +1,25 @@ +import json +import os + +from data.indicator_coefficient_importer.indicator_coefficient_importer import load_indicator_config + + +def test_load_indicator_config(): + # Test case 1: Test with valid JSON data + os.environ['INDICATOR_COEFFICIENT_CONFIG'] = json.dumps({"test": {"file": "path/file.csv", "indicator_code": "AA"}}) + expected_output = [{"name": "test", "file": "path/file.csv", "indicator_code": "AA"}] + real_output = load_indicator_config() + assert real_output == expected_output + + # Test case 2: Test with invalid JSON data + os.environ['INDICATOR_COEFFICIENT_CONFIG'] = "invalid_json" + try: + load_indicator_config() + except ValueError as e: + assert str(e) == "Environment variable 'INDICATOR_COEFFICIENT_CONFIG' must be a valid JSON string." 
+ # Test case 3: Test with empty JSON data + os.environ['INDICATOR_COEFFICIENT_CONFIG'] = "" + try: + load_indicator_config() + except ValueError as e: + assert str(e) == "Environment variable 'INDICATOR_COEFFICIENT_CONFIG' is missing or empty. Aborting."