Add save summaries to data lake
Update changelog and version for last merge
Brad-Edwards committed May 17, 2024
1 parent 0de912c commit 57d8d2a
Showing 5 changed files with 98 additions and 3 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/infra.yaml
@@ -298,12 +298,15 @@ jobs:
echo "arxiv_sets=$(awk -F'=' '/^arxiv_sets/ {gsub(/ /, "", $2); print $2}' infra/core/${{ env.ENV_NAME }}.tfvars)" >> $GITHUB_OUTPUT
echo "aws_region=$(grep '^aws_region' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "backend_dynamodb_table=$(grep '^backend_dynamodb_table' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "create_intermediate_json_task_version=$(grep '^create_intermediate_json_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "data_ingestion_key_prefix=$(grep '^data_ingestion_key_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "etl_key_prefix=$(grep '^etl_key_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "fetch_from_arxiv_task_version=$(grep '^fetch_from_arxiv_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "infra_config_bucket=$(grep '^infra_config_bucket =' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "most_recent_research_records_version=$(grep '^most_recent_research_records_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "neo4j_connection_retries=$(grep '^neo4j_connection_retries' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "records_prefix=$(grep '^records_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "save_summaries_to_datalake_task_version=$(grep '^save_summaries_to_datalake_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "terraform_outputs_prefix=$(grep '^terraform_outputs_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
- name: Set Region
@@ -353,6 +356,7 @@ jobs:
echo 'ARXIV_SETS=${{ steps.vars.outputs.arxiv_sets }}' >> .env
echo "AWS_GLUE_REGISTRY_NAME=${{ steps.terraform_vars.outputs.aws_glue_registry_name }}" >> .env
echo "AWS_REGION=${{ steps.vars.outputs.aws_region }}" >> .env
echo "CREATE_INTERMEDIATE_JSON_TASK_VERSION=${{ steps.vars.outputs.create_intermediate_json_task_version }}" >> .env
echo "DATA_BUCKET=${{ steps.terraform_vars.outputs.data_bucket }}" >> .env
echo "DATA_INGESTION_KEY_PREFIX=${{ steps.vars.outputs.data_ingestion_key_prefix }}" >> .env
echo "ETL_KEY_PREFIX=${{ steps.vars.outputs.etl_key_prefix }}" >> .env
@@ -362,6 +366,8 @@
echo "NEO4J_CONNECTION_RETRIES=${{ steps.vars.outputs.neo4j_connection_retries }}" >> .env
echo "NEO4J_URI=neo4j://${{ steps.terraform_vars.outputs.neo4j_instance_private_ip }}:7687" >> .env
echo "ORCHESTRATION_HOST_PRIVATE_IP=${{ steps.terraform_vars.outputs.orchestration_host_private_ip }}" >> .env
echo "RECORDS_PREFIX=${{ steps.vars.outputs.records_prefix }}" >> .env
echo "SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION=${{ steps.vars.outputs.save_summaries_to_datalake_task_version }}" >> .env
cat .env
echo "Copying file to s3..."
aws s3 cp .env s3://${{ steps.vars.outputs.infra_config_bucket }}/orchestration/${{ env.ENV_NAME }}/airflow/dags/.env
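This step renders the resolved variables into a .env file and copies it to the infra config bucket, where the Airflow DAGs read it back at import time via load_dotenv(dotenv_path=AIRFLOW_DAGS_ENV_PATH) (see the task module below). A minimal sketch of the consuming side; the literal path is an assumption:

    import os

    from dotenv import load_dotenv

    # Stand-in for AIRFLOW_DAGS_ENV_PATH; the actual value is not shown in this commit.
    load_dotenv(dotenv_path="/opt/airflow/dags/.env")
    records_prefix = os.getenv("RECORDS_PREFIX")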
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.3.3-alpha] - 2024-05-16

### Changed

- Use single host in dev for orchestration and neo4j

## [0.3.2-alpha] - 2024-05-04

### Added
4 changes: 3 additions & 1 deletion infra/core/dev.tfvars
@@ -3,7 +3,7 @@
# **********************************************************

app_name = "atomiklabs"
app_version = "0.3.2-alpha"
app_version = "0.3.3-alpha"
availability_zones = ["us-east-1a", "us-east-1b", "us-east-1c"]
aws_region = "us-east-1"
backend_dynamodb_table = "terraform-state-locks"
@@ -60,6 +60,8 @@ arxiv_api_max_retries = 5
arxiv_base_url = "http://export.arxiv.org/oai2"
arxiv_ingestion_day_span = 5
arxiv_sets = ["cs"]
create_intermediate_json_task_version = "0.1.0"
default_lambda_runtime = "python3.10"
fetch_from_arxiv_task_version = "0.1.0"
most_recent_research_records_version = "0.0.2"
save_summaries_to_datalake_task_version = "0.0.1"
@@ -1,7 +1,23 @@
import json
import os
from logging.config import dictConfig

import structlog
from dotenv import load_dotenv
from shared.database.s3_manager import S3Manager
from shared.utils.constants import (
    AIRFLOW_DAGS_ENV_PATH,
    AWS_REGION,
    DATA_BUCKET,
    ENVIRONMENT_NAME,
    KAFKA_LISTENER,
    LOGGING_CONFIG,
    RECORDS_PREFIX,
    SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION,
    SCHEMA,
    SERVICE_NAME,
    SERVICE_VERSION,
)

dictConfig(LOGGING_CONFIG)

@@ -21,7 +37,71 @@
)

logger = structlog.get_logger()
load_dotenv(dotenv_path=AIRFLOW_DAGS_ENV_PATH)

ABSTRACT = "abstract"
ABSTRACT_URL = "abstract_url"
DATE = "date"
IDENTIFIER = "identifier"
PRIMARY_CATEGORY = "primary_category"
TITLE = "title"

TASK_NAME = "save_summaries_to_datalake"


def run(**context: dict):
    """Load parsed arXiv summaries from S3 and store each abstract in the data lake."""
    try:
        logger.info("Running save_summaries_to_datalake_task")
        config = get_config(context)
        schema = context["ti"].xcom_pull(task_ids=KAFKA_LISTENER, key=SCHEMA)
        logger.info("Schema", method=run.__name__, schema=schema)
        s3_manager = S3Manager(os.getenv(DATA_BUCKET), logger)
        json_data = json.loads(s3_manager.load(schema.get("s3_key")))
        if not json_data:
            logger.error("No records found", method=run.__name__, records_key=schema.get("s3_key"))
            return {"statusCode": 400, "body": "No records found"}
        logger.info(
            "Storing parsed arXiv summary records",
            method=run.__name__,
            num_records=len(json_data["records"]),
        )
        for record in json_data["records"]:
            s3_manager.upload_to_s3(
                f"{config.get(RECORDS_PREFIX)}/{record.get(IDENTIFIER)}/{ABSTRACT}.json", record.get(ABSTRACT)
            )
        return {"statusCode": 200, "body": "Success"}
    except Exception as e:
        logger.error("Error running save_summaries_to_datalake_task", method=run.__name__, error=e)
        raise e
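# Illustration only: given the upload call above, each abstract lands at a key
# shaped like {RECORDS_PREFIX}/{identifier}/abstract.json. Assuming RECORDS_PREFIX
# resolves to "raw/arxiv/records" and an identifier of "2405.01234" (both
# hypothetical values, not taken from this commit), the object key would be:
#
#     raw/arxiv/records/2405.01234/abstract.json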


def get_config(context: dict) -> dict:
    """
    Gets the config from the environment variables.

    Returns:
        dict: The config.
    """
    try:
        logger.info("Getting config", method=get_config.__name__, task_name=TASK_NAME)
        config = {
            AWS_REGION: os.getenv(AWS_REGION),
            ENVIRONMENT_NAME: os.getenv(ENVIRONMENT_NAME),
            DATA_BUCKET: os.getenv(DATA_BUCKET),
            RECORDS_PREFIX: os.getenv(RECORDS_PREFIX),
            SERVICE_NAME: TASK_NAME,
            SERVICE_VERSION: os.getenv(SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION),
        }
        if (
            not config.get(AWS_REGION)
            or not config.get(ENVIRONMENT_NAME)
            or not config.get(DATA_BUCKET)
            or not config.get(RECORDS_PREFIX)
            or not config.get(SERVICE_NAME)
            or not config.get(SERVICE_VERSION)
        ):
            raise ValueError("Missing config values")
        return config
    except Exception as e:
        logger.error("Error getting config", method=get_config.__name__, task_name=TASK_NAME, error=e)
        raise e
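The task exposes run(**context), which pulls the schema location from the kafka-listener task's XCom and writes each record's abstract to the data lake. A minimal sketch of wiring it into an Airflow 2 DAG; the DAG id, schedule, and import path are assumptions, not taken from this commit:

    import pendulum
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # Hypothetical import path; the commit does not show the task module's filename.
    from save_summaries_to_datalake_task import run

    with DAG(
        dag_id="process_arxiv_summaries",  # assumed DAG id
        start_date=pendulum.datetime(2024, 5, 1, tz="UTC"),
        schedule=None,  # "schedule" keyword assumes Airflow >= 2.4
        catchup=False,
    ):
        PythonOperator(
            task_id="save_summaries_to_datalake",
            python_callable=run,  # run(**context) receives the full Airflow context
        )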
1 change: 1 addition & 0 deletions orchestration/airflow/dags/shared/utils/constants.py
@@ -170,6 +170,7 @@
ORCHESTRATION_HOST_PRIVATE_IP = "ORCHESTRATION_HOST_PRIVATE_IP"
RAW_DATA_KEYS = "RAW_DATA_KEYS"
RECORDS_PREFIX = "RECORDS_PREFIX"
SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION = "SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION"
SCHEMA = "SCHEMA"
SERVICE_NAME = "SERVICE_NAME"
SERVICE_VERSION = "SERVICE_VERSION"
