Added study distribution #144

Merged: 7 commits, Jan 7, 2025
Changes from 4 commits
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,7 +1,7 @@
default_install_hook_types: [pre-commit, pre-push]
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
-   rev: v0.5.7
+   rev: v0.8.6
    hooks:
      - name: Ruff linting
        id: ruff
20 changes: 18 additions & 2 deletions MAINTAINER.md
@@ -58,8 +58,24 @@ The SAM framework extends native cloudformation, usually with a lighter syntax,

### Sync --watch mode gotchas

-- If you modify S3 bucket permissions while in watch mode, changes to the bucket may generate a permission denied message. You'll need to delete the bucket and bring down the deployment before restarting to apply your changes.
+- If you modify S3 bucket permissions while in watch mode, changes to the bucket may generate a permission denied message. You'll need to delete the bucket and bring down the deployment before restarting to apply your changes, or manually update the S3 bucket permissions.

- Similarly, if you end up in a ROLLBACK_FAILED state, usually the only recourse is to bring the deployment down and resync, do a regular deployment, or manually initiate a rollback from the AWS console.

-Using deploy is a little safer than sync in this regard, though it does take longer for each deployment. Use your best judgement.
+Using deploy is a little safer than sync in this regard, though it does take longer for each deployment. Use your best judgement.

## Testing ECR containers locally

If you want to test a lambda ECR container, do the following from a console at the root of this repo:
- `docker buildx build -t <your image name> -f src/<dockerfile> src/`
- `docker run \
  -e AWS_REGION='us-east-1' \
  -e AWS_ACCESS_KEY_ID='<your key id>' \
  -e AWS_SECRET_ACCESS_KEY='<your access key>' \
  -e AWS_SESSION_TOKEN='<your session token>' \
  -e <any other env vars you need> \
  -p 9000:8080 \
  <your image name>`

You can then post messages to your endpoints on port 9000 on localhost, and it should interact with the rest of your AWS account (if required); see the sketch below.
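
For this repo's `queue_distribute` image specifically, a minimal invocation sketch in Python might look like the following (the endpoint path is the standard Lambda Runtime Interface Emulator route; the study name and repository URL are made-up examples):

```python
# Sketch: invoke the locally running Lambda container via the Runtime
# Interface Emulator. The event mimics the SNS delivery envelope consumed by
# queue_distribute.queue_handler; the study repository is hypothetical.
import json

import requests

RIE_URL = "http://localhost:9000/2015-03-31/functions/function/invocations"

message = {
    "study_name": "example_study",
    "github": {"url": "https://github.com/smart-on-fhir/example-study", "tag": None},
}
event = {"Records": [{"Sns": {"Message": json.dumps(message)}}]}

response = requests.post(RIE_URL, json=event, timeout=60)
print(response.json())
```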
7 changes: 5 additions & 2 deletions pyproject.toml
@@ -9,6 +9,7 @@ dependencies= [
"arrow >=1.2.3",
"awswrangler >=3.5, <4",
"boto3",
"cumulus-library >=4.1",
"Jinja2 >=3.1.4, <4",
"pandas >=2, <3",
"requests", # scripts only
@@ -46,11 +47,13 @@ test = [
"moto[s3,athena,sns] == 4.1.5",
"pytest",
"pytest-cov",
"pytest-mock"
"pytest-mock",
"pytest-subprocess",
"responses",
]
dev = [
"pre-commit",
"ruff < 0.6",
"ruff < 0.9",
"sqlfluff >= 3.2.5"
]

1 change: 1 addition & 0 deletions src/dashboard/get_chart_data/get_chart_data.py
@@ -10,6 +10,7 @@
import boto3
import jinja2
import pandas

from shared import decorators, enums, errors, functions

log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO")
1 change: 1 addition & 0 deletions src/dashboard/get_csv/get_csv.py
@@ -2,6 +2,7 @@

import boto3
import botocore

from shared import decorators, enums, functions


1 change: 1 addition & 0 deletions src/dashboard/get_metadata/get_metadata.py
@@ -3,6 +3,7 @@
import os

import boto3

from shared.decorators import generic_error_handler
from shared.functions import http_response, read_metadata

1 change: 1 addition & 0 deletions src/dashboard/get_study_periods/get_study_periods.py
@@ -3,6 +3,7 @@
import os

import boto3

from shared import decorators, enums, functions


Empty file.
51 changes: 51 additions & 0 deletions src/dashboard/post_distribute/post_distribute.py
@@ -0,0 +1,51 @@
"""Handler for validating the payload contents and submitting to queue"""

import json
dogversioning marked this conversation as resolved.
Show resolved Hide resolved
import os

import boto3
import requests

from shared import decorators, functions

sns_client = boto3.client("sns", os.environ.get("AWS_REGION"))
valid_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~:/?#[]@!$&'()*+,"


def validate_github_url(config):
dogversioning marked this conversation as resolved.
Show resolved Hide resolved
if not config["url"].startswith("https://github.com/smart-on-fhir/") or any(
c not in valid_chars for c in config["url"]
):
dogversioning marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(f"{config['url']} is not an official Cumulus study.")
res = requests.get(config["url"], timeout=10)
if res.status_code != 200:
raise ValueError(f"{config['url']} is not a valid git repository")
if "tag" in config and config["tag"] is not None:
dogversioning marked this conversation as resolved.
Show resolved Hide resolved
res = requests.get(config["url"] + f'/tree/{config["tag"]}', timeout=10)
if res.status_code != 200:
raise ValueError(f"{config['tag']} is not a valid tag")


def validate_body(body: dict):
"""Selects the appropriate handler for processing study requests"""
for key in body.keys():
match key:
case "study_name":
pass
case "github":
validate_github_url(body[key])
case _:
raise ValueError(f"Invalid key {body[key]} received.")


@decorators.generic_error_handler(msg="Error generating distributed request")
def distribute_handler(event: dict, context):
"""Creates a distribution packages and queues for delivery"""
del context
body = json.loads(event["body"])
validate_body(body)
topic_sns_arn = os.environ.get("TOPIC_QUEUE_API_ARN")
sns_client.publish(TopicArn=topic_sns_arn, Message=event["body"], Subject=body["study_name"])
# TODO: should we create an ID/API for the dashboard to track request status?
res = functions.http_response(200, f'Preparing to queue {body["study_name"]}.')
return res
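
As a usage sketch, the request body this endpoint validates looks like the following (the study name and repository are illustrative; `study_name` and `github` are the only keys `validate_body` accepts):

```python
# Hypothetical dashboard payload accepted by validate_body/distribute_handler.
# "github" must point at the smart-on-fhir org; "tag" is optional.
example_body = {
    "study_name": "example_study",
    "github": {
        "url": "https://github.com/smart-on-fhir/example-study",
        "tag": "v1.0.0",
    },
}
```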
1 change: 1 addition & 0 deletions src/dashboard/post_distribute/requirements.txt
@@ -0,0 +1 @@
requests
1 change: 1 addition & 0 deletions src/dashboard/post_distribute/shared
Empty file.
102 changes: 102 additions & 0 deletions src/dashboard/queue_distribute/queue_distribute.py
@@ -0,0 +1,102 @@
"""Handler for submitting studies to an SNS queue for distribution to remote sites.

In the long term, this module (or a submodule imported by this module) will be responsible
for parsing the output of a dashboard guided workflow/query builder generated payload,
and converting it to a cumulus library compatible study.

It also enables distribution of a smart-on-fhir owned study directly from github,
which is the mode it operates in today.

Due to needing cumulus library, this handler is different from all the other lambda
handlers, in that it is packaged in a docker image and loaded from an elastic container
registry. This has a few architectural implications not present in other lambdas - notably,
the home directory is static and we have to write any data to disk inside of /tmp as the
only writable location.

"""

import json
import os
import pathlib
import subprocess

import boto3
from cumulus_library import cli

from shared import decorators, functions

# lambda performance tuning - moving these two variables to be global in the module
# means that their initialization happens during build, rather than invocation.
# This helps reduce the spinup time, especially for the boto client, and since
# there are some hefty bits in here already with the docker spinup, shaving a second
# or two off here is helpful to keep us under the ten second timeout.
#
# https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html#function-code

sns_client = boto3.client("sns", os.environ.get("AWS_REGION"))
BASE_DIR = "/tmp" # noqa: S108


def get_study_from_github(config):
try:
subprocess.run(["/usr/bin/git", "clone", config["url"], f"{BASE_DIR}/studies"], check=True) # noqa: S603
if "tag" in config and config["tag"] is not None:
dogversioning marked this conversation as resolved.
Show resolved Hide resolved
subprocess.run( # noqa: S603
["/usr/bin/git", "checkout", "tag"],
dogversioning marked this conversation as resolved.
Show resolved Hide resolved
cwd=f"{BASE_DIR}/studies/{config['url'].split('/')[-2]}",
)

except subprocess.CalledProcessError:
# TODO: retry/backoff logic? or do we just have a user queue again?
raise ValueError(f"{config['url']} is unavailable")


def prepare_study(body: dict):
write_path = pathlib.Path(f"{BASE_DIR}/prepared")
write_path.mkdir(parents=True, exist_ok=True)
cli.main(
cli_args=[
"build",
"-t",
body["study_name"],
"-s",
f"{BASE_DIR}/studies",
"--prepare",
f"{write_path}",
],
)
return pathlib.Path(f"{write_path}") / f"{body['study_name']}.zip"


def process_body(body: dict):
"""Selects the appropriate handler for processing study requests"""
for key in body.keys():
match key:
case "study_name":
pass
case "github":
get_study_from_github(body[key])
case _:
raise ValueError(f"Invalid key {key} received.")


@decorators.generic_error_handler(msg="Error generating distributed request")
def queue_handler(event: dict, context):
"""Creates a distribution packages and queues for delivery"""
del context
body = event["Records"][0]["Sns"]["Message"]
body = json.loads(body)
process_body(body)
payload = prepare_study(body)
topic_sns_arn = os.environ.get("TOPIC_STUDY_PAYLOAD_ARN")
with open(payload, "rb") as f:
file = f.read()
sns_client.publish(
TopicArn=topic_sns_arn,
Message=json.dumps({"study": body["study_name"]}),
MessageGroupId=body["study_name"],
Subject=body["study_name"],
MessageAttributes={"study": {"DataType": "Binary", "BinaryValue": file}},
)
res = functions.http_response(200, f'Study {body["study_name"]} queued.')
return res
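
A test sketch for the git interaction above, using the pytest-subprocess dependency this PR adds to the test extras (the `fp` fixture name comes from that library; the study URL and tag are illustrative):

```python
# Sketch: fake the git calls made by get_study_from_github so the handler's
# clone/checkout logic can be unit tested without network access.
from queue_distribute import get_study_from_github


def test_get_study_from_github(fp):  # fp: pytest-subprocess fake-process fixture
    url = "https://github.com/smart-on-fhir/example-study"
    fp.register(["/usr/bin/git", "clone", url, "/tmp/studies"])
    fp.register(["/usr/bin/git", "checkout", "v1.0.0"])
    get_study_from_github({"url": url, "tag": "v1.0.0"})
```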
1 change: 1 addition & 0 deletions src/dashboard/queue_distribute/requirements.txt
@@ -0,0 +1 @@
cumulus-library >= 4.1.3
1 change: 1 addition & 0 deletions src/dashboard/queue_distribute/shared
17 changes: 17 additions & 0 deletions src/queue_distribute.Dockerfile
@@ -0,0 +1,17 @@
FROM public.ecr.aws/lambda/python:3.11

RUN yum update git -y
RUN yum install git -y

WORKDIR ${LAMBDA_TASK_ROOT}
COPY dashboard/queue_distribute/requirements.txt .
RUN pip install -r requirements.txt
COPY dashboard/queue_distribute/queue_distribute.py .
COPY shared shared

# Force setup of some initial matplotlib configuration artifacts
RUN mkdir /tmp/matplotlib
ENV MPLCONFIGDIR=/tmp/matplotlib
RUN cumulus-library version

CMD ["queue_distribute.queue_handler"]
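
Per the "Testing ECR containers locally" section in MAINTAINER.md above, this image can be built and run from the repo root with, e.g., `docker buildx build -t queue-distribute -f src/queue_distribute.Dockerfile src/`.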
1 change: 1 addition & 0 deletions src/site_upload/cache_api/cache_api.py
@@ -5,6 +5,7 @@

import awswrangler
import boto3

from shared import decorators, enums, functions


1 change: 1 addition & 0 deletions src/site_upload/fetch_upload_url/fetch_upload_url.py
@@ -6,6 +6,7 @@

import boto3
import botocore.exceptions

from shared.decorators import generic_error_handler
from shared.enums import BucketPath
from shared.functions import get_s3_json_as_dict, http_response
1 change: 1 addition & 0 deletions src/site_upload/powerset_merge/powerset_merge.py
@@ -7,6 +7,7 @@
import awswrangler
import pandas
from pandas.core.indexes.range import RangeIndex

from shared import awswrangler_functions, decorators, enums, functions, pandas_functions, s3_manager

log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO")
1 change: 1 addition & 0 deletions src/site_upload/process_flat/process_flat.py
@@ -2,6 +2,7 @@
import os

import awswrangler

from shared import awswrangler_functions, decorators, enums, functions, pandas_functions, s3_manager

log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO")
1 change: 1 addition & 0 deletions src/site_upload/process_upload/process_upload.py
@@ -4,6 +4,7 @@
import os

import boto3

from shared import decorators, enums, functions

log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO")
1 change: 1 addition & 0 deletions src/site_upload/study_period/study_period.py
@@ -5,6 +5,7 @@

import awswrangler
import boto3

from shared import awswrangler_functions, decorators, enums, functions

