Skip to content

Commit

Permalink
Added study distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
dogversioning committed Dec 17, 2024
1 parent 512074e commit 1fe6c8e
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies= [
"arrow >=1.2.3",
"awswrangler >=3.5, <4",
"boto3",
"cumulus-library >=4.1",
"Jinja2 >=3.1.4, <4",
"pandas >=2, <3",
"requests", # scripts only
Expand Down Expand Up @@ -46,7 +47,8 @@ test = [
"moto[s3,athena,sns] == 4.1.5",
"pytest",
"pytest-cov",
"pytest-mock"
"pytest-mock",
"responses"
]
dev = [
"pre-commit",
Expand Down
Empty file.
89 changes: 89 additions & 0 deletions src/dashboard/post_distribute/post_distribute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import json
import os
import pathlib

import boto3
import requests
from cumulus_library import cli
from shared import decorators, functions

# lambda performance tuning - the client is created at module scope so that it
# is built once, when this module is imported, and then shared by every call to
# the handler instead of being rebuilt on each invocation
sns_client = boto3.client("sns", os.environ.get("AWS_REGION"))
# in dockerized lambdas, `/tmp` is the only valid write location
# (tests monkeypatch BASE_DIR to point at a temporary directory instead)
BASE_DIR = "/tmp" # noqa: S108


def get_study_from_github(url):
    """Download every file of an official Cumulus study repository.

    The file listing is read from the GitHub git-trees API for the `main`
    branch. Each blob in that listing is then fetched from
    raw.githubusercontent.com and written to
    `{BASE_DIR}/studies/<repo name>/<path in repo>`.

    :param url: the `https://github.com/smart-on-fhir/<repo>` URL of the study,
        with or without a trailing slash
    :raises ValueError: if the URL is not under the smart-on-fhir GitHub
        organization, if the tree listing cannot be retrieved, or if any
        listed file cannot be downloaded
    """
    official_prefix = "https://github.com/smart-on-fhir/"
    # A plain substring test would also accept URLs such as
    # https://github.com/someone/smart-on-fhir, so anchor on the org prefix.
    if not isinstance(url, str) or not url.startswith(official_prefix):
        raise ValueError(f"{url} is not an official Cumulus study.")
    if not url.endswith("/"):
        url = url + "/"
    api_url = (
        url.replace("https://github.com/", "https://api.github.com/repos/")
        + "git/trees/main?recursive=1"
    )
    raw_url_base = (
        url.replace("https://github.com/", "https://raw.githubusercontent.com/") + "main/"
    )
    # the URL now ends in "/", so the repo name is the second-to-last segment
    study_name = url.split("/")[-2]
    tree_res = requests.get(api_url, timeout=10)
    if tree_res.status_code != 200:
        raise ValueError(f"{url} is not a valid git repository")
    study_dir = pathlib.Path(BASE_DIR) / "studies" / study_name
    for file in tree_res.json()["tree"]:
        # "tree" entries are directories; parents are created per-file below
        if file["type"] != "blob":
            continue
        write_path = study_dir / file["path"]
        write_path.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(raw_url_base + file["path"], timeout=10) as file_res:
            # Without this check an error page would be saved as study content.
            if file_res.status_code != 200:
                raise ValueError(f"Unable to download {file['path']} from {url}")
            # Write the raw bytes so non-text files are not altered by a
            # decode/re-encode round trip.
            write_path.write_bytes(file_res.content)


def prepare_study(body: dict):
    """Package a previously downloaded study into a zip with cumulus-library.

    Invokes the cumulus-library CLI as
    `build -t <study_name> -s {BASE_DIR}/studies --prepare {BASE_DIR}/prepared`.

    :param body: the parsed request body; must contain `study_name`
    :returns: `{BASE_DIR}/prepared/<study_name>.zip`, the location this function
        assumes the CLI writes the archive to
    :raises KeyError: if `study_name` is missing from `body`
    """
    base_dir = pathlib.Path(BASE_DIR)
    prepared_dir = base_dir / "prepared"
    prepared_dir.mkdir(parents=True, exist_ok=True)
    cli.main(
        cli_args=[
            "build",
            "-t",
            body["study_name"],
            "-s",
            str(base_dir / "studies"),
            "--prepare",
            str(prepared_dir),
        ],
    )
    # Reuse prepared_dir rather than rebuilding the path by hand; the earlier
    # hand-built version prepended an extra "/" to BASE_DIR.
    return prepared_dir / f"{body['study_name']}.zip"


def process_body(body: dict):
    """Validate a study request body and run the handler for its source.

    Every key is checked before any work is done, so a request containing an
    unsupported key is rejected without first downloading anything.

    :param body: the parsed request body. Supported keys are `study_name`
        (consumed later, when the study is prepared) and `github` (URL of a
        study repository to download).
    :raises ValueError: if `body` contains any unsupported key
    """
    allowed_keys = ("study_name", "github")
    for key in body:
        if key not in allowed_keys:
            raise ValueError(f"Invalid key {key} received.")
    if "github" in body:
        get_study_from_github(body["github"])


@decorators.generic_error_handler(msg="Error generating distributed request")
def distribute_handler(event: dict, context):
    """Creates a distribution package for a study and queues it for delivery

    The JSON request body is validated/processed, the study is zipped, and the
    archive is published to the SNS topic named by TOPIC_STUDY_PAYLOAD_ARN.
    """
    del context
    request = json.loads(event["body"])
    process_body(request)
    archive_path = prepare_study(request)
    study_name = request["study_name"]
    topic_arn = os.environ.get("TOPIC_STUDY_PAYLOAD_ARN")
    with open(archive_path, "rb") as archive:
        archive_bytes = archive.read()
    # NOTE(review): the message is the str() of a bytes object (i.e. its
    # "b'...'" repr), so subscribers presumably have to undo that - confirm
    # against the consumer before changing this encoding.
    sns_client.publish(
        TopicArn=topic_arn,
        Message=str(archive_bytes),
        MessageGroupId=study_name,
        Subject=study_name,
    )
    return functions.http_response(200, f"Study {study_name} queued.")
1 change: 1 addition & 0 deletions src/dashboard/post_distribute/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
cumulus-library >= 4.1.3
1 change: 1 addition & 0 deletions src/dashboard/post_distribute/shared
14 changes: 14 additions & 0 deletions src/post_distribute.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM public.ecr.aws/lambda/python:3.11

WORKDIR ${LAMBDA_TASK_ROOT}
# Dependencies are installed before the source is copied so that this layer
# can be reused when only the handler code changes
COPY dashboard/post_distribute/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY dashboard/post_distribute/post_distribute.py .
COPY shared shared

# Force setup of some initial matplotlib configuration artifacts
RUN mkdir -p /tmp/matplotlib
ENV MPLCONFIGDIR=/tmp/matplotlib
RUN cumulus-library version

CMD ["post_distribute.distribute_handler"]
76 changes: 76 additions & 0 deletions template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ Parameters:
- Text
- JSON
Default: JSON
RemoteAccounts:
Type: CommaDelimitedList


Resources:
Expand Down Expand Up @@ -742,6 +744,56 @@ Resources:
LogGroupName: !Sub "/aws/lambda/${DashboardGetStudyPeriodsFunction}"
RetentionInDays: !Ref RetentionTime

# Lambda that packages a study and publishes it to the SNSStudyPayload topic.
# It is exposed as POST /distribute/ on DashboardApiGateway.
DashboardPostDistributeStudyFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub 'CumulusAggDashboardDistributeStudy-${DeployStage}'
# Deployed as a container image, built from the Dockerfile named in the
# Metadata section below, rather than from a CodeUri package.
PackageType: Image
ImageConfig:
Command: ["post_distribute.distribute_handler"]
# The three settings below are the non-image equivalents, left disabled.
# CodeUri: ./src/dashboard/post_distribute
# Handler: post_distribute.distribute_handler
# Runtime: "python3.11"
LoggingConfig:
ApplicationLogLevel: !Ref LogLevel
LogFormat: !Ref LogFormat
LogGroup: !Sub "/aws/lambda/CumulusAggDashboardDistributeStudy-${DeployStage}"
MemorySize: 512
# NOTE(review): this function is invoked through an Api event; confirm that the
# gateway's integration timeout allows a request to run this long.
Timeout: 100
Environment:
Variables:
# NOTE(review): post_distribute.py only reads TOPIC_STUDY_PAYLOAD_ARN and
# AWS_REGION; confirm whether BUCKET_NAME is needed by this function.
BUCKET_NAME: !Sub '${BucketNameParameter}-${AWS::AccountId}-${DeployStage}'
TOPIC_STUDY_PAYLOAD_ARN: !Ref SNSStudyPayload
Events:
PostDistributeStudyAPI:
Type: Api
Properties:
RestApiId: !Ref DashboardApiGateway
Path: /distribute/
Method: Post
Policies:
- S3ReadPolicy:
BucketName: !Sub '${BucketNameParameter}-${AWS::AccountId}-${DeployStage}'
- Statement:
- Sid: KMSDecryptPolicy
Effect: Allow
Action:
- kms:Decrypt
Resource:
- !ImportValue cumulus-kms-KMSKeyArn
# Allows the handler's sns_client.publish call against the payload topic.
- SNSPublishMessagePolicy:
TopicName: !GetAtt SNSStudyPayload.TopicName
# Image build settings: the Dockerfile path is relative to DockerContext.
Metadata:
Dockerfile: post_distribute.Dockerfile
DockerContext: ./src/
DockerTag: v1

# Log group for the distribute-study Lambda. Its name is derived from the
# function reference and its retention from the RetentionTime value.
DashboardPostDistributeStudyLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub "/aws/lambda/${DashboardPostDistributeStudyFunction}"
RetentionInDays: !Ref RetentionTime

### Lambda permissions

ProcessUploadFunctionPermission:
Expand Down Expand Up @@ -787,6 +839,30 @@ Resources:
- Key: Name
Value: !Sub 'SNSTopicCacheAPI-${DeployStage}'

# FIFO topic, with content-based deduplication enabled, that the
# distribute-study Lambda publishes packaged studies to. The ".fifo" suffix
# is part of the topic name.
SNSStudyPayload:
Type: AWS::SNS::Topic
Properties:
ContentBasedDeduplication: True
FifoTopic: True
TopicName: !Sub 'SNSStudyPayload-${DeployStage}.fifo'
Tags:
- Key: Name
Value: !Sub 'SNSStudyPayload-${DeployStage}.fifo'

# Topic policy that lets the AWS principals supplied in the RemoteAccounts
# parameter subscribe to the study payload topic. Only sns:Subscribe is
# granted here.
SNSStudyPayloadPolicy:
Type: AWS::SNS::TopicPolicy
Properties:
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
AWS: !Ref RemoteAccounts
Action:
- sns:Subscribe
Resource: !Ref SNSStudyPayload
Topics:
- !Ref SNSStudyPayload

### S3 Buckets

Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def mock_notification():
sns_client.create_topic(Name="test-flat")
sns_client.create_topic(Name="test-meta")
sns_client.create_topic(Name="test-cache")
sns_client.create_topic(Name="test-payload")
yield
sns.stop()

Expand Down
95 changes: 95 additions & 0 deletions tests/dashboard/test_post_distribute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import json
import os
from unittest import mock

import pytest
import responses
from freezegun import freeze_time

from src.dashboard.post_distribute import post_distribute


@mock.patch.dict(
    os.environ, {"TOPIC_STUDY_PAYLOAD_ARN": "test-payload", "AWS_REGION": "us-east-1"}, clear=True
)
@pytest.mark.parametrize(
    "name,url,expected_status",
    [
        (
            "test_study",
            "https://github.com/smart-on-fhir/test_study/",
            200,
        ),
        ("invalid_study", "https://github.com/smart-on-fhir/invalid_study", 500),
        ("non_cumulus_repo", "https://github.com/user/non_cumulus_repo", 500),
    ],
)
@responses.activate
@freeze_time("2020-01-01")
def test_process_github(mock_notification, tmp_path, name, url, expected_status, monkeypatch):
    """Exercise the handler end to end against mocked GitHub responses.

    Covers a valid study (200), a repository whose tree lookup returns 404
    (500), and a repository outside the smart-on-fhir organization (500).
    """
    responses.add(
        responses.GET,
        "https://api.github.com/repos/smart-on-fhir/test_study/git/trees/main?recursive=1",
        json={
            "tree": [
                {
                    "path": ".github",
                    "type": "tree",
                },
                {
                    "path": "test.sql",
                    "type": "blob",
                },
                {
                    "path": "manifest.toml",
                    "type": "blob",
                },
            ]
        },
    )
    responses.add(
        responses.GET,
        "https://api.github.com/repos/smart-on-fhir/invalid_study/git/trees/main?recursive=1",
        status=404,
    )
    responses.add(
        responses.GET,
        "https://raw.githubusercontent.com/smart-on-fhir/test_study/main/test.sql",
        body="""CREATE TABLE test_study__table AS
SELECT * from core__patient""",
    )
    responses.add(
        responses.GET,
        "https://raw.githubusercontent.com/smart-on-fhir/test_study/main/manifest.toml",
        body="""study_prefix="test_study"
[file_config]
file_names=[
"test.sql"
]""",
    )
    # redirect all file writes into pytest's temp dir and stub out SNS
    monkeypatch.setattr(post_distribute, "BASE_DIR", tmp_path)
    mock_sns = mock.MagicMock()
    monkeypatch.setattr(post_distribute, "sns_client", mock_sns)
    res = post_distribute.distribute_handler(
        {"body": json.dumps({"github": url, "study_name": name})}, {}
    )
    assert res["statusCode"] == expected_status
    if expected_status == 200:
        # `is_called()` is not part of the Mock API: on a MagicMock it is an
        # auto-created attribute whose return value is always truthy, so
        # asserting on it can never fail. Use the real assertion helper.
        mock_sns.publish.assert_called_once()
        publish_kwargs = dict(mock_sns.publish.call_args.kwargs)
        # the zip contents are not 100% deterministic due to the tmpdir name, so
        # we'll just check for the expected file in the zip binary string
        message = publish_kwargs.pop("Message")
        assert "0000.test.00.create_table_test_study__table" in message
        # comparing the whole dict also catches missing or unexpected kwargs
        assert publish_kwargs == {
            "TopicArn": "test-payload",
            "MessageGroupId": "test_study",
            "Subject": "test_study",
        }


def test_invalid_key():
    """A request body with an unrecognized key produces a 500 response."""
    event = {"body": json.dumps({"bad_para,": "foo"})}
    response = post_distribute.distribute_handler(event, {})
    assert response["statusCode"] == 500

0 comments on commit 1fe6c8e

Please sign in to comment.