diff --git a/Dockerfile.motoserver b/Dockerfile.motoserver new file mode 100644 index 00000000..63bb2925 --- /dev/null +++ b/Dockerfile.motoserver @@ -0,0 +1,8 @@ +# This Dockerfile is used only because we rely on curl, and can be removed if we upgrade +# to a version of moto > 4.1.12 + +# The version of moto is pinned to 4.1.0 because s3parcp is unable to parse date strings in later versions +# see: https://github.com/aws/aws-sdk-go-v2/issues/1981 +FROM motoserver/moto:4.1.0 + +RUN apt-get update && apt-get install -y curl diff --git a/bin/init_moto.sh b/bin/init_moto.sh index 4cecc470..bc58c77e 100755 --- a/bin/init_moto.sh +++ b/bin/init_moto.sh @@ -6,7 +6,7 @@ moto_server --host 0.0.0.0 --port $MOTO_PORT & # Initialize data once server is ready -sleep 1 && curl -X POST "http://motoserver.czidnet:${MOTO_PORT}/moto-api/recorder/replay-recording" +sleep 3 && curl -X POST "http://motoserver.czidnet:${MOTO_PORT}/moto-api/recorder/replay-recording" # Go back to moto server wait diff --git a/docker-compose.yml b/docker-compose.yml index 6cd7ea76..750cac19 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,9 @@ services: # To use it from the CLI: aws --endpoint-url=http://localhost:4000 s3 ls # To reset all services without restarting the container: curl -X POST http://localhost:4000/moto-api/reset motoserver: - image: motoserver/moto:4.2.3 + build: + context: . 
+ dockerfile: Dockerfile.motoserver ports: - "4000:4000" environment: @@ -13,11 +15,27 @@ services: - MOTO_ENABLE_RECORDING=True - MOTO_S3_CUSTOM_ENDPOINTS=http://motoserver.czidnet:4000 - S3_IGNORE_SUBDOMAIN_BUCKETNAME=True + - MOTO_DOCKER_NETWORK_NAME=czidnet + - MOTO_DOCKER_NETWORK_MODE=overlay volumes: - .moto_recording:/moto/moto_recording - ./bin:/moto/bin - entrypoint: [] + - "/var/run/docker.sock:/var/run/docker.sock" + entrypoint: ["/bin/bash"] command: ["/moto/bin/init_moto.sh"] + stepfunctions_local: + container_name: stepfunctions_local_workflows + image: amazon/aws-stepfunctions-local + environment: + - BATCH_ENDPOINT=http://motoserver.czidnet:4000 + - LAMBDA_ENDPOINT=http://motoserver.czidnet:4000 + - AWS_ACCOUNT_ID=123456789012 + ports: + - "8083:8083" + networks: + default: + aliases: + - sfn.czidnet networks: default: diff --git a/workflows/Makefile b/workflows/Makefile index 4f137ef3..6582a4c0 100644 --- a/workflows/Makefile +++ b/workflows/Makefile @@ -57,11 +57,11 @@ local-pgconsole: ## Connect to the local postgres database. .PHONY: local-swipe-plugin-tests local-swipe-plugin-start: local-envfile - $(docker_compose) --profile moto --env-file .env.localdev up --force-recreate -d + $(docker_compose) --env-file .env.localdev up --force-recreate -d .PHONY: local-swipe-plugin-deploy-mock local-swipe-plugin-deploy-mock: local-swipe-plugin-start - - source environment.test; aws ssm put-parameter --name /mock-aws/service/ecs/optimized-ami/amazon-linux-2/recommended/image_id --value ami-12345678 --type String --endpoint-url http://localhost:9000 --no-cli-pager + - source environment.test; aws ssm put-parameter --name /mock-aws/service/ecs/optimized-ami/amazon-linux-2/recommended/image_id --value ami-12345678 --type String --endpoint-url http://localhost:4000 --no-cli-pager source environment.test && \ cd terraform_test && \ find . 
-name '*tfstate*' | xargs rm -f && \ diff --git a/workflows/docker-compose.yml b/workflows/docker-compose.yml index 09a1d7dc..8643764b 100644 --- a/workflows/docker-compose.yml +++ b/workflows/docker-compose.yml @@ -4,8 +4,8 @@ x-aws-variables: &aws-variables ? AWS_ACCESS_KEY_ID ? AWS_SECRET_ACCESS_KEY ? AWS_SESSION_TOKEN - ? AWS_REGION=us-west-2 - ? AWS_DEFAULT_REGION=us-west-2 + ? AWS_REGION + ? AWS_DEFAULT_REGION x-db-variables: &db-variables ? PLATFORMICS_DATABASE_HOST=postgres.czidnet @@ -54,44 +54,14 @@ services: tty: true # Helps with pdb environment: <<: [*aws-variables, *db-variables, *workflow-variables] - CERBOS_URL: "http://cerbos.cerbos-system.svc.cluster.local:3592" + CERBOS_URL: http://wf-cerbos:3592 JWK_PUBLIC_KEY_FILE: "/workflows/test_infra/fixtures/public_key.pem" JWK_PRIVATE_KEY_FILE: "/workflows/test_infra/fixtures/private_key.pem" DEFAULT_UPLOAD_BUCKET: "local-bucket" - DEFAULT_UPLOAD_PROTOCOL: "S3" BOTO_ENDPOINT_URL: "http://motoserver.czidnet:4000" ENTITY_SERVICE_URL: "http://entity-service:8008" - AWS_REGION: "us-west-2" ENTITY_SERVICE_AUTH_TOKEN: 
"eyJhbGciOiJFQ0RILUVTIiwiZW5jIjoiQTI1NkNCQy1IUzUxMiIsImVwayI6eyJjcnYiOiJQLTM4NCIsImt0eSI6IkVDIiwieCI6Ik5Nc3ZKbXVuYnBXY0VsdVlJTmRVeVVIcUkzbjZCR2VQd2V3ajRXS0pVdEt0QXhmUUtrVE81M2kzQ2dSZkZYVEQiLCJ5IjoiYmp6TkJuZjExWjRIV3dBVm95UVpSOGRWSERicW9wTjhvQkJZYnIxQlBiU1llZHdaWkVuYzJqS21rY0xxcloxTiJ9LCJraWQiOiItQmx2bF9wVk5LU2JRQ2N5dGV4UzNfMk5MaHBia2J6LVk5VFFjbkY5S1drIiwidHlwIjoiSldFIn0..Ymjmtj6nXp8r8AFe8AgI1g.e_39w7OXGJaOVKL_QkP38rvlcEeSgFQsxT0rTdCgI5E-b328zlVHagLSFZ_Iqvqiy6Z8KcU4pLJ3NTaW3Ys_YQsnUn6yUEvwqOY2KESB0mT0Bp3qpNYRBZJVA8PW43YAbOnO7h7ZTwQZJfwMzwhcaaYeZW8pN9rvcNtQX3rgBubSuR-LHKL6k4uAMPh9A8ZxXKZgpI6tpJPxE-uspvYi-foW8VyjZtwOUMvMp3lfZPyL1oQIv_rEUhOGNO_lfi339QcT6F7DwBjXK6C_7U65F-dFZScnReLnVczPfHhJ7z3NnVt46sFcddgZpLIpQyzA6puCcDoRm5ZZCVvm8h-LHVy-9dGWLVxBRhGRdBwBhbiVu2O_CNeVabXl8JhAs3oeV2zDgYfOj_-kkHWsbgHZ0y-tc-HtgoXzsUqaRP1IXQ3g3VDES7UjsaKsfxgURH5EIsrdWwFrWHGoLNfGwwPSwTBI5Mul7LT10-Pg_uBBCiHfQIDqerRQeADRFhV_07GYatBDt-RxwNL4bO59V8ewCzhpdCYRpL363HGldT1Pic-SpTk2NsY2t8MA6__FhJU9JSKYwJpeKMaGLUHA_40PEQ.gb5q-WZTU-ZKpV7WYFbMGMEF2CZIBrFlCUeaZ5ffPDU" - motoserver: - profiles: ["moto"] - container_name: motoserver_workflows - image: motoserver/moto:3.0.4 - environment: - - MOTO_DOCKER_NETWORK_NAME=czidnet - - MOTO_DOCKER_NETWORK_MODE=overlay - ports: - - "9000:5000" - volumes: - - "/var/run/docker.sock:/var/run/docker.sock" - networks: - default: - aliases: - - czidnet - stepfunctions_local: - profiles: ["moto"] - container_name: stepfunctions_local_workflows - image: amazon/aws-stepfunctions-local - environment: - - BATCH_ENDPOINT=http://czidnet:5000 - - LAMBDA_ENDPOINT=http://czidnet:5000 - - AWS_ACCOUNT_ID=123456789012 - ports: - - "8083:8083" - networks: - default: - aliases: - - sfn.czidnet + DEFAULT_UPLOAD_PROTOCOL: S3 # don't need this in workflows wf-cerbos: image: ghcr.io/cerbos/cerbos:0.29.0 volumes: diff --git a/workflows/plugins/workflow_runners/swipe/workflow_runner_swipe.py b/workflows/plugins/workflow_runners/swipe/workflow_runner_swipe.py index 2447400c..4b2f45b8 100644 --- 
a/workflows/plugins/workflow_runners/swipe/workflow_runner_swipe.py +++ b/workflows/plugins/workflow_runners/swipe/workflow_runner_swipe.py @@ -10,7 +10,7 @@ # TODO: maybe split out these decisions into another module, or a YAML file?? if os.environ.get("ENVIRONMENT", None) == "test": sfn = boto3.client("stepfunctions", endpoint_url="http://sfn.czidnet:8083") - sts = boto3.client("sts", endpoint_url="http://czidnet:5000") + sts = boto3.client("sts", endpoint_url="http://motoserver.czidnet:4000") REGION = "us-east-1" SFN_NAME = "swipe-test-default-wdl" else: diff --git a/workflows/scripts/create_db.py b/workflows/scripts/create_db.py index 000c5444..b84b8665 100644 --- a/workflows/scripts/create_db.py +++ b/workflows/scripts/create_db.py @@ -1,16 +1,18 @@ from platformics.api.core.settings import APISettings from sqlalchemy_utils import create_database, database_exists + def create_db(): settings = APISettings.parse_obj({}) db_uri = settings.SYNC_DB_URI if database_exists(db_uri): print("Database already exists!") - + else: print("Database does not exist, creating database") create_database(db_uri) print("Database created") + if __name__ == "__main__": - create_db() \ No newline at end of file + create_db() diff --git a/workflows/terraform_test/main.tf b/workflows/terraform_test/main.tf index 98a62a92..119173e8 100644 --- a/workflows/terraform_test/main.tf +++ b/workflows/terraform_test/main.tf @@ -1,20 +1,20 @@ module "swipetest" { - source = "git@github.com:chanzuckerberg/swipe.git" + source = "github.com/chanzuckerberg/swipe?ref=v1.4.8" call_cache = true ami_ssm_parameter = "/mock-aws/service/ecs/optimized-ami/amazon-linux-2/recommended/image_id" miniwdl_dir = "/tmp/" app_name = "swipe-test" batch_ec2_instance_types = ["optimal"] - aws_endpoint_url = "http://czidnet:5000" + aws_endpoint_url = "http://motoserver.czidnet:4000" docker_network = "czidnet" use_spot = false # Moto doesn't know how to use SPOT extra_env_vars = { "AWS_ACCESS_KEY_ID" : "role-account-id", 
"AWS_SECRET_ACCESS_KEY" : "role-secret-key", "AWS_SESSION_TOKEN" : "session-token", - "AWS_ENDPOINT_URL" : "http://czidnet:5000", + "AWS_ENDPOINT_URL" : "http://motoserver.czidnet:4000", "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI" : "container-credentials-relative-uri", - "S3PARCP_S3_URL" : "http://czidnet:5000", + "S3PARCP_S3_URL" : "http://motoserver.czidnet:4000", } sqs_queues = { @@ -35,4 +35,5 @@ module "swipetest" { workspace_s3_prefixes = ["swipe-test"] output_status_json_files = true + step_notifications = true } \ No newline at end of file diff --git a/workflows/terraform_test/providers.tf b/workflows/terraform_test/providers.tf index 98be82f9..c7817069 100644 --- a/workflows/terraform_test/providers.tf +++ b/workflows/terraform_test/providers.tf @@ -9,18 +9,18 @@ terraform { provider "aws" { endpoints { - batch = "http://localhost:9000" - cloudwatch = "http://localhost:9000" - cloudwatchevents = "http://localhost:9000" - ec2 = "http://localhost:9000" - iam = "http://localhost:9000" - lambda = "http://localhost:9000" - s3 = "http://localhost:9000" - secretsmanager = "http://localhost:9000" - sns = "http://localhost:9000" - sqs = "http://localhost:9000" - ssm = "http://localhost:9000" + batch = "http://localhost:4000" + cloudwatch = "http://localhost:4000" + cloudwatchevents = "http://localhost:4000" + ec2 = "http://localhost:4000" + iam = "http://localhost:4000" + lambda = "http://localhost:4000" + s3 = "http://localhost:4000" + secretsmanager = "http://localhost:4000" + sns = "http://localhost:4000" + sqs = "http://localhost:4000" + ssm = "http://localhost:4000" stepfunctions = "http://localhost:8083" - sts = "http://localhost:9000" + sts = "http://localhost:4000" } } \ No newline at end of file diff --git a/workflows/terraform_test/run_swipe_plugin_test.py b/workflows/terraform_test/run_swipe_plugin_test.py index 447083af..191192e2 100644 --- a/workflows/terraform_test/run_swipe_plugin_test.py +++ b/workflows/terraform_test/run_swipe_plugin_test.py @@ -2,13 
+2,53 @@ import time import json import boto3 +from typing import Dict, List from workflow_runner_swipe import SwipeWorkflowRunner +class AWSMock: + def __init__( + self, + endpoint_url="http://motoserver.czidnet:4000", + sfn_endpoint_url="http://sfn.czidnet:8083", + aws_region="us-east-1", + ) -> None: + self.s3 = boto3.resource("s3", endpoint_url=endpoint_url, region_name=aws_region) + self.sqs = boto3.client("sqs", endpoint_url=endpoint_url, region_name=aws_region) + self.sfn = boto3.client("stepfunctions", endpoint_url=sfn_endpoint_url, region_name=aws_region) + + def get_sqs_url(self) -> str: + return self.sqs.list_queues()["QueueUrls"][0] + + def retrieve_message(self, url: str) -> Dict: + """Retrieve a single SQS message and delete it from queue""" + resp = self.sqs.receive_message( + QueueUrl=url, + MaxNumberOfMessages=1, + ) + # If no messages, just return + if not resp.get("Messages", None): + return {} + + message = resp["Messages"][0] + receipt_handle = message["ReceiptHandle"] + self.sqs.delete_message( + QueueUrl=url, + ReceiptHandle=receipt_handle, + ) + return json.loads(message["Body"]) + + def get_sfn_execution_status(self, sfn_arn) -> str: + return self.sfn.describe_execution(executionArn=sfn_arn)["status"] + + class TestSFNWDL(unittest.TestCase): def setUp(self) -> None: - self.s3 = boto3.resource("s3", endpoint_url="http://czidnet:5000") + self.s3 = boto3.resource("s3", endpoint_url="http://motoserver.czidnet:4000") self.test_bucket = self.s3.create_bucket(Bucket="swipe-test") + + self.batch = boto3.client("batch", endpoint_url="http://motoserver.czidnet:4000") + self.logs = boto3.client("logs", endpoint_url="http://motoserver.czidnet:4000") ## TODO Loop through multiple wdl files to read everything into the test bucket with open("terraform_test/test_wdl.wdl") as f: self.wdl_one = f.read() @@ -18,6 +58,46 @@ def setUp(self) -> None: self.input_obj = self.test_bucket.Object("input.txt") self.input_obj.put(Body="hello".encode()) + self.aws = 
AWSMock() + + def print_execution(self, events): + import sys + + seen_events = set() + for event in sorted( + events, + key=lambda x: x["id"], + ): + if event["id"] not in seen_events: + details = {} + for key in event.keys(): + if key.endswith("EventDetails") and event[key]: + details = event[key] + print( + event["timestamp"], + event["type"], + details.get("resourceType", ""), + details.get("resource", ""), + details.get("name", ""), + json.loads(details.get("parameters", "{}")).get("FunctionName", ""), + file=sys.stderr, + ) + if "taskSubmittedEventDetails" in event: + if event.get("taskSubmittedEventDetails", {}).get("resourceType") == "batch": + job_id = json.loads(event["taskSubmittedEventDetails"]["output"])["JobId"] + print(f"Batch job ID {job_id}", file=sys.stderr) + job_desc = self.batch.describe_jobs(jobs=[job_id])["jobs"][0] + try: + log_group_name = job_desc["container"]["logConfiguration"]["options"]["awslogs-group"] + except KeyError: + log_group_name = "/aws/batch/job" + response = self.logs.get_log_events( + logGroupName=log_group_name, + logStreamName=job_desc["container"]["logStreamName"], + ) + for log_event in response["events"]: + print(log_event["message"], file=sys.stderr) + seen_events.add(event["id"]) def test_simple_swipe_workflow(self): """A simple test to test whether the SWIPE plugin works""" @@ -33,17 +113,29 @@ def test_simple_swipe_workflow(self): }, ) workflow_json = json.loads(workflow_output) - sfn = boto3.client("stepfunctions", endpoint_url="http://sfn.czidnet:8083") - arn = workflow_json["response"]["executionArn"] + breakout = 0 - while sfn.describe_execution(executionArn=arn)["status"] not in ["SUCCEEDED", "FAILED"]: + while (arn := self.aws.get_sfn_execution_status(workflow_json["response"]["executionArn"])) not in [ + "SUCCEEDED", + "FAILED", + ]: time.sleep(1) breakout += 1 if breakout == 120: # make sure weird conditions don't hang the tests break - self.assertEqual(sfn.describe_execution(executionArn=arn)["status"], 
"SUCCEEDED") + step_notifications = [] + stage_notifications = [] + while message := self.aws.retrieve_message(self.aws.get_sqs_url()): + if message["source"] == "aws.batch": + step_notifications.append(message["detail"]) + elif message["source"] == "aws.states": + stage_notifications.append(message["detail"]) + + self.assertEqual(arn, "SUCCEEDED") + self.assertEqual(len(step_notifications), 3) + self.assertEqual(len(stage_notifications), 1) if __name__ == "__main__": diff --git a/workflows/test/test_workflows_db.py b/workflows/test/test_workflows_db.py index cff82f23..b6b95197 100644 --- a/workflows/test/test_workflows_db.py +++ b/workflows/test/test_workflows_db.py @@ -8,7 +8,9 @@ import os -test_db = factories.postgresql_noproc(host=os.getenv("PLATFORMICS_DATABASE_HOST"), password=os.getenv("PLATFORMICS_DATABASE_PASSWORD")) +test_db = factories.postgresql_noproc( + host=os.getenv("PLATFORMICS_DATABASE_HOST"), password=os.getenv("PLATFORMICS_DATABASE_PASSWORD") +) def get_db_uri(protocol, db_user, db_pass, db_host, db_port, db_name):