Skip to content

Commit

Permalink
a first test to open the path to glory
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeh committed Dec 7, 2024
1 parent 43c3585 commit 470acc7
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 36 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/testing-data-import.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Data Import Tests

on:
  push:
    paths:
      - 'data/**'
  workflow_dispatch:

jobs:
  data-import-test:
    name: Data Import Tests
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3
      # Single pinned interpreter; the previous step name referenced
      # ${{ matrix.python-version }} but no strategy.matrix was defined.
      - name: Set up Python 3.10
        uses: actions/setup-python@v3
        with:
          python-version: "3.10"
      - name: Install dependencies
        working-directory: data
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
      - name: Data Import tests
        working-directory: data/test
        run: |
          pytest
Empty file added data/__init__.py
Empty file.
131 changes: 95 additions & 36 deletions data/indicator_coefficient_importer/indicator_coefficient_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
import json
import logging
import os
import time
from io import StringIO
from pathlib import Path
from typing import List, Dict, Any

from dotenv import load_dotenv
import boto3

Expand Down Expand Up @@ -77,61 +80,117 @@ def copy_data_to_table(conn: connection, df: pd.DataFrame, indicator_id: str):
buffer.seek(0)
with conn:
with conn.cursor() as cursor:
start_time = time.perf_counter()
logging.info(f"Deleting existing records for indicator with code {indicator_id}...")
cursor.execute(
'delete from indicator_coefficient where "indicatorId" = %s',
(indicator_id,),
)
log.info(f"Copying {len(df)} records into indicator_coefficient...")
end_time = time.perf_counter()
logging.info(f"Deleted existing records in {end_time - start_time:0.4f} seconds")
log.info(f"Copying {len(df)} records into indicator_coefficient for indicator with ID {indicator_id}...")
cursor.copy_from(
buffer,
"indicator_coefficient",
sep=",",
columns=df.columns,
null="NULL",
)
log.info("Done!")
log.info(f"Imported indicator coefficients for indicator with ID {indicator_id}")


# @click.command()
# @click.argument("file", type=click.Path(exists=True, path_type=Path))
# @click.argument("indicator_code", type=str)
# @click.argument("year", type=int)
def main():
"""Process and ingest csv data with per country data into indicator_coefficient table."""
# def remove_downloaded_files(downloaded_files: list[str]):


def download_indicator_coefficient_files(files_to_download: list[str]) -> list[str]:
    """Download the given S3 object keys from the ``landgriffon-raw-data`` bucket.

    Each key is saved in the current working directory under its basename.
    On any failure, files downloaded so far are removed so the import never
    runs against a partial data set.

    Args:
        files_to_download: S3 object keys to fetch.

    Returns:
        The local file names that were downloaded.

    Raises:
        Exception: if any download fails; partial downloads are cleaned up first.
    """
    log.info(f"Downloading files: {files_to_download}")
    s3 = boto3.client(
        's3',
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    )
    downloaded_files: list[str] = []
    try:
        for key in files_to_download:
            file_name = key.split('/')[-1]
            log.info(f"Downloading file: {file_name}")
            s3.download_file(Bucket='landgriffon-raw-data', Key=key, Filename=file_name)
            # boto3 raises on most failures, but double-check the file landed on disk.
            if not os.path.exists(file_name):
                raise Exception(f"Error downloading file: {file_name}")
            downloaded_files.append(file_name)
    except Exception as e:
        log.error(f"Error downloading files: {e}")
        # Remove partial downloads so a retry starts from a clean slate.
        log.info('Cleaning up downloaded files...')
        for downloaded_file in downloaded_files:
            os.remove(downloaded_file)
            log.info(f"Deleted file: {downloaded_file}")
        raise Exception('There was some error downloading the files. Aborting import process') from e
    log.info('All files downloaded successfully')
    return downloaded_files


def load_indicator_config() -> list[dict[str, Any]]:
    """Read and validate the INDICATOR_COEFFICIENT_CONFIG environment variable.

    The variable must contain a JSON object mapping an indicator name to its
    settings. Each entry is flattened into a dict that also carries the
    mapping key under ``"name"``.

    Returns:
        A list of dicts like ``[{"name": ..., "file": ..., ...}, ...]``.

    Raises:
        ValueError: if the variable is missing/empty, is not valid JSON, or
            does not decode to a JSON object.
    """
    indicator_config_json = os.getenv('INDICATOR_COEFFICIENT_CONFIG')
    if not indicator_config_json:
        raise ValueError("Environment variable 'INDICATOR_COEFFICIENT_CONFIG' is missing or empty. Aborting.")
    try:
        parsed_config = json.loads(indicator_config_json)
    except json.JSONDecodeError:
        logging.error("Invalid JSON format in 'INDICATOR_COEFFICIENT_CONFIG'.")
        raise ValueError("Environment variable 'INDICATOR_COEFFICIENT_CONFIG' must be a valid JSON string.")
    if not isinstance(parsed_config, dict):
        raise ValueError("Environment variable 'INDICATOR_COEFFICIENT_CONFIG' does not contain valid JSON data.")
    return [{"name": key, **value} for key, value in parsed_config.items()]

def main():
    """Process and ingest csv data with per country data into indicator_coefficient table.

    For every entry in INDICATOR_COEFFICIENT_CONFIG: download the source file
    from S3, resolve the indicator, admin region and material IDs, and bulk-copy
    the rows into the indicator_coefficient table.
    """
    indicator_config = load_indicator_config()
    files_to_download = [file_config['file'] for file_config in indicator_config]
    # TODO: We might want to clean up downloaded files regardless of the success of the
    # import. But since in real life this runs on a pod that is wiped out after the job
    # has run, we can skip this for now.
    download_indicator_coefficient_files(files_to_download)

    conn = postgres_thread_pool.getconn()
    for indicator in indicator_config:
        file = indicator['file'].split('/')[-1]
        year = indicator['year']
        indicator_code = indicator['indicator_code']
        data = load_data(filename=file, year=year)
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """select id from indicator where indicator."nameCode" = %s;""",
                    (indicator_code,),
                )
                indicator_id = cursor.fetchone()[0]
                # add admin region ID to dataframe so we can insert all rows at once
                # without having to query for the IDs every time
                admin_region_ids = get_admin_region_ids_from_countries(conn, list(data.country.unique()))
                # same with material ID
                material_ids = get_material_ids_from_hs_codes(conn, data.hs_2017_code.astype(str).to_list())

                data = pd.merge(data, admin_region_ids, on="country", how="left")
                data = pd.merge(data, material_ids, on="hs_2017_code", how="left")
                data["indicatorId"] = indicator_id
                data_to_insert = data[["value", "year", "adminRegionId", "indicatorId", "materialId"]]
                copy_data_to_table(conn, data_to_insert, indicator_id)

    postgres_thread_pool.putconn(conn)
    log.info("Done!!")


if __name__ == "__main__":
Expand Down
4 changes: 4 additions & 0 deletions data/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[pytest]
testpaths = test
python_files = test_*.py
python_functions = test_*
8 changes: 8 additions & 0 deletions data/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,11 @@ urllib3==2.2.1
# via
# botocore
# requests
python-dotenv==1.0.1
exceptiongroup==1.2.2
iniconfig==2.0.0
pluggy==1.5.0
pytest==8.3.4
tomli==2.2.1


Empty file added data/test/__init__.py
Empty file.
25 changes: 25 additions & 0 deletions data/test/test_indicator_coefficients.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import json
import os

from data.indicator_coefficient_importer.indicator_coefficient_importer import load_indicator_config


def test_load_indicator_config():
    """Verify load_indicator_config handles valid, invalid and empty config."""
    # Test case 1: valid JSON data is flattened into a list of dicts with "name"
    os.environ['INDICATOR_COEFFICIENT_CONFIG'] = json.dumps({"test": {"file": "path/file.csv", "indicator_code": "AA"}})
    expected_output = [{"name": "test", "file": "path/file.csv", "indicator_code": "AA"}]
    real_output = load_indicator_config()
    assert real_output == expected_output

    # Test case 2: invalid JSON must raise ValueError (the original try/except
    # silently passed when no exception was raised at all)
    os.environ['INDICATOR_COEFFICIENT_CONFIG'] = "invalid_json"
    try:
        load_indicator_config()
        raise AssertionError("expected ValueError for invalid JSON")
    except ValueError as e:
        assert str(e) == "Environment variable 'INDICATOR_COEFFICIENT_CONFIG' must be a valid JSON string."

    # Test case 3: empty value must raise ValueError
    os.environ['INDICATOR_COEFFICIENT_CONFIG'] = ""
    try:
        load_indicator_config()
        raise AssertionError("expected ValueError for empty variable")
    except ValueError as e:
        assert str(e) == "Environment variable 'INDICATOR_COEFFICIENT_CONFIG' is missing or empty. Aborting."

0 comments on commit 470acc7

Please sign in to comment.