From 92c0439c7af39b80bbaade3bf8d07f66ee9e01e4 Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Tue, 17 Dec 2024 10:36:03 -0500 Subject: [PATCH 1/7] Added study distribution --- pyproject.toml | 4 +- src/dashboard/post_distribute/__init__.py | 0 .../post_distribute/post_distribute.py | 89 +++++++++++++++++ .../post_distribute/requirements.txt | 1 + src/dashboard/post_distribute/shared | 1 + src/post_distribute.Dockerfile | 14 +++ template.yaml | 73 ++++++++++++++ tests/conftest.py | 1 + tests/dashboard/test_post_distribute.py | 95 +++++++++++++++++++ 9 files changed, 277 insertions(+), 1 deletion(-) create mode 100644 src/dashboard/post_distribute/__init__.py create mode 100644 src/dashboard/post_distribute/post_distribute.py create mode 100644 src/dashboard/post_distribute/requirements.txt create mode 120000 src/dashboard/post_distribute/shared create mode 100644 src/post_distribute.Dockerfile create mode 100644 tests/dashboard/test_post_distribute.py diff --git a/pyproject.toml b/pyproject.toml index 3d6d6a0..b460265 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ dependencies= [ "arrow >=1.2.3", "awswrangler >=3.5, <4", "boto3", + "cumulus-library >=4.1", "Jinja2 >=3.1.4, <4", "pandas >=2, <3", "requests", # scripts only @@ -46,7 +47,8 @@ test = [ "moto[s3,athena,sns] == 4.1.5", "pytest", "pytest-cov", - "pytest-mock" + "pytest-mock", + "responses" ] dev = [ "pre-commit", diff --git a/src/dashboard/post_distribute/__init__.py b/src/dashboard/post_distribute/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/dashboard/post_distribute/post_distribute.py b/src/dashboard/post_distribute/post_distribute.py new file mode 100644 index 0000000..cbd9732 --- /dev/null +++ b/src/dashboard/post_distribute/post_distribute.py @@ -0,0 +1,89 @@ +import json +import os +import pathlib + +import boto3 +import requests +from cumulus_library import cli +from shared import decorators, functions + +# lambda performance tuning - moving these outside mean that they will not +# contribute to the lambda init window +sns_client = boto3.client("sns", os.environ.get("AWS_REGION")) +# in dockerized lambdas, `/tmp` is the only valid write location +BASE_DIR = "/tmp" # noqa: S108 + + +def get_study_from_github(url): + if "smart-on-fhir" not in url: + raise ValueError(f"{url} is not an official Cumulus study.") + if not url.endswith("/"): + url = url + "/" + api_url = ( + url.replace("https://github.com/", "https://api.github.com/repos/") + + "git/trees/main?recursive=1" + ) + raw_url_base = ( + url.replace("https://github.com/", "https://raw.githubusercontent.com/") + "main/" + ) + study_name = url.split("/")[-2] + res = requests.get(api_url, timeout=10) + if res.status_code != 200: + raise ValueError(f"{url} is not a valid git repository") + files = res.json()["tree"] + for file in files: + if file["type"] != "blob": + continue + write_path = pathlib.Path(f"{BASE_DIR}/studies") / study_name / file["path"] + write_path.parent.mkdir(parents=True, exist_ok=True) + with requests.get(raw_url_base + file["path"], timeout=10) as res: + with open(write_path, "w", encoding="UTF-8") as f: + f.write(res.text) + + +def prepare_study(body: dict): + write_path = pathlib.Path(f"{BASE_DIR}/prepared") + write_path.mkdir(parents=True, exist_ok=True) + cli.main( + cli_args=[ + "build", + "-t", + body["study_name"], + "-s", + f"{BASE_DIR}/studies", + "--prepare", + f"{BASE_DIR}/prepared", + ], + ) + return pathlib.Path(f"/{BASE_DIR}/prepared") / f"{body['study_name']}.zip" + + +def process_body(body: dict): + """Selects the appropriate handler for processing study requests""" + for key in body.keys(): + match key: + case "study_name": + pass + case "github": + get_study_from_github(body[key]) + case _: + raise ValueError(f"Invalid key {key} received.") + + +@decorators.generic_error_handler(msg="Error generating distributed request") +def distribute_handler(event: dict, context): + """Creates a distribution packages and queues for delivery""" + del context + body = json.loads(event["body"]) + process_body(body) + payload = prepare_study(body) + topic_sns_arn = os.environ.get("TOPIC_STUDY_PAYLOAD_ARN") + with open(payload, "rb") as f: + sns_client.publish( + TopicArn=topic_sns_arn, + Message=str(f.read()), + MessageGroupId=body["study_name"], + Subject=body["study_name"], + ) + res = functions.http_response(200, f'Study {body["study_name"]} queued.') + return res diff --git a/src/dashboard/post_distribute/requirements.txt b/src/dashboard/post_distribute/requirements.txt new file mode 100644 index 0000000..23bb0cc --- /dev/null +++ b/src/dashboard/post_distribute/requirements.txt @@ -0,0 +1 @@ +cumulus-library >= 4.1.3 \ No newline at end of file diff --git a/src/dashboard/post_distribute/shared b/src/dashboard/post_distribute/shared new file mode 120000 index 0000000..0ab0cb2 --- /dev/null +++ b/src/dashboard/post_distribute/shared @@ -0,0 +1 @@ +../../shared \ No newline at end of file diff --git a/src/post_distribute.Dockerfile b/src/post_distribute.Dockerfile new file mode 100644 index 0000000..5949530 --- /dev/null +++ b/src/post_distribute.Dockerfile @@ -0,0 +1,14 @@ +FROM public.ecr.aws/lambda/python:3.11 + +WORKDIR ${LAMBDA_TASK_ROOT} +COPY dashboard/post_distribute/requirements.txt . +RUN pip install -r requirements.txt +COPY dashboard/post_distribute/post_distribute.py . +COPY shared shared + +# Force setup of some initial matplotlib configuration artifacts +RUN mkdir /tmp/matlplotlib +ENV MPLCONFIGDIR=/tmp/matlplotlib +RUN cumulus-library version + +CMD ["post_distribute.distribute_handler"] \ No newline at end of file diff --git a/template.yaml b/template.yaml index 49e711a..16aaeae 100644 --- a/template.yaml +++ b/template.yaml @@ -64,6 +64,8 @@ Parameters: - Text - JSON Default: JSON + RemoteAccounts: + Type: CommaDelimitedList Resources: @@ -742,6 +744,53 @@ Resources: LogGroupName: !Sub "/aws/lambda/${DashboardGetStudyPeriodsFunction}" RetentionInDays: !Ref RetentionTime + DashboardPostDistributeStudyFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub 'CumulusAggDashboardDistributeStudy-${DeployStage}' + PackageType: Image + ImageConfig: + Command: ["post_distribute.distribute_handler"] + LoggingConfig: + ApplicationLogLevel: !Ref LogLevel + LogFormat: !Ref LogFormat + LogGroup: !Sub "/aws/lambda/CumulusAggDashboardDistributeStudy-${DeployStage}" + MemorySize: 512 + Timeout: 100 + Environment: + Variables: + BUCKET_NAME: !Sub '${BucketNameParameter}-${AWS::AccountId}-${DeployStage}' + TOPIC_STUDY_PAYLOAD_ARN: !Ref SNSStudyPayload + Events: + PostDistributeStudyAPI: + Type: Api + Properties: + RestApiId: !Ref DashboardApiGateway + Path: /distribute/ + Method: Post + Policies: + - S3ReadPolicy: + BucketName: !Sub '${BucketNameParameter}-${AWS::AccountId}-${DeployStage}' + - Statement: + - Sid: KMSDecryptPolicy + Effect: Allow + Action: + - kms:Decrypt + Resource: + - !ImportValue cumulus-kms-KMSKeyArn + - SNSPublishMessagePolicy: + TopicName: !GetAtt SNSStudyPayload.TopicName + Metadata: + Dockerfile: post_distribute.Dockerfile + DockerContext: ./src/ + DockerTag: v1 + + DashboardPostDistributeStudyLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${DashboardPostDistributeStudyFunction}" + RetentionInDays: !Ref RetentionTime + ### Lambda permissions ProcessUploadFunctionPermission: @@ -787,6 +836,30 @@ Resources: - Key: Name Value: !Sub 'SNSTopicCacheAPI-${DeployStage}' + SNSStudyPayload: + Type: AWS::SNS::Topic + Properties: + ContentBasedDeduplication: True + FifoTopic: True + TopicName: !Sub 'SNSStudyPayload-${DeployStage}.fifo' + Tags: + - Key: Name + Value: !Sub 'SNSStudyPayload-${DeployStage}.fifo' + + SNSStudyPayloadPolicy: + Type: AWS::SNS::TopicPolicy + Properties: + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + AWS: !Ref RemoteAccounts + Action: + - sns:Subscribe + Resource: !Ref SNSStudyPayload + Topics: + - !Ref SNSStudyPayload ### S3 Buckets diff --git a/tests/conftest.py b/tests/conftest.py index 711eda2..caddaef 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -149,6 +149,7 @@ def mock_notification(): sns_client.create_topic(Name="test-flat") sns_client.create_topic(Name="test-meta") sns_client.create_topic(Name="test-cache") + sns_client.create_topic(Name="test-payload") yield sns.stop() diff --git a/tests/dashboard/test_post_distribute.py b/tests/dashboard/test_post_distribute.py new file mode 100644 index 0000000..c538d8f --- /dev/null +++ b/tests/dashboard/test_post_distribute.py @@ -0,0 +1,95 @@ +import json +import os +from unittest import mock + +import pytest +import responses +from freezegun import freeze_time + +from src.dashboard.post_distribute import post_distribute + + +@mock.patch.dict( + os.environ, {"TOPIC_STUDY_PAYLOAD_ARN": "test-payload", "AWS_REGION": "us-east-1"}, clear=True +) +@pytest.mark.parametrize( + "name,url,expected_status", + [ + ( + "test_study", + "https://github.com/smart-on-fhir/test_study/", + 200, + ), + ("invalid_study", "https://github.com/smart-on-fhir/invalid_study", 500), + ("non_cumulus_repo", "https://github.com/user/non_cumulus_repo", 500), + ], +) +@responses.activate +@freeze_time("2020-01-01") +def test_process_github(mock_notification, tmp_path, name, url, expected_status, monkeypatch): + responses.add( + responses.GET, + "https://api.github.com/repos/smart-on-fhir/test_study/git/trees/main?recursive=1", + json={ + "tree": [ + { + "path": ".github", + "type": "tree", + }, + { + "path": "test.sql", + "type": "blob", + }, + { + "path": "manifest.toml", + "type": "blob", + }, + ] + }, + ) + responses.add( + responses.GET, + "https://api.github.com/repos/smart-on-fhir/invalid_study/git/trees/main?recursive=1", + status=404, + ) + responses.add( + responses.GET, + "https://raw.githubusercontent.com/smart-on-fhir/test_study/main/test.sql", + body="""CREATE TABLE test_study__table AS +SELECT * from core__patient""", + ) + responses.add( + responses.GET, + "https://raw.githubusercontent.com/smart-on-fhir/test_study/main/manifest.toml", + body="""study_prefix="test_study" +[file_config] +file_names=[ + "test.sql" +]""", + ) + monkeypatch.setattr(post_distribute, "BASE_DIR", tmp_path) + mock_sns = mock.MagicMock() + monkeypatch.setattr(post_distribute, "sns_client", mock_sns) + res = post_distribute.distribute_handler( + {"body": json.dumps({"github": url, "study_name": name})}, {} + ) + assert res["statusCode"] == expected_status + if expected_status == 200: + assert mock_sns.publish.is_called() + expected_message = { + "TopicArn": "test-payload", + "MessageGroupId": "test_study", + "Subject": "test_study", + } + for k, v in mock_sns.publish.call_args[1].items(): + if k == "Message": + # zipping these files is not 100% stochastic due to the tmpdir name, so + # we'll just check for the expected file in the zip binary string + assert "0000.test.00.create_table_test_study__table" in v + else: + assert expected_message[k] == v + + +def test_invalid_key(): + res = post_distribute.distribute_handler({"body": json.dumps({"bad_para,": "foo"})}, {}) + assert res["statusCode"] == 500 From 1e606a9baee875345456f648077cc905a5be852a Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Mon, 6 Jan 2025 12:59:03 -0500 Subject: [PATCH 2/7] Split validation from queue, PR feedback --- MAINTAINER.md | 20 +++- pyproject.toml | 3 +- .../post_distribute/post_distribute.py | 94 +++++----------- .../post_distribute/requirements.txt | 2 +- src/dashboard/queue_distribute/__init__.py | 0 .../queue_distribute/queue_distribute.py | 103 ++++++++++++++++++ .../queue_distribute/requirements.txt | 1 + src/dashboard/queue_distribute/shared | 1 + ...Dockerfile => queue_distribute.Dockerfile} | 9 +- template.yaml | 59 ++++++++-- tests/dashboard/test_post_distribute.py | 79 ++++++-------- tests/dashboard/test_queue_distribute.py | 99 +++++++++++++++++ .../mock_payloads/test_study/foo.sql | 1 + .../mock_payloads/test_study/foobar.sql | 1 + .../mock_payloads/test_study/manifest.toml | 7 ++ 15 files changed, 351 insertions(+), 128 deletions(-) create mode 100644 src/dashboard/queue_distribute/__init__.py create mode 100644 src/dashboard/queue_distribute/queue_distribute.py create mode 100644 src/dashboard/queue_distribute/requirements.txt create mode 120000 src/dashboard/queue_distribute/shared rename src/{post_distribute.Dockerfile => queue_distribute.Dockerfile} (59%) create mode 100644 tests/dashboard/test_queue_distribute.py create mode 100644 tests/test_data/mock_payloads/test_study/foo.sql create mode 100644 tests/test_data/mock_payloads/test_study/foobar.sql create mode 100644 tests/test_data/mock_payloads/test_study/manifest.toml diff --git a/MAINTAINER.md b/MAINTAINER.md index 8b1d1c3..1c2812d 100644 --- a/MAINTAINER.md +++ b/MAINTAINER.md @@ -58,8 +58,24 @@ The SAM framework extends native cloudformation, usually with a lighter syntax, ### Sync --watch mode gotchas -- If you modify S3 bucket permissions while in watch mode, changes to the bucket may generate a permission denied message. You'll need to delete the bucket and bring down the deployment before restarting to apply your changes. +- If you modify S3 bucket permissions while in watch mode, changes to the bucket may generate a permission denied message. You'll need to delete the bucket and bring down the deployment before restarting to apply your changes, or manually update the S3 bucket permissions. - Similarly, if you end up in a ROLLBACK_FAILED state, usually the only recourse is to bring the deployment down and resync, do a regular deployment deployment, or manually initiate a rollback from the AWS console. -Using deploy is a little safer than sync in this regard, though it does take longer for each deployment. Use your best judgement. \ No newline at end of file +Using deploy is a little safer than sync in this regard, though it does take longer for each deployment. Use your best judgement. + +## Testing ECR containers locally + +If you want to test a lambda ECR container, do the following from a console at the root of this repo: +- `docker buildx build -t -f src/ src/` +- `docker run \ +-e AWS_REGION='us-east-1' \ +-e AWS_ACCESS_KEY_ID=' \ +-p 9000:8080 \ +` + +You can then post messages to your endpoints on port 9000 on localhost, and it should interact with the rest of your +AWS account (if required). \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index b460265..708f359 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,8 @@ test = [ "pytest", "pytest-cov", "pytest-mock", - "responses" + "pytest-subprocess", + "responses", ] dev = [ "pre-commit", diff --git a/src/dashboard/post_distribute/post_distribute.py b/src/dashboard/post_distribute/post_distribute.py index cbd9732..0e91c57 100644 --- a/src/dashboard/post_distribute/post_distribute.py +++ b/src/dashboard/post_distribute/post_distribute.py @@ -1,89 +1,51 @@ +""" Handler for validating the payload contents and submitting to queue + +""" import json import os -import pathlib import boto3 import requests -from cumulus_library import cli -from shared import decorators, functions +from shared import functions -# lambda performance tuning - moving these outside mean that they will not -# contribute to the lambda init window sns_client = boto3.client("sns", os.environ.get("AWS_REGION")) -# in dockerized lambdas, `/tmp` is the only valid write location -BASE_DIR = "/tmp" # noqa: S108 - - -def get_study_from_github(url): - if "smart-on-fhir" not in url: - raise ValueError(f"{url} is not an official Cumulus study.") - if not url.endswith("/"): - url = url + "/" - api_url = ( - url.replace("https://github.com/", "https://api.github.com/repos/") - + "git/trees/main?recursive=1" - ) - raw_url_base = ( - url.replace("https://github.com/", "https://raw.githubusercontent.com/") + "main/" - ) - study_name = url.split("/")[-2] - res = requests.get(api_url, timeout=10) +valid_chars ="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~:/?#[]@!$&'()*+," + +def validate_github_url(config): + if ( + not config['url'].startswith("https://github.com/smart-on-fhir/") + or any(c not in valid_chars for c in config['url']) + ): + raise ValueError(f"{config['url']} is not an official Cumulus study.") + res = requests.get(config['url'], timeout=10) if res.status_code != 200: - raise ValueError(f"{url} is not a valid git repository") - files = res.json()["tree"] - for file in files: - if file["type"] != "blob": - continue - write_path = pathlib.Path(f"{BASE_DIR}/studies") / study_name / file["path"] - write_path.parent.mkdir(parents=True, exist_ok=True) - with requests.get(raw_url_base + file["path"], timeout=10) as res: - with open(write_path, "w", encoding="UTF-8") as f: - f.write(res.text) - - -def prepare_study(body: dict): - write_path = pathlib.Path(f"{BASE_DIR}/prepared") - write_path.mkdir(parents=True, exist_ok=True) - cli.main( - cli_args=[ - "build", - "-t", - body["study_name"], - "-s", - f"{BASE_DIR}/studies", - "--prepare", - f"{BASE_DIR}/prepared", - ], - ) - return pathlib.Path(f"/{BASE_DIR}/prepared") / f"{body['study_name']}.zip" - - -def process_body(body: dict): + raise ValueError(f"{config['url']} is not a valid git repository") + if 'tag' in config and config['tag'] is not None: + res = requests.get(config['url'] + f'/tree/{config["tag"]}', timeout=10) + if res.status_code != 200: + raise ValueError(f"{config['tag']} is not a valid tag") + + +def validate_body(body: dict): """Selects the appropriate handler for processing study requests""" for key in body.keys(): match key: case "study_name": pass case "github": - get_study_from_github(body[key]) + validate_github_url(body[key]) case _: raise ValueError(f"Invalid key {key} received.") -@decorators.generic_error_handler(msg="Error generating distributed request") +#@decorators.generic_error_handler(msg="Error generating distributed request") def distribute_handler(event: dict, context): """Creates a distribution packages and queues for delivery""" del context body = json.loads(event["body"]) - process_body(body) - payload = prepare_study(body) - topic_sns_arn = os.environ.get("TOPIC_STUDY_PAYLOAD_ARN") - with open(payload, "rb") as f: - sns_client.publish( - TopicArn=topic_sns_arn, - Message=str(f.read()), - MessageGroupId=body["study_name"], - Subject=body["study_name"], - ) - res = functions.http_response(200, f'Study {body["study_name"]} queued.') + validate_body(body) + topic_sns_arn = os.environ.get("TOPIC_QUEUE_API_ARN") + sns_client.publish(TopicArn=topic_sns_arn, Message=event["body"], Subject=body["study_name"]) + # TODO: should we create an ID/API for the dashboard to track request status? + res = functions.http_response(200, f'Preparing to queue {body["study_name"]}.') return res diff --git a/src/dashboard/post_distribute/requirements.txt b/src/dashboard/post_distribute/requirements.txt index 23bb0cc..663bd1f 100644 --- a/src/dashboard/post_distribute/requirements.txt +++ b/src/dashboard/post_distribute/requirements.txt @@ -1 +1 @@ -cumulus-library >= 4.1.3 \ No newline at end of file +requests \ No newline at end of file diff --git a/src/dashboard/queue_distribute/__init__.py b/src/dashboard/queue_distribute/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/dashboard/queue_distribute/queue_distribute.py b/src/dashboard/queue_distribute/queue_distribute.py new file mode 100644 index 0000000..18fcdb9 --- /dev/null +++ b/src/dashboard/queue_distribute/queue_distribute.py @@ -0,0 +1,103 @@ +""" Handler for submitting studies to an SNS queue for distribution to remote sites. + +In the long term, this module (or a submodule imported by this module) will be responsible +for parsing the output of a dashboard guided workflow/query builder generated payload, +and converting it to a cumulus library compatible study. + +It also enables distribution of a smart-on-fhir owned study directly from github, +which is the mode it operates in today. + +Due to needing cumulus library, this handler is different from all the other lambda +handlers, in that it is packaged in a docker image and loaded from an elastic container +registry. This has a few architectural implications not present in other lambdas - notably, +the home directory is static and we have to write any data to disk inside of /tmp as the +only writable location. + +""" +import json +import os +import pathlib +import subprocess + +import boto3 +from cumulus_library import cli +from shared import decorators, functions + +# lambda performance tuning - moving these two variables to be global in the module +# means that their initialization happens during build, rather than invocation. +# This helps reduce the spinup time, especially for the boto client, and since +# there are some hefty bits in here already with the docker spinup, shaving a second +# or two off here is helpful to keep us under the ten second timeout. +# +# https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html#function-code + +sns_client = boto3.client("sns", os.environ.get("AWS_REGION")) +BASE_DIR = "/tmp" # noqa: S108 + +def get_study_from_github(config): + try: + subprocess.run(['/usr/bin/git', 'clone', config['url'], f"{BASE_DIR}/studies"], check=True) # noqa: S603 + if 'tag' in config and config['tag'] is not None: + subprocess.run( # noqa: S603 + ['/usr/bin/git', 'checkout','tag'], + cwd = f"{BASE_DIR}/studies/{config['url'].split('/')[-2]}", + ) + + except subprocess.CalledProcessError: + # TODO: retry/backoff logic? or do we just have a user queue again? + raise ValueError(f"{config['url']} is unavailable") + +def prepare_study(body: dict): + write_path = pathlib.Path(f"{BASE_DIR}/prepared") + write_path.mkdir(parents=True, exist_ok=True) + cli.main( + cli_args=[ + "build", + "-t", + body["study_name"], + "-s", + f"{BASE_DIR}/studies", + "--prepare", + f"{write_path}", + ], + ) + return pathlib.Path(f"{write_path}") / f"{body['study_name']}.zip" + + +def process_body(body: dict): + """Selects the appropriate handler for processing study requests""" + for key in body.keys(): + match key: + case "study_name": + pass + case "github": + get_study_from_github(body[key]) + case _: + raise ValueError(f"Invalid key {key} received.") + + +@decorators.generic_error_handler(msg="Error generating distributed request") +def queue_handler(event: dict, context): + """Creates a distribution packages and queues for delivery""" + del context + body = event["Records"][0]["Sns"]["Message"] + body = json.loads(body) + process_body(body) + payload = prepare_study(body) + topic_sns_arn = os.environ.get("TOPIC_STUDY_PAYLOAD_ARN") + with open(payload, "rb") as f: + file = f.read() + sns_client.publish( + TopicArn=topic_sns_arn, + Message=json.dumps({'study': body["study_name"]}), + MessageGroupId=body["study_name"], + Subject=body["study_name"], + MessageAttributes={ + 'study':{ + "DataType": "Binary", + "BinaryValue": file + } + } + ) + res = functions.http_response(200, f'Study {body["study_name"]} queued.') + return res diff --git a/src/dashboard/queue_distribute/requirements.txt b/src/dashboard/queue_distribute/requirements.txt new file mode 100644 index 0000000..23bb0cc --- /dev/null +++ b/src/dashboard/queue_distribute/requirements.txt @@ -0,0 +1 @@ +cumulus-library >= 4.1.3 \ No newline at end of file diff --git a/src/dashboard/queue_distribute/shared b/src/dashboard/queue_distribute/shared new file mode 120000 index 0000000..0ab0cb2 --- /dev/null +++ b/src/dashboard/queue_distribute/shared @@ -0,0 +1 @@ +../../shared \ No newline at end of file diff --git a/src/post_distribute.Dockerfile b/src/queue_distribute.Dockerfile similarity index 59% rename from src/post_distribute.Dockerfile rename to src/queue_distribute.Dockerfile index 5949530..5d28d77 100644 --- a/src/post_distribute.Dockerfile +++ b/src/queue_distribute.Dockerfile @@ -1,9 +1,12 @@ FROM public.ecr.aws/lambda/python:3.11 +RUN yum update git -y +RUN yum install git -y + WORKDIR ${LAMBDA_TASK_ROOT} -COPY dashboard/post_distribute/requirements.txt . +COPY dashboard/queue_distribute/requirements.txt . RUN pip install -r requirements.txt -COPY dashboard/post_distribute/post_distribute.py . +COPY dashboard/queue_distribute/queue_distribute.py . COPY shared shared # Force setup of some initial matplotlib configuration artifacts @@ -11,4 +14,4 @@ RUN mkdir /tmp/matlplotlib ENV MPLCONFIGDIR=/tmp/matlplotlib RUN cumulus-library version -CMD ["post_distribute.distribute_handler"] \ No newline at end of file +CMD ["queue_distribute.queue_handler"] \ No newline at end of file diff --git a/template.yaml b/template.yaml index 16aaeae..0f0115e 100644 --- a/template.yaml +++ b/template.yaml @@ -748,19 +748,18 @@ Resources: Type: AWS::Serverless::Function Properties: FunctionName: !Sub 'CumulusAggDashboardDistributeStudy-${DeployStage}' - PackageType: Image - ImageConfig: - Command: ["post_distribute.distribute_handler"] + CodeUri: ./src/dashboard/post_distribute + Handler: post_distribute.distribute_handler + Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel LogFormat: !Ref LogFormat LogGroup: !Sub "/aws/lambda/CumulusAggDashboardDistributeStudy-${DeployStage}" - MemorySize: 512 - Timeout: 100 + MemorySize: 128 + Timeout: 30 Environment: Variables: - BUCKET_NAME: !Sub '${BucketNameParameter}-${AWS::AccountId}-${DeployStage}' - TOPIC_STUDY_PAYLOAD_ARN: !Ref SNSStudyPayload + TOPIC_QUEUE_API_ARN: !Ref SNSTopicQueueDistribution Events: PostDistributeStudyAPI: Type: Api @@ -768,6 +767,38 @@ Resources: RestApiId: !Ref DashboardApiGateway Path: /distribute/ Method: Post + Policies: + - SNSPublishMessagePolicy: + TopicName: !GetAtt SNSTopicQueueDistribution.TopicName + + DashboardPostDistributeStudyLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${DashboardPostDistributeStudyFunction}" + RetentionInDays: !Ref RetentionTime + + DashboardQueueDistributeStudyFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub 'CumulusAggDashboardQueueStudy-${DeployStage}' + PackageType: Image + ImageConfig: + Command: ["queue_distribute.queue_handler"] + LoggingConfig: + ApplicationLogLevel: !Ref LogLevel + LogFormat: !Ref LogFormat + LogGroup: !Sub "/aws/lambda/CumulusAggDashboardQueueStudy-${DeployStage}" + MemorySize: 512 + Timeout: 100 + Environment: + Variables: + BUCKET_NAME: !Sub '${BucketNameParameter}-${AWS::AccountId}-${DeployStage}' + TOPIC_STUDY_PAYLOAD_ARN: !Ref SNSStudyPayload + Events: + ProcessQueueDistributeSNSEvent: + Type: SNS + Properties: + Topic: !Ref SNSTopicQueueDistribution Policies: - S3ReadPolicy: BucketName: !Sub '${BucketNameParameter}-${AWS::AccountId}-${DeployStage}' @@ -781,14 +812,14 @@ Resources: - SNSPublishMessagePolicy: TopicName: !GetAtt SNSStudyPayload.TopicName Metadata: - Dockerfile: post_distribute.Dockerfile + Dockerfile: queue_distribute.Dockerfile DockerContext: ./src/ DockerTag: v1 - DashboardPostDistributeStudyLogGroup: + DashboardQueueDistributeStudyLogGroup: Type: AWS::Logs::LogGroup Properties: - LogGroupName: !Sub "/aws/lambda/${DashboardPostDistributeStudyFunction}" + LogGroupName: !Sub "/aws/lambda/${DashboardQueueDistributeStudyFunction}" RetentionInDays: !Ref RetentionTime ### Lambda permissions @@ -836,6 +867,14 @@ Resources: - Key: Name Value: !Sub 'SNSTopicCacheAPI-${DeployStage}' + SNSTopicQueueDistribution: + Type: AWS::SNS::Topic + Properties: + TopicName: !Sub 'SNSTopicQueueDistribution-${DeployStage}' + Tags: + - Key: Name + Value: !Sub 'SNSTopicQueueDistribution-${DeployStage}' + SNSStudyPayload: Type: AWS::SNS::Topic Properties: diff --git a/tests/dashboard/test_post_distribute.py b/tests/dashboard/test_post_distribute.py index c538d8f..369c381 100644 --- a/tests/dashboard/test_post_distribute.py +++ b/tests/dashboard/test_post_distribute.py @@ -10,68 +10,60 @@ @mock.patch.dict( - os.environ, {"TOPIC_STUDY_PAYLOAD_ARN": "test-payload", "AWS_REGION": "us-east-1"}, clear=True + os.environ, {"TOPIC_QUEUE_API_ARN": "test-payload", "AWS_REGION": "us-east-1"}, clear=True ) @pytest.mark.parametrize( - "name,url,expected_status", + "name,url,tag,expected_status", [ ( "test_study", - "https://github.com/smart-on-fhir/test_study/", + "https://github.com/smart-on-fhir/test_study", + None, 200, ), - ("invalid_study", "https://github.com/smart-on-fhir/invalid_study", 500), - ("non_cumulus_repo", "https://github.com/user/non_cumulus_repo", 500), + ( + "test_study", + "https://github.com/smart-on-fhir/test_study", + "tag", + 200, + ), + ( + "invalid_study", + "https://github.com/smart-on-fhir/invalid_study", + None, + 500, + ), + ( + "non_cumulus_repo", + "https://github.com/user/non_cumulus_repo", + None, + 500 + ), ], ) @responses.activate @freeze_time("2020-01-01") -def test_process_github(mock_notification, tmp_path, name, url, expected_status, monkeypatch): - responses.add( - responses.GET, - "https://api.github.com/repos/smart-on-fhir/test_study/git/trees/main?recursive=1", - json={ - "tree": [ - { - "path": ".github", - "type": "tree", - }, - { - "path": "test.sql", - "type": "blob", - }, - { - "path": "manifest.toml", - "type": "blob", - }, - ] - }, - ) +def test_process_github(mock_notification, tmp_path, name, url, tag, expected_status, monkeypatch): responses.add( responses.GET, - "https://api.github.com/repos/smart-on-fhir/invalid_study/git/trees/main?recursive=1", - status=404, + "https://github.com/smart-on-fhir/test_study", + status=200 ) responses.add( responses.GET, - "https://raw.githubusercontent.com/smart-on-fhir/test_study/main/test.sql", - body="""CREATE TABLE test_study__table AS -SELECT * from core__patient""", + "https://github.com/smart-on-fhir/test_study/tree/tag", + status=200 ) responses.add( responses.GET, - "https://raw.githubusercontent.com/smart-on-fhir/test_study/main/manifest.toml", - body="""study_prefix="test_study" -[file_config] -file_names=[ - "test.sql" -]""", + "https://github.com/smart-on-fhir/invalid_study", + status=404, ) - monkeypatch.setattr(post_distribute, "BASE_DIR", tmp_path) + mock_sns = mock.MagicMock() monkeypatch.setattr(post_distribute, "sns_client", mock_sns) res = post_distribute.distribute_handler( - {"body": json.dumps({"github": url, "study_name": name})}, {} + {"body": json.dumps({"github": {"url": url, "tag": tag}, "study_name": name})}, {} ) assert res["statusCode"] == expected_status if expected_status == 200: @@ -80,15 +72,12 @@ def test_process_github(mock_notification, tmp_path, name, url, expected_status, "TopicArn": "test-payload", "MessageGroupId": "test_study", "Subject": "test_study", + "Message": json.dumps({"github": {"url":url, "tag":tag}, "study_name": name}) } for k, v in mock_sns.publish.call_args[1].items(): - if k == "Message": - # zipping these files is not 100% stochastic due to the tmpdir name, so - # we'll just check for the expected file in the zip binary string - assert "0000.test.00.create_table_test_study__table" in v - else: - assert expected_message[k] == v + assert expected_message[k] == v + def test_invalid_key(): res = post_distribute.distribute_handler({"body": json.dumps({"bad_para,": "foo"})}, {}) diff --git a/tests/dashboard/test_queue_distribute.py b/tests/dashboard/test_queue_distribute.py new file mode 100644 index 0000000..96f85e0 --- /dev/null +++ b/tests/dashboard/test_queue_distribute.py @@ -0,0 +1,99 @@ +import json +import os +import pathlib +import shutil +import subprocess +from unittest import mock + +import pytest +import responses +from freezegun import freeze_time + +from src.dashboard.queue_distribute import queue_distribute + + +def error_callback(process): + process.returncode = 1 + raise subprocess.CalledProcessError(1, "Error cloning") + +@mock.patch.dict( + os.environ, {"TOPIC_STUDY_PAYLOAD_ARN": "test-payload", "AWS_REGION": "us-east-1"}, clear=True +) +@pytest.mark.parametrize( + "name,url,tag,expected_status", + [ + ( + "test_study", + "https://github.com/smart-on-fhir/test_study/", + None, + 200, + ), + ( + "test_study", + "https://github.com/smart-on-fhir/test_study/", + "tag", + 200, + ), + ( + "invalid_study", + "https://github.com/smart-on-fhir/invalid_study", + None, + 500 + ), + + ], +) +@responses.activate +def test_process_github( + mock_notification, tmp_path, name, url, tag, expected_status, monkeypatch, fp +): + fp.allow_unregistered(True) # fp is provided by pytest-subprocess + if name == "invalid_study": + fp.register(["/usr/bin/git","clone",url,f"{tmp_path}/studies"], callback=error_callback) + else: + fp.register(["/usr/bin/git","clone",url,f"{tmp_path}/studies"]) + (tmp_path /"studies").mkdir() + shutil.copytree( + pathlib.Path.cwd() / f"./tests/test_data/mock_payloads/{name}/", + tmp_path / f"studies/{name}" + ) + + monkeypatch.setattr(queue_distribute, "BASE_DIR", tmp_path) + mock_sns = mock.MagicMock() + monkeypatch.setattr(queue_distribute, "sns_client", mock_sns) + with freeze_time("2020-01-01"): #Don't use as a fixture for this test; collides with fp mock + res = queue_distribute.queue_handler( + { + "Records": [ + { + "Sns":{ + "Message": json.dumps({ + "github": { + "url": url, "tag": tag + }, + "study_name": name}) + } + } + ] + }, + {} + ) + assert res["statusCode"] == expected_status + if expected_status == 200: + assert mock_sns.publish.is_called() + expected_message = { + "TopicArn": "test-payload", + "MessageGroupId": "test_study", + 'Message': '{"study": "test_study"}', + "Subject": "test_study", + + } + for k, v in mock_sns.publish.call_args[1].items(): + if k == "MessageAttributes": + assert name in str(v['study']['BinaryValue']) + else: + assert expected_message[k] == v + if tag=='tag': + files = [p for p in (tmp_path / f"studies/{name}").iterdir() if p.is_file()] + files = [file.stem for file in files] + assert 'tag' in files diff --git a/tests/test_data/mock_payloads/test_study/foo.sql b/tests/test_data/mock_payloads/test_study/foo.sql new file mode 100644 index 0000000..fe23484 --- /dev/null +++ b/tests/test_data/mock_payloads/test_study/foo.sql @@ -0,0 +1 @@ +CREATE TABLE test_study__foo AS SELECT * FROM bar; \ No newline at end of file diff --git a/tests/test_data/mock_payloads/test_study/foobar.sql b/tests/test_data/mock_payloads/test_study/foobar.sql new file mode 100644 index 0000000..8c96850 --- /dev/null +++ b/tests/test_data/mock_payloads/test_study/foobar.sql @@ -0,0 +1 @@ +CREATE TABLE test_study__foobar AS SELECT * FROM bar; \ No newline at end of file diff --git a/tests/test_data/mock_payloads/test_study/manifest.toml b/tests/test_data/mock_payloads/test_study/manifest.toml new file mode 100644 index 0000000..cedc544 --- /dev/null +++ b/tests/test_data/mock_payloads/test_study/manifest.toml @@ -0,0 +1,7 @@ +study_prefix = "test_study" + +[file_config] +file_names = [ + "foo.sql", + "foobar.sql" +] \ No newline at end of file From 7541a56bde69f836e4fdb725847da1382b1c7dec Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Mon, 6 Jan 2025 13:11:16 -0500 Subject: [PATCH 3/7] ruff uprev --- .pre-commit-config.yaml | 2 +- pyproject.toml | 2 +- .../get_chart_data/get_chart_data.py | 1 + src/dashboard/get_csv/get_csv.py | 1 + src/dashboard/get_metadata/get_metadata.py | 1 + .../get_study_periods/get_study_periods.py | 1 + .../post_distribute/post_distribute.py | 22 ++++---- .../queue_distribute/queue_distribute.py | 25 +++++----- src/site_upload/cache_api/cache_api.py | 1 + .../fetch_upload_url/fetch_upload_url.py | 1 + .../powerset_merge/powerset_merge.py | 1 + src/site_upload/process_flat/process_flat.py | 1 + .../process_upload/process_upload.py | 1 + src/site_upload/study_period/study_period.py | 1 + tests/dashboard/test_post_distribute.py | 28 +++-------- tests/dashboard/test_queue_distribute.py | 50 ++++++++----------- 16 files changed, 64 insertions(+), 75 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ca474bd..5285abf 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ default_install_hook_types: [pre-commit, pre-push] repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.5.7 + rev: v0.8.6 hooks: - name: Ruff linting id: ruff diff --git a/pyproject.toml b/pyproject.toml index 708f359..8de6203 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,7 @@ test = [ ] dev = [ "pre-commit", - "ruff < 0.6", + "ruff < 0.9", "sqlfluff >= 3.2.5" ] diff --git a/src/dashboard/get_chart_data/get_chart_data.py b/src/dashboard/get_chart_data/get_chart_data.py index ab61fb1..a1c3559 100644 --- a/src/dashboard/get_chart_data/get_chart_data.py +++ b/src/dashboard/get_chart_data/get_chart_data.py @@ -10,6 +10,7 @@ import boto3 import jinja2 import pandas + from shared import decorators, enums, errors, functions log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO") diff --git a/src/dashboard/get_csv/get_csv.py b/src/dashboard/get_csv/get_csv.py index 59000cd..a785001 100644 --- a/src/dashboard/get_csv/get_csv.py +++ b/src/dashboard/get_csv/get_csv.py @@ -2,6 +2,7 @@ import boto3 import botocore + from shared import decorators, enums, functions diff --git a/src/dashboard/get_metadata/get_metadata.py b/src/dashboard/get_metadata/get_metadata.py index 477433d..bc84c61 100644 --- a/src/dashboard/get_metadata/get_metadata.py +++ b/src/dashboard/get_metadata/get_metadata.py @@ -3,6 +3,7 @@ import os import boto3 + from shared.decorators import generic_error_handler from shared.functions import http_response, read_metadata diff --git a/src/dashboard/get_study_periods/get_study_periods.py b/src/dashboard/get_study_periods/get_study_periods.py index d979155..afbd850 100644 --- a/src/dashboard/get_study_periods/get_study_periods.py +++ b/src/dashboard/get_study_periods/get_study_periods.py @@ -3,6 +3,7 @@ import os import boto3 + from shared import decorators, enums, functions diff --git a/src/dashboard/post_distribute/post_distribute.py b/src/dashboard/post_distribute/post_distribute.py index 0e91c57..c4c5dd4 100644 --- a/src/dashboard/post_distribute/post_distribute.py +++ b/src/dashboard/post_distribute/post_distribute.py @@ -1,30 +1,30 @@ -""" Handler for validating the payload contents and submitting to queue +"""Handler for validating the payload contents and submitting to queue""" -""" import json import os import boto3 import requests + from shared import functions sns_client = boto3.client("sns", os.environ.get("AWS_REGION")) -valid_chars ="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~:/?#[]@!$&'()*+," +valid_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~:/?#[]@!$&'()*+," + def validate_github_url(config): - if ( - not config['url'].startswith("https://github.com/smart-on-fhir/") - or any(c not in valid_chars for c in config['url']) + if not config["url"].startswith("https://github.com/smart-on-fhir/") or any( + c not in valid_chars for c in config["url"] ): raise ValueError(f"{config['url']} is not an official Cumulus study.") - res = requests.get(config['url'], timeout=10) + res = requests.get(config["url"], timeout=10) if res.status_code != 200: raise ValueError(f"{config['url']} is not a valid git repository") - if 'tag' in config and config['tag'] is not None: - res = requests.get(config['url'] + f'/tree/{config["tag"]}', timeout=10) + if "tag" in config and config["tag"] is not None: + res = requests.get(config["url"] + f'/tree/{config["tag"]}', timeout=10) if res.status_code != 200: raise ValueError(f"{config['tag']} is not a valid tag") - + def validate_body(body: dict): """Selects the appropriate handler for processing study requests""" @@ -38,7 +38,7 @@ def validate_body(body: dict): raise ValueError(f"Invalid key {key} received.") -#@decorators.generic_error_handler(msg="Error generating distributed request") +# @decorators.generic_error_handler(msg="Error generating distributed request") def distribute_handler(event: dict, context): """Creates a distribution packages and queues for delivery""" del context diff --git a/src/dashboard/queue_distribute/queue_distribute.py b/src/dashboard/queue_distribute/queue_distribute.py index 18fcdb9..383ac23 100644 --- a/src/dashboard/queue_distribute/queue_distribute.py +++ b/src/dashboard/queue_distribute/queue_distribute.py @@ -1,4 +1,4 @@ -""" Handler for submitting studies to an SNS queue for distribution to remote sites. +"""Handler for submitting studies to an SNS queue for distribution to remote sites. In the long term, this module (or a submodule imported by this module) will be responsible for parsing the output of a dashboard guided workflow/query builder generated payload, @@ -14,6 +14,7 @@ only writable location. """ + import json import os import pathlib @@ -21,6 +22,7 @@ import boto3 from cumulus_library import cli + from shared import decorators, functions # lambda performance tuning - moving these two variables to be global in the module @@ -34,19 +36,21 @@ sns_client = boto3.client("sns", os.environ.get("AWS_REGION")) BASE_DIR = "/tmp" # noqa: S108 + def get_study_from_github(config): try: - subprocess.run(['/usr/bin/git', 'clone', config['url'], f"{BASE_DIR}/studies"], check=True) # noqa: S603 - if 'tag' in config and config['tag'] is not None: + subprocess.run(["/usr/bin/git", "clone", config["url"], f"{BASE_DIR}/studies"], check=True) # noqa: S603 + if "tag" in config and config["tag"] is not None: subprocess.run( # noqa: S603 - ['/usr/bin/git', 'checkout','tag'], - cwd = f"{BASE_DIR}/studies/{config['url'].split('/')[-2]}", + ["/usr/bin/git", "checkout", "tag"], + cwd=f"{BASE_DIR}/studies/{config['url'].split('/')[-2]}", ) - + except subprocess.CalledProcessError: # TODO: retry/backoff logic? or do we just have a user queue again? raise ValueError(f"{config['url']} is unavailable") + def prepare_study(body: dict): write_path = pathlib.Path(f"{BASE_DIR}/prepared") write_path.mkdir(parents=True, exist_ok=True) @@ -89,15 +93,10 @@ def queue_handler(event: dict, context): file = f.read() sns_client.publish( TopicArn=topic_sns_arn, - Message=json.dumps({'study': body["study_name"]}), + Message=json.dumps({"study": body["study_name"]}), MessageGroupId=body["study_name"], Subject=body["study_name"], - MessageAttributes={ - 'study':{ - "DataType": "Binary", - "BinaryValue": file - } - } + MessageAttributes={"study": {"DataType": "Binary", "BinaryValue": file}}, ) res = functions.http_response(200, f'Study {body["study_name"]} queued.') return res diff --git a/src/site_upload/cache_api/cache_api.py b/src/site_upload/cache_api/cache_api.py index e1de6fb..96b8d63 100644 --- a/src/site_upload/cache_api/cache_api.py +++ b/src/site_upload/cache_api/cache_api.py @@ -5,6 +5,7 @@ import awswrangler import boto3 + from shared import decorators, enums, functions diff --git a/src/site_upload/fetch_upload_url/fetch_upload_url.py b/src/site_upload/fetch_upload_url/fetch_upload_url.py index 8dbb126..6df6e0b 100644 --- a/src/site_upload/fetch_upload_url/fetch_upload_url.py +++ b/src/site_upload/fetch_upload_url/fetch_upload_url.py @@ -6,6 +6,7 @@ import boto3 import botocore.exceptions + from shared.decorators import generic_error_handler from shared.enums import BucketPath from shared.functions import get_s3_json_as_dict, http_response diff --git a/src/site_upload/powerset_merge/powerset_merge.py b/src/site_upload/powerset_merge/powerset_merge.py index 35b654d..532af90 100644 --- a/src/site_upload/powerset_merge/powerset_merge.py +++ b/src/site_upload/powerset_merge/powerset_merge.py @@ -7,6 +7,7 @@ import awswrangler import pandas from pandas.core.indexes.range import RangeIndex + from shared import awswrangler_functions, decorators, enums, functions, pandas_functions, s3_manager log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO") diff --git a/src/site_upload/process_flat/process_flat.py b/src/site_upload/process_flat/process_flat.py index b44f65a..94bb735 100644 --- a/src/site_upload/process_flat/process_flat.py +++ b/src/site_upload/process_flat/process_flat.py @@ -2,6 +2,7 @@ import os import awswrangler + from shared import awswrangler_functions, decorators, enums, functions, pandas_functions, s3_manager log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO") diff --git a/src/site_upload/process_upload/process_upload.py b/src/site_upload/process_upload/process_upload.py index b960807..2e9d848 100644 --- a/src/site_upload/process_upload/process_upload.py +++ b/src/site_upload/process_upload/process_upload.py @@ -4,6 +4,7 @@ import os import boto3 + from shared import decorators, enums, functions log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO") diff --git a/src/site_upload/study_period/study_period.py b/src/site_upload/study_period/study_period.py index 0c2d85f..49b7221 100644 --- a/src/site_upload/study_period/study_period.py +++ b/src/site_upload/study_period/study_period.py @@ -5,6 +5,7 @@ import awswrangler import boto3 + from shared import awswrangler_functions, decorators, enums, functions diff --git a/tests/dashboard/test_post_distribute.py b/tests/dashboard/test_post_distribute.py index 369c381..327d22d 100644 --- a/tests/dashboard/test_post_distribute.py +++ b/tests/dashboard/test_post_distribute.py @@ -1,3 +1,5 @@ +"""Validates that we've received a usable request for a study to distribute""" + import json import os from unittest import mock @@ -33,33 +35,20 @@ None, 500, ), - ( - "non_cumulus_repo", - "https://github.com/user/non_cumulus_repo", - None, - 500 - ), + ("non_cumulus_repo", "https://github.com/user/non_cumulus_repo", None, 500), ], ) @responses.activate @freeze_time("2020-01-01") def test_process_github(mock_notification, tmp_path, name, url, tag, expected_status, monkeypatch): - responses.add( - responses.GET, - "https://github.com/smart-on-fhir/test_study", - status=200 - ) - responses.add( - responses.GET, - "https://github.com/smart-on-fhir/test_study/tree/tag", - status=200 - ) + responses.add(responses.GET, "https://github.com/smart-on-fhir/test_study", status=200) + responses.add(responses.GET, "https://github.com/smart-on-fhir/test_study/tree/tag", status=200) responses.add( responses.GET, "https://github.com/smart-on-fhir/invalid_study", status=404, ) - + mock_sns = mock.MagicMock() monkeypatch.setattr(post_distribute, "sns_client", mock_sns) res = post_distribute.distribute_handler( @@ -72,12 +61,11 @@ def test_process_github(mock_notification, tmp_path, name, url, tag, expected_st "TopicArn": "test-payload", "MessageGroupId": "test_study", "Subject": "test_study", - "Message": json.dumps({"github": {"url":url, "tag":tag}, "study_name": name}) + "Message": json.dumps({"github": {"url": url, "tag": tag}, "study_name": name}), } for k, v in mock_sns.publish.call_args[1].items(): - assert expected_message[k] == v - + def test_invalid_key(): res = post_distribute.distribute_handler({"body": json.dumps({"bad_para,": "foo"})}, {}) diff --git a/tests/dashboard/test_queue_distribute.py b/tests/dashboard/test_queue_distribute.py index 96f85e0..5135d13 100644 --- a/tests/dashboard/test_queue_distribute.py +++ b/tests/dashboard/test_queue_distribute.py @@ -16,6 +16,7 @@ def error_callback(process): process.returncode = 1 raise subprocess.CalledProcessError(1, "Error cloning") + @mock.patch.dict( os.environ, {"TOPIC_STUDY_PAYLOAD_ARN": "test-payload", "AWS_REGION": "us-east-1"}, clear=True ) @@ -34,49 +35,41 @@ def error_callback(process): "tag", 200, ), - ( - "invalid_study", - "https://github.com/smart-on-fhir/invalid_study", - None, - 500 - ), - + ("invalid_study", "https://github.com/smart-on-fhir/invalid_study", None, 500), ], ) @responses.activate def test_process_github( mock_notification, tmp_path, name, url, tag, expected_status, monkeypatch, fp ): - fp.allow_unregistered(True) # fp is provided by pytest-subprocess + fp.allow_unregistered(True) # fp is provided by pytest-subprocess if name == "invalid_study": - fp.register(["/usr/bin/git","clone",url,f"{tmp_path}/studies"], callback=error_callback) + fp.register(["/usr/bin/git", "clone", url, f"{tmp_path}/studies"], callback=error_callback) else: - fp.register(["/usr/bin/git","clone",url,f"{tmp_path}/studies"]) - (tmp_path /"studies").mkdir() + fp.register(["/usr/bin/git", "clone", url, f"{tmp_path}/studies"]) + (tmp_path / "studies").mkdir() shutil.copytree( - pathlib.Path.cwd() / f"./tests/test_data/mock_payloads/{name}/", - tmp_path / f"studies/{name}" + pathlib.Path.cwd() / f"./tests/test_data/mock_payloads/{name}/", + tmp_path / f"studies/{name}", ) - + monkeypatch.setattr(queue_distribute, "BASE_DIR", tmp_path) mock_sns = mock.MagicMock() monkeypatch.setattr(queue_distribute, "sns_client", mock_sns) - with freeze_time("2020-01-01"): #Don't use as a fixture for this test; collides with fp mock + with freeze_time("2020-01-01"): # Don't use as a fixture for this test; collides with fp mock res = queue_distribute.queue_handler( - { + { "Records": [ { - "Sns":{ - "Message": json.dumps({ - "github": { - "url": url, "tag": tag - }, - "study_name": name}) + "Sns": { + "Message": json.dumps( + {"github": {"url": url, "tag": tag}, "study_name": name} + ) } } ] }, - {} + {}, ) assert res["statusCode"] == expected_status if expected_status == 200: @@ -84,16 +77,15 @@ def test_process_github( expected_message = { "TopicArn": "test-payload", "MessageGroupId": "test_study", - 'Message': '{"study": "test_study"}', + "Message": '{"study": "test_study"}', "Subject": "test_study", - } for k, v in mock_sns.publish.call_args[1].items(): if k == "MessageAttributes": - assert name in str(v['study']['BinaryValue']) - else: + assert name in str(v["study"]["BinaryValue"]) + else: assert expected_message[k] == v - if tag=='tag': + if tag == "tag": files = [p for p in (tmp_path / f"studies/{name}").iterdir() if p.is_file()] files = [file.stem for file in files] - assert 'tag' in files + assert "tag" in files From 3655c13bc4ccea08114fc09a01021ff00947425a Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Mon, 6 Jan 2025 13:29:27 -0500 Subject: [PATCH 4/7] decorator cleanup --- src/dashboard/post_distribute/post_distribute.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dashboard/post_distribute/post_distribute.py b/src/dashboard/post_distribute/post_distribute.py index c4c5dd4..e14735d 100644 --- a/src/dashboard/post_distribute/post_distribute.py +++ b/src/dashboard/post_distribute/post_distribute.py @@ -6,7 +6,7 @@ import boto3 import requests -from shared import functions +from shared import decorators, functions sns_client = boto3.client("sns", os.environ.get("AWS_REGION")) valid_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~:/?#[]@!$&'()*+," @@ -35,10 +35,10 @@ def validate_body(body: dict): case "github": validate_github_url(body[key]) case _: - raise ValueError(f"Invalid key {key} received.") + raise ValueError(f"Invalid key {body[key]} received.") -# @decorators.generic_error_handler(msg="Error generating distributed request") +@decorators.generic_error_handler(msg="Error generating distributed request") def distribute_handler(event: dict, context): """Creates a distribution packages and queues for delivery""" del context From 628df1816167adb1550fccdaa4a25a1e1da490bf Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Mon, 6 Jan 2025 14:58:59 -0500 Subject: [PATCH 5/7] Submodule, urlparse --- .github/workflows/ci.yaml | 2 ++ .gitmodules | 3 +++ .../post_distribute/post_distribute.py | 8 +++++-- .../queue_distribute/queue_distribute.py | 10 ++++---- tests/dashboard/test_queue_distribute.py | 23 +++++++++++++------ .../cumulus-aggregator-test-study | 1 + .../mock_payloads/test_study/foo.sql | 1 - .../mock_payloads/test_study/foobar.sql | 1 - .../mock_payloads/test_study/manifest.toml | 7 ------ 9 files changed, 32 insertions(+), 24 deletions(-) create mode 100644 .gitmodules create mode 160000 tests/test_data/mock_payloads/cumulus-aggregator-test-study delete mode 100644 tests/test_data/mock_payloads/test_study/foo.sql delete mode 100644 tests/test_data/mock_payloads/test_study/foobar.sql delete mode 100644 tests/test_data/mock_payloads/test_study/manifest.toml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 461e242..385e295 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -24,6 +24,8 @@ jobs: run: | python -m pip install --upgrade pip pip install ".[test]" + git submodule init + git submodule update - name: Test with pytest run: | export AWS_DEFAULT_REGION=us-east-1; python -m pytest --cov-report xml --cov=src tests diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..5f499a5 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "tests/test_data/mock_payloads/cumulus-aggregator-test-study"] + path = tests/test_data/mock_payloads/cumulus-aggregator-test-study + url = https://github.com/smart-on-fhir/cumulus-aggregator-test-study diff --git a/src/dashboard/post_distribute/post_distribute.py b/src/dashboard/post_distribute/post_distribute.py index e14735d..3e0ccfe 100644 --- a/src/dashboard/post_distribute/post_distribute.py +++ b/src/dashboard/post_distribute/post_distribute.py @@ -2,6 +2,7 @@ import json import os +import urllib import boto3 import requests @@ -13,8 +14,11 @@ def validate_github_url(config): - if not config["url"].startswith("https://github.com/smart-on-fhir/") or any( - c not in valid_chars for c in config["url"] + parsed_url = urllib.parse.urlparse(config["url"]) + if ( + not parsed_url.netloc == "github.com" + or not parsed_url.path.startswith("/smart-on-fhir") + or any(c not in valid_chars for c in config["url"]) ): raise ValueError(f"{config['url']} is not an official Cumulus study.") res = requests.get(config["url"], timeout=10) diff --git a/src/dashboard/queue_distribute/queue_distribute.py b/src/dashboard/queue_distribute/queue_distribute.py index 383ac23..ce55087 100644 --- a/src/dashboard/queue_distribute/queue_distribute.py +++ b/src/dashboard/queue_distribute/queue_distribute.py @@ -39,12 +39,10 @@ def get_study_from_github(config): try: - subprocess.run(["/usr/bin/git", "clone", config["url"], f"{BASE_DIR}/studies"], check=True) # noqa: S603 - if "tag" in config and config["tag"] is not None: - subprocess.run( # noqa: S603 - ["/usr/bin/git", "checkout", "tag"], - cwd=f"{BASE_DIR}/studies/{config['url'].split('/')[-2]}", - ) + args = ["--depth", "1", config["url"], f"{BASE_DIR}/studies"] + if config["tag"]: + args = ["--branch", config["tag"], *args] + subprocess.run(["/usr/bin/git", "clone", *args], check=True) # noqa: S603 except subprocess.CalledProcessError: # TODO: retry/backoff logic? or do we just have a user queue again? diff --git a/tests/dashboard/test_queue_distribute.py b/tests/dashboard/test_queue_distribute.py index 5135d13..5ad36e0 100644 --- a/tests/dashboard/test_queue_distribute.py +++ b/tests/dashboard/test_queue_distribute.py @@ -25,13 +25,13 @@ def error_callback(process): [ ( "test_study", - "https://github.com/smart-on-fhir/test_study/", + "https://github.com/smart-on-fhir/cumulus-aggregator-test-study/", None, 200, ), ( "test_study", - "https://github.com/smart-on-fhir/test_study/", + "https://github.com/smart-on-fhir/cumulus-aggregator-test-study/", "tag", 200, ), @@ -43,15 +43,23 @@ def test_process_github( mock_notification, tmp_path, name, url, tag, expected_status, monkeypatch, fp ): fp.allow_unregistered(True) # fp is provided by pytest-subprocess + + args = ["--depth", "1", url, f"{tmp_path}/studies"] + if tag: + args = ["--branch", tag, *args] if name == "invalid_study": - fp.register(["/usr/bin/git", "clone", url, f"{tmp_path}/studies"], callback=error_callback) + fp.register(["/usr/bin/git", "clone", *args], callback=error_callback) else: - fp.register(["/usr/bin/git", "clone", url, f"{tmp_path}/studies"]) + fp.register(["/usr/bin/git", "clone", *args]) (tmp_path / "studies").mkdir() + study_dir = tmp_path / f"studies/{name}" shutil.copytree( - pathlib.Path.cwd() / f"./tests/test_data/mock_payloads/{name}/", - tmp_path / f"studies/{name}", + pathlib.Path.cwd() / "./tests/test_data/mock_payloads/cumulus-aggregator-test-study", + study_dir, ) + if tag: + # if we're checking out a tag, make sure we've set up an actual git repo + subprocess.run(["git", "checkout", "tag"], cwd=study_dir) monkeypatch.setattr(queue_distribute, "BASE_DIR", tmp_path) mock_sns = mock.MagicMock() @@ -88,4 +96,5 @@ def test_process_github( if tag == "tag": files = [p for p in (tmp_path / f"studies/{name}").iterdir() if p.is_file()] files = [file.stem for file in files] - assert "tag" in files + print(type(files[0])) + assert "tag" not in files diff --git a/tests/test_data/mock_payloads/cumulus-aggregator-test-study b/tests/test_data/mock_payloads/cumulus-aggregator-test-study new file mode 160000 index 0000000..cfe01c4 --- /dev/null +++ b/tests/test_data/mock_payloads/cumulus-aggregator-test-study @@ -0,0 +1 @@ +Subproject commit cfe01c40a62e706051cde6d10c5c794c66ce026d diff --git a/tests/test_data/mock_payloads/test_study/foo.sql b/tests/test_data/mock_payloads/test_study/foo.sql deleted file mode 100644 index fe23484..0000000 --- a/tests/test_data/mock_payloads/test_study/foo.sql +++ /dev/null @@ -1 +0,0 @@ -CREATE TABLE test_study__foo AS SELECT * FROM bar; \ No newline at end of file diff --git a/tests/test_data/mock_payloads/test_study/foobar.sql b/tests/test_data/mock_payloads/test_study/foobar.sql deleted file mode 100644 index 8c96850..0000000 --- a/tests/test_data/mock_payloads/test_study/foobar.sql +++ /dev/null @@ -1 +0,0 @@ -CREATE TABLE test_study__foobar AS SELECT * FROM bar; \ No newline at end of file diff --git a/tests/test_data/mock_payloads/test_study/manifest.toml b/tests/test_data/mock_payloads/test_study/manifest.toml deleted file mode 100644 index cedc544..0000000 --- a/tests/test_data/mock_payloads/test_study/manifest.toml +++ /dev/null @@ -1,7 +0,0 @@ -study_prefix = "test_study" - -[file_config] -file_names = [ - "foo.sql", - "foobar.sql" -] \ No newline at end of file From b536ec525477f37800097b7a114424967d18edbb Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Mon, 6 Jan 2025 16:30:34 -0500 Subject: [PATCH 6/7] coverage --- .../queue_distribute/queue_distribute.py | 4 ---- tests/dashboard/test_post_distribute.py | 16 ++++++++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/dashboard/queue_distribute/queue_distribute.py b/src/dashboard/queue_distribute/queue_distribute.py index ce55087..2f4c454 100644 --- a/src/dashboard/queue_distribute/queue_distribute.py +++ b/src/dashboard/queue_distribute/queue_distribute.py @@ -70,12 +70,8 @@ def process_body(body: dict): """Selects the appropriate handler for processing study requests""" for key in body.keys(): match key: - case "study_name": - pass case "github": get_study_from_github(body[key]) - case _: - raise ValueError(f"Invalid key {key} received.") @decorators.generic_error_handler(msg="Error generating distributed request") diff --git a/tests/dashboard/test_post_distribute.py b/tests/dashboard/test_post_distribute.py index 327d22d..b7c4b7e 100644 --- a/tests/dashboard/test_post_distribute.py +++ b/tests/dashboard/test_post_distribute.py @@ -35,6 +35,12 @@ None, 500, ), + ( + "invalid_tag", + "https://github.com/smart-on-fhir/invalid_tag", + "tag", + 500, + ), ("non_cumulus_repo", "https://github.com/user/non_cumulus_repo", None, 500), ], ) @@ -48,6 +54,16 @@ def test_process_github(mock_notification, tmp_path, name, url, tag, expected_st "https://github.com/smart-on-fhir/invalid_study", status=404, ) + responses.add( + responses.GET, + "https://github.com/smart-on-fhir/invalid_tag", + status=200, + ) + responses.add( + responses.GET, + "https://github.com/smart-on-fhir/invalid_tag/tree/tag", + status=404, + ) mock_sns = mock.MagicMock() monkeypatch.setattr(post_distribute, "sns_client", mock_sns) From b20e82b2e4f6e046506925173c28a7693d8ab320 Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Tue, 7 Jan 2025 09:06:21 -0500 Subject: [PATCH 7/7] security slash --- src/dashboard/post_distribute/post_distribute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dashboard/post_distribute/post_distribute.py b/src/dashboard/post_distribute/post_distribute.py index 3e0ccfe..06bf7cb 100644 --- a/src/dashboard/post_distribute/post_distribute.py +++ b/src/dashboard/post_distribute/post_distribute.py @@ -17,7 +17,7 @@ def validate_github_url(config): parsed_url = urllib.parse.urlparse(config["url"]) if ( not parsed_url.netloc == "github.com" - or not parsed_url.path.startswith("/smart-on-fhir") + or not parsed_url.path.startswith("/smart-on-fhir/") or any(c not in valid_chars for c in config["url"]) ): raise ValueError(f"{config['url']} is not an official Cumulus study.")