From 57d8d2a2a89f4079faa2b5270df1b6cb0f8e0faa Mon Sep 17 00:00:00 2001 From: Brad Edwards Date: Thu, 16 May 2024 17:09:29 -0700 Subject: [PATCH] Add save summaries to data lake Update changelog and version for last merge --- .github/workflows/infra.yaml | 6 ++ CHANGELOG.md | 6 ++ infra/core/dev.tfvars | 4 +- .../tasks/save_summaries_to_datalake_task.py | 84 ++++++++++++++++++- .../airflow/dags/shared/utils/constants.py | 1 + 5 files changed, 98 insertions(+), 3 deletions(-) diff --git a/.github/workflows/infra.yaml b/.github/workflows/infra.yaml index 379f79f3..850dae7d 100644 --- a/.github/workflows/infra.yaml +++ b/.github/workflows/infra.yaml @@ -298,12 +298,15 @@ jobs: echo "arxiv_sets=$(awk -F'=' '/^arxiv_sets/ {gsub(/ /, "", $2); print $2}' infra/core/${{ env.ENV_NAME }}.tfvars)" >> $GITHUB_OUTPUT echo "aws_region=$(grep '^aws_region' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT echo "backend_dynamodb_table=$(grep '^backend_dynamodb_table' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT + echo "create_intermediate_json_task_version=$(grep '^create_intermediate_json_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT echo "data_ingestion_key_prefix=$(grep '^data_ingestion_key_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT echo "etl_key_prefix=$(grep '^etl_key_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT echo "fetch_from_arxiv_task_version=$(grep '^fetch_from_arxiv_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT echo "infra_config_bucket=$(grep '^infra_config_bucket =' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT echo "most_recent_research_records_version=$(grep '^most_recent_research_records_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT echo "neo4j_connection_retries=$(grep '^neo4j_connection_retries' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT + echo "records_prefix=$(grep '^records_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT + echo "save_summaries_to_datalake_task_version=$(grep '^save_summaries_to_datalake_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT echo "terraform_outputs_prefix=$(grep '^terraform_outputs_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT - name: Set Region @@ -353,6 +356,7 @@ jobs: echo 'ARXIV_SETS=${{ steps.vars.outputs.arxiv_sets }}' >> .env echo "AWS_GLUE_REGISTRY_NAME=${{ steps.terraform_vars.outputs.aws_glue_registry_name }}" >> .env echo "AWS_REGION=${{ steps.vars.outputs.aws_region }}" >> .env + echo "CREATE_INTERMEDIATE_JSON_TASK_VERSION=${{ steps.vars.outputs.create_intermediate_json_task_version }}" >> .env echo "DATA_BUCKET=${{ steps.terraform_vars.outputs.data_bucket }}" >> .env echo "DATA_INGESTION_KEY_PREFIX=${{ steps.vars.outputs.data_ingestion_key_prefix }}" >> .env echo "ETL_KEY_PREFIX=${{ steps.vars.outputs.etl_key_prefix }}" >> .env @@ -362,6 +366,8 @@ jobs: echo "NEO4J_CONNECTION_RETRIES=${{ steps.vars.outputs.neo4j_connection_retries }}" >> .env echo "NEO4J_URI=neo4j://${{ steps.terraform_vars.outputs.neo4j_instance_private_ip }}:7687" >> .env echo 
"ORCHESTRATION_HOST_PRIVATE_IP=${{ steps.terraform_vars.outputs.orchestration_host_private_ip }}" >> .env + echo "RECORDS_PREFIX=${{ steps.vars.outputs.records_prefix }}" >> .env + echo "SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION=${{ steps.vars.outputs.save_summaries_to_datalake_task_version }}" >> .env cat .env echo "Copying file to s3..." aws s3 cp .env s3://${{ steps.vars.outputs.infra_config_bucket }}/orchestration/${{ env.ENV_NAME }}/airflow/dags/.env diff --git a/CHANGELOG.md b/CHANGELOG.md index 210eba33..ae22df70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.3.3-alpha] - 2024-05-16 + +### Changed + +- Use single host in dev for orchestration and neo4j + ## [0.3.2-alpha] - 2024-05-04 ### Added diff --git a/infra/core/dev.tfvars b/infra/core/dev.tfvars index 89995f91..d89f3cd3 100644 --- a/infra/core/dev.tfvars +++ b/infra/core/dev.tfvars @@ -3,7 +3,7 @@ # ********************************************************** app_name = "atomiklabs" -app_version = "0.3.2-alpha" +app_version = "0.3.3-alpha" availability_zones = ["us-east-1a", "us-east-1b", "us-east-1c"] aws_region = "us-east-1" backend_dynamodb_table = "terraform-state-locks" @@ -60,6 +60,8 @@ arxiv_api_max_retries = 5 arxiv_base_url = "http://export.arxiv.org/oai2" arxiv_ingestion_day_span = 5 arxiv_sets = ["cs"] +create_intermediate_json_task_version = "0.1.0" default_lambda_runtime = "python3.10" fetch_from_arxiv_task_version = "0.1.0" most_recent_research_records_version = "0.0.2" +save_summaries_to_datalake_task_version = "0.0.1" diff --git a/orchestration/airflow/dags/processing/tasks/save_summaries_to_datalake_task.py b/orchestration/airflow/dags/processing/tasks/save_summaries_to_datalake_task.py index 3ad80142..172c5fb5 100644 --- a/orchestration/airflow/dags/processing/tasks/save_summaries_to_datalake_task.py +++ b/orchestration/airflow/dags/processing/tasks/save_summaries_to_datalake_task.py @@ -1,7 +1,23 @@ +import json +import os from logging.config import dictConfig import structlog -from shared.utils.constants import LOGGING_CONFIG +from dotenv import load_dotenv +from shared.database.s3_manager import S3Manager +from shared.utils.constants import ( + AIRFLOW_DAGS_ENV_PATH, + AWS_REGION, + DATA_BUCKET, + ENVIRONMENT_NAME, + KAFKA_LISTENER, + LOGGING_CONFIG, + RECORDS_PREFIX, + SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION, + SCHEMA, + SERVICE_NAME, + SERVICE_VERSION, +) dictConfig(LOGGING_CONFIG) @@ -21,7 +37,71 @@ ) logger = structlog.get_logger() +load_dotenv(dotenv_path=AIRFLOW_DAGS_ENV_PATH) + +ABSTRACT = "abstract" +ABSTRACT_URL = "abstract_url" +DATE = "date" +IDENTIFIER = "identifier" +PRIMARY_CATEGORY = "primary_category" +TITLE = "title" + +TASK_NAME = "save_summaries_to_datalake" def run(**context: dict): - logger.info("Running save_summaries_to_datalake_task") + try: + logger.info("Running save_summaries_to_datalake_task") + config = get_config(context) + schema = context["ti"].xcom_pull(task_ids=KAFKA_LISTENER, key=SCHEMA) + logger.info("Schema", method=run.__name__, schema=schema) + s3_manager = S3Manager(os.getenv(DATA_BUCKET), logger) + json_data = json.loads(s3_manager.load(schema.get("s3_key"))) + if not json_data: + logger.error("No records found", method=run.__name__, records_key=schema.get("s3_key")) + return {"statusCode": 400, "body": "No records 
found"} + logger.info( + "Storing parsed arXiv summary records)} records", + method=run.__name__, + num_records=len(json_data["records"]), + ) + for record in json_data["records"]: + s3_manager.upload_to_s3( + f"{config.get(RECORDS_PREFIX)}/{record.get(IDENTIFIER)}/{ABSTRACT}.json", record.get(ABSTRACT) + ) + return {"statusCode": 200, "body": "Success"} + except Exception as e: + logger.error("Error running save_summaries_to_datalake_task", method=run.__name__, error=e) + raise e + + +def get_config(context: dict) -> dict: + """ + Gets the config from the environment variables. + + Returns: + dict: The config. + """ + try: + logger.info("Getting config", method=get_config.__name__, task_name=TASK_NAME) + config = { + AWS_REGION: os.getenv(AWS_REGION), + ENVIRONMENT_NAME: os.getenv(ENVIRONMENT_NAME), + DATA_BUCKET: os.getenv(DATA_BUCKET), + RECORDS_PREFIX: os.getenv(RECORDS_PREFIX), + SERVICE_NAME: TASK_NAME, + SERVICE_VERSION: os.getenv(SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION), + } + + if ( + not config.get(AWS_REGION) + or not config.get(ENVIRONMENT_NAME) + or not config.get(DATA_BUCKET) + or not config.get(RECORDS_PREFIX) + or not config.get(SERVICE_NAME) + or not config.get(SERVICE_VERSION) + ): + raise ValueError("Missing config values") + except Exception as e: + logger.error("Error getting config", method=get_config.__name__, task_name=TASK_NAME, error=e) + raise e diff --git a/orchestration/airflow/dags/shared/utils/constants.py b/orchestration/airflow/dags/shared/utils/constants.py index 4522a6ec..5919d7af 100644 --- a/orchestration/airflow/dags/shared/utils/constants.py +++ b/orchestration/airflow/dags/shared/utils/constants.py @@ -170,6 +170,7 @@ ORCHESTRATION_HOST_PRIVATE_IP = "ORCHESTRATION_HOST_PRIVATE_IP" RAW_DATA_KEYS = "RAW_DATA_KEYS" RECORDS_PREFIX = "RECORDS_PREFIX" +SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION = "SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION" SCHEMA = "SCHEMA" SERVICE_NAME = "SERVICE_NAME" SERVICE_VERSION = "SERVICE_VERSION"