From 02689f01dc0f2f258636c8b24474cc38c9be7b07 Mon Sep 17 00:00:00 2001
From: Huy Do
Date: Mon, 14 Aug 2023 11:11:55 -0700
Subject: [PATCH] Add a lambda to index torchci-workflow-job table (#4469)

This goes with https://github.com/fairinternal/pytorch-gha-infra/pull/232

### Testing

`make deploy`
---
 .flake8                                            |   2 +
 aws/lambda/opensearch-gha-jobs/Makefile            |  11 +
 aws/lambda/opensearch-gha-jobs/README.md           |  15 +
 .../opensearch-gha-jobs/lambda_function.py         | 221 ++++++++++++
 .../opensearch-gha-jobs/requirements.txt           |   3 +
 .../test_lambda_function.py                        | 314 ++++++++++++++++++
 6 files changed, 566 insertions(+)
 create mode 100644 .flake8
 create mode 100644 aws/lambda/opensearch-gha-jobs/Makefile
 create mode 100644 aws/lambda/opensearch-gha-jobs/README.md
 create mode 100644 aws/lambda/opensearch-gha-jobs/lambda_function.py
 create mode 100644 aws/lambda/opensearch-gha-jobs/requirements.txt
 create mode 100644 aws/lambda/opensearch-gha-jobs/test_lambda_function.py

diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000000..6deafc2617
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,2 @@
+[flake8]
+max-line-length = 120
diff --git a/aws/lambda/opensearch-gha-jobs/Makefile b/aws/lambda/opensearch-gha-jobs/Makefile
new file mode 100644
index 0000000000..74b09d45b8
--- /dev/null
+++ b/aws/lambda/opensearch-gha-jobs/Makefile
@@ -0,0 +1,11 @@
+prepare: clean
+	mkdir -p ./packages
+	pip3 install --target ./packages -r requirements.txt
+	cd packages && zip -r ../opensearch-gha-jobs-deployment.zip .
+	zip -g opensearch-gha-jobs-deployment.zip lambda_function.py
+
+deploy: prepare
+	aws lambda update-function-code --function-name opensearch-gha-jobs --zip-file fileb://opensearch-gha-jobs-deployment.zip
+
+clean:
+	rm -rf opensearch-gha-jobs-deployment.zip packages
diff --git a/aws/lambda/opensearch-gha-jobs/README.md b/aws/lambda/opensearch-gha-jobs/README.md
new file mode 100644
index 0000000000..c03e1938a5
--- /dev/null
+++ b/aws/lambda/opensearch-gha-jobs/README.md
@@ -0,0 +1,15 @@
+This lambda is used to index the content of the `torchci-workflow-job`
+DynamoDB table onto the OpenSearch cluster `gha-jobs`. This is done by
+listening to the stream of `INSERT`, `MODIFY`, and `REMOVE` events
+coming to the DynamoDB table, extracting the documents, and indexing
+them on the OpenSearch cluster.
+
+Because the JSON structure of a DynamoDB event includes some simple
+datatype annotations ([link](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_AttributeValue.html)),
+the lambda performs a transformation to convert it back to a regular
+JSON data structure.
+
+### Deployment

+
+A new version of the lambda can be deployed using `make deploy`, and this
+is done automatically as part of the CI.
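+
+For example (mirroring the unit tests in `test_lambda_function.py`), an
+annotated attribute map like
+
+```json
+{
+  "workflow_name": {"S": "pull"},
+  "run_attempt": {"N": "1"},
+  "conclusion": {"NULL": true},
+  "steps": {"L": []}
+}
+```
+
+is unmarshalled into the plain JSON document
+
+```json
+{
+  "workflow_name": "pull",
+  "run_attempt": 1,
+  "conclusion": null,
+  "steps": []
+}
+```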
diff --git a/aws/lambda/opensearch-gha-jobs/lambda_function.py b/aws/lambda/opensearch-gha-jobs/lambda_function.py
new file mode 100644
index 0000000000..88e807662e
--- /dev/null
+++ b/aws/lambda/opensearch-gha-jobs/lambda_function.py
@@ -0,0 +1,221 @@
+import json
+import re
+from collections import defaultdict
+from enum import Enum
+from typing import Any, Dict, Optional, Union
+from warnings import warn
+
+import boto3
+from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection
+
+
+OPENSEARCH_ENDPOINT = (
+    "search-gha-jobs-po2dvxh7kcayevbmm6ih2vr4ka.us-east-1.es.amazonaws.com"
+)
+OPENSEARCH_REGION = "us-east-1"
+DYNAMODB_TABLE_REGEX = re.compile(
+    "arn:aws:dynamodb:.*?:.*?:table/(?P<table>[0-9a-zA-Z_-]+)/.+"
+)
+
+
+class EventType(Enum):
+    INSERT = "INSERT"
+    REMOVE = "REMOVE"
+    MODIFY = "MODIFY"
+
+
+def lambda_handler(event: Any, context: Any) -> None:
+    credentials = boto3.Session().get_credentials()
+    aws_auth = AWSV4SignerAuth(credentials, OPENSEARCH_REGION, "es")
+    opensearch_client = OpenSearch(
+        hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
+        http_auth=aws_auth,
+        use_ssl=True,
+        verify_certs=True,
+        connection_class=RequestsHttpConnection,
+    )
+
+    counts = defaultdict(int)
+    # The input of this lambda is a stream of DynamoDB events that we want to
+    # index on OpenSearch
+    for record in event["Records"]:
+        event_name = record.get("eventName", "")
+        try:
+            if (
+                event_name == EventType.INSERT.value
+                or event_name == EventType.MODIFY.value
+            ):
+                upsert_document(opensearch_client, record)
+            elif event_name == EventType.REMOVE.value:
+                remove_document(opensearch_client, record)
+            else:
+                warn(f"Unrecognized event type {event_name} in {json.dumps(record)}")
+
+            counts[event_name] += 1
+        except Exception as error:
+            warn(f"Failed to process {json.dumps(record)}: {error}")
+
+    print(f"Finish processing {json.dumps(counts)}")
+
+
+def extract_dynamodb_table(record: Any) -> Optional[str]:
+    """
+    Extract the DynamoDB table name from the source ARN. This will be used later as
+    the index name
+    """
+    s = record.get("eventSourceARN", "")
+    m = DYNAMODB_TABLE_REGEX.match(s)
+    if not m:
+        warn(f"Invalid value {s}, expecting a DynamoDB table")
+        return
+
+    return m.group("table").lower()
+
+
+def extract_dynamodb_key(record: Any) -> Optional[str]:
+    keys = unmarshal({"M": record.get("dynamodb", {}).get("Keys", {})})
+    if not keys:
+        return
+    return "|".join(keys.values())
+
+
+def to_number(s: str) -> Union[int, float]:
+    try:
+        return int(s)
+    except ValueError:
+        return float(s)
+
+
+def unmarshal(doc: Dict[Any, Any]) -> Any:
+    """
+    Convert the DynamoDB stream record into a regular JSON document. This is done recursively.
+    At the top level, it will be a dictionary of type M (Map). Here is the list of DynamoDB
+    attributes to handle:
+
+    https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_AttributeValue.html
+    """
+    for k, v in list(doc.items()):
+        if k == "NULL":
+            return
+
+        if k == "S" or k == "BOOL":
+            return v
+
+        if k == "N":
+            return to_number(v)
+
+        if k == "M":
+            return {sk: unmarshal(sv) for sk, sv in v.items()}
+
+        if k == "BS" or k == "L":
+            return [unmarshal(item) for item in v]
+
+        if k == "SS":
+            return v.copy()
+
+        if k == "NS":
+            return [to_number(item) for item in v]
+
+    return {}
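+
+# For example, consistent with the unit tests in test_lambda_function.py
+# (illustrative comment only):
+#
+#   unmarshal({"M": {"labels": {"L": [{"S": "linux.4xlarge.nvidia.gpu"}]},
+#                    "id": {"N": "15806102004"}}})
+#
+# returns {"labels": ["linux.4xlarge.nvidia.gpu"], "id": 15806102004}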
+
+
+def upsert_document(client: OpenSearch, record: Any) -> None:
+    """
+    Insert a new doc or modify an existing document. The latter happens when the workflow job is
+    updated (e.g. a new step starts or the job finishes). A record from torchci-workflow-job
+    looks as follows:
+    {
+        "eventID": "...",
+        "eventName": "MODIFY",
+        "eventVersion": "1.1",
+        "eventSource": "aws:dynamodb",
+        "awsRegion": "us-east-1",
+        "dynamodb": {
+            "ApproximateCreationDateTime": 1691722869,
+            "Keys": {
+                "dynamoKey": {
+                    "S": "pytorch/pytorch/15806159447"
+                }
+            },
+            "NewImage": {
+                "workflow_name": {
+                    "S": "pull"
+                },
+                "steps": {
+                    "L": [
+                        {
+                            "M": {
+                                "conclusion": {
+                                    "NULL": True
+                                },
+                                "number": {
+                                    "N": "1"
+                                },
+                                "completed_at": {
+                                    "NULL": True
+                                },
+                                "name": {
+                                    "S": "Set up job"
+                                },
+                                "started_at": {
+                                    "S": "..."
+                                },
+                                "status": {
+                                    "S": "in_progress"
+                                }
+                            }
+                        }
+                    ]
+                },
+                ... all other fields ...
+            },
+            "OldImage": {
+                "workflow_name": {
+                    "S": "pull"
+                },
+                "steps": {
+                    "L": []
+                },
+                ... all other fields ...
+            },
+            "SequenceNumber": "...",
+            "SizeBytes": 1763,
+            "StreamViewType": "NEW_AND_OLD_IMAGES"
+        },
+        "eventSourceARN": "arn:aws:dynamodb:us-east-1:...:table/torchci-workflow-job/stream/..."
+    }
+    """
+    index = extract_dynamodb_table(record)
+    if not index:
+        return
+
+    # Create index using the table name if it's not there yet
+    if not client.indices.exists(index):
+        # https://www.elastic.co/guide/en/elasticsearch/reference/current/coerce.html
+        client.indices.create(index, body={"settings": {"index.mapping.coerce": True}})
+
+    body = unmarshal({"M": record.get("dynamodb", {}).get("NewImage", {})})
+    if not body:
+        return
+
+    id = extract_dynamodb_key(record)
+    if not id:
+        return
+
+    print(f"UPSERTING {id} INTO {index}")
+    client.index(index=index, body=body, id=id, refresh=True)
+
+
+def remove_document(client: OpenSearch, record: Any) -> None:
+    """
+    Remove a document. This is here for completeness as we don't remove records from DynamoDB
+    """
+    index = extract_dynamodb_table(record)
+    if not index:
+        return
+
+    id = extract_dynamodb_key(record)
+    if not id:
+        return
+
+    print(f"DELETING {id} FROM {index}")
+    client.delete(index=index, id=id, refresh=True)
diff --git a/aws/lambda/opensearch-gha-jobs/requirements.txt b/aws/lambda/opensearch-gha-jobs/requirements.txt
new file mode 100644
index 0000000000..ecbfdd716d
--- /dev/null
+++ b/aws/lambda/opensearch-gha-jobs/requirements.txt
@@ -0,0 +1,3 @@
+boto3==1.28.24
+opensearch-py==2.3.0
+pytest==7.4.0
diff --git a/aws/lambda/opensearch-gha-jobs/test_lambda_function.py b/aws/lambda/opensearch-gha-jobs/test_lambda_function.py
new file mode 100644
index 0000000000..52ebb9c727
--- /dev/null
+++ b/aws/lambda/opensearch-gha-jobs/test_lambda_function.py
@@ -0,0 +1,314 @@
+from unittest import TestCase
+from unittest.mock import Mock
+
+from lambda_function import (
+    extract_dynamodb_key,
+    extract_dynamodb_table,
+    remove_document,
+    to_number,
+    unmarshal,
+    upsert_document,
+)
+
+
+def test_extract_dynamodb_table():
+    cases = [
+        {
+            "arn": "",
+            "expected": None,
+            "description": "Invalid input - empty input",
+        },
+        {
+            "arn": "FOOBAR",
+            "expected": None,
+            "description": "Invalid input - not in ARN format",
+        },
+        {
+            "arn": "arn:aws:dynamodb:us-east-1:12345:table/torchci-workflow-job/stream/2022-01-14T01:31:51.775",
+            "expected": "torchci-workflow-job",
+            "description": "An event coming from DynamoDB",
+        },
+    ]
+
+    for case in cases:
+        arn = case["arn"]
+        expected = case["expected"]
+        description = case["description"]
+        TestCase().assertEqual(
+            expected, extract_dynamodb_table({"eventSourceARN": arn}), description
+        )
+
+
+def test_extract_dynamodb_key():
+    cases = [
+        {
+            "input": {},
+            "expected": None,
+            "description": "Invalid input - empty input",
+        },
+        {
+            "input": {"FOO": "BAR"},
+            "expected": None,
+            "description": "Invalid input - not a valid record",
+        },
+        {
+            "input": {
+                "dynamodb": {},
+            },
+            "expected": None,
+            "description": "Invalid input - no key",
+        },
+        {
+            "input": {
+                "dynamodb": {"Keys": {}},
+            },
+            "expected": None,
+            "description": "Invalid input - empty key",
+        },
+        {
+            "input": {
+                "dynamodb": {"Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}}},
+            },
+            "expected": "pytorch/pytorch/123",
+            "description": "Valid record with a dynamo key",
+        },
+        {
+            "input": {
+                "dynamodb": {
+                    "Keys": {
+                        "dynamoKey": {"S": "pytorch/pytorch/123"},
+                        "dummyKey": {"S": "dummy"},
+                    }
+                },
+            },
+            "expected": "pytorch/pytorch/123|dummy",
+            "description": "Valid record with multiple keys",
+        },
+    ]
+
+    for case in cases:
+        input = case["input"]
+        expected = case["expected"]
+        description = case["description"]
+        TestCase().assertEqual(expected, extract_dynamodb_key(input), description)
+
+
+def test_to_number():
+    v = to_number("3")
+    TestCase().assertEqual(3, v, "Converting DynamoDB number to int")
+    TestCase().assertTrue(isinstance(v, int), "Converting DynamoDB number to int")
+
+    v = to_number("3.0")
+    TestCase().assertEqual(3.0, v, "Converting DynamoDB number to float")
+    TestCase().assertTrue(isinstance(v, float), "Converting DynamoDB number to float")
+
+
+def test_unmarshal():
+    cases = [
+        {
+            "input": {
+                "runner_id": {"N": "5075952"},
+                "dynamoKey": {"S": "pytorch/pytorch/15806102004"},
+                "head_branch": {"S": "export-D48055141"},
+                "test": {"BOOL": True},
+                "runner_group_name": {"BS": [{"S": "Default"}]},
+                "runner_name": {"S": "i-0b85c433d29e0c108"},
+                "created_at": {"S": "2023-08-11T02:55:33Z"},
+                "steps": {"L": []},
+                "check_run_url": {
+                    "S": "https://api.github.com/repos/pytorch/pytorch/check-runs/15806102004"
+                },
+                "head_sha": {"S": "7b34438ac2f380f68436ae2f0287054065c9837e"},
+                "url": {
+                    "S": "https://api.github.com/repos/pytorch/pytorch/actions/jobs/15806102004"
+                },
+                "labels": {"L": [{"S": "linux.4xlarge.nvidia.gpu"}]},
+                "conclusion": {"NULL": True},
+                "completed_at": {"NULL": True},
+                "run_url": {
+                    "S": "https://api.github.com/repos/pytorch/pytorch/actions/runs/5828283457"
+                },
+                "html_url": {
+                    "S": "https://github.com/pytorch/pytorch/actions/runs/5828283457/job/15806102004"
+                },
+                "name": {
+                    "S": "linux-bionic-cuda12.1-py3.10-gcc9 / test (default, 5, 5, linux.4xlarge.nvidia.gpu)"
+                },
+                "run_attempt": {"N": "1"},
+                "started_at": {"S": "2023-08-11T02:55:33Z"},
+                "id": {"N": "15806102004"},
+                "runner_group_id": {"N": "1"},
+                "node_id": {"NS": ["1", "2", "3"]},
+                "status": {"S": "queued"},
+            },
+            "expected": {
+                "runner_id": 5075952,
+                "dynamoKey": "pytorch/pytorch/15806102004",
+                "head_branch": "export-D48055141",
+                "test": True,
+                "runner_group_name": ["Default"],
+                "runner_name": "i-0b85c433d29e0c108",
+                "created_at": "2023-08-11T02:55:33Z",
+                "steps": [],
+                "check_run_url": "https://api.github.com/repos/pytorch/pytorch/check-runs/15806102004",
+                "head_sha": "7b34438ac2f380f68436ae2f0287054065c9837e",
+                "url": "https://api.github.com/repos/pytorch/pytorch/actions/jobs/15806102004",
+                "labels": ["linux.4xlarge.nvidia.gpu"],
+                "conclusion": None,
+                "completed_at": None,
+                "run_url": "https://api.github.com/repos/pytorch/pytorch/actions/runs/5828283457",
+                "html_url": "https://github.com/pytorch/pytorch/actions/runs/5828283457/job/15806102004",
+                "name": "linux-bionic-cuda12.1-py3.10-gcc9 / test (default, 5, 5, linux.4xlarge.nvidia.gpu)",
+                "run_attempt": 1,
+                "started_at": "2023-08-11T02:55:33Z",
+                "id": 15806102004,
+                "runner_group_id": 1,
+                "node_id": [1, 2, 3],
+                "status": "queued",
+            },
+        }
+    ]
+
+    for case in cases:
+        input = case["input"]
+        expected = case["expected"]
+        assert expected == unmarshal({"M": input})
+
+
+def test_remove_document():
+    cases = [
+        {
+            "input": {},
+            "removed": False,
+            "description": "Invalid input - empty record",
+        },
+        {
+            "input": {
+                "eventName": "REMOVE",
+            },
+            "removed": False,
+            "description": "Invalid input - no table name",
+        },
+        {
+            "input": {
+                "eventName": "REMOVE",
+                "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+            },
+            "removed": False,
+            "description": "Invalid input - no ID",
+        },
+        {
+            "input": {
+                "eventName": "REMOVE",
+                "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+                "dynamodb": {"Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}}},
+            },
+            "removed": True,
+            "description": "Remove one record",
+        },
+    ]
+
+    for case in cases:
+        mock_client = Mock()
+        mock_client.delete.return_value = "OK"
+
+        input = case["input"]
+        remove_document(mock_client, input)
+
+        if case["removed"]:
+            mock_client.delete.assert_called_once()
+        else:
+            mock_client.delete.assert_not_called()
+
+
+def test_upsert_document():
+    cases = [
+        {
+            "input": {},
+            "upserted": False,
+            "description": "Invalid input - empty record",
+        },
+        {
+            "input": {
+                "eventName": "INSERT",
+            },
+            "upserted": False,
+            "description": "Invalid input - no table name",
+        },
+        {
+            "input": {
+                "eventName": "INSERT",
+                "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+            },
+            "upserted": False,
+            "description": "Invalid input - no ID",
+        },
+        {
+            "input": {
+                "eventName": "INSERT",
+                "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+                "dynamodb": {"Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}}},
+            },
+            "upserted": False,
+            "description": "Invalid input - No document body",
+        },
+        {
+            "input": {
+                "eventName": "INSERT",
+                "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+                "dynamodb": {
+                    "Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}},
+                    "NewImage": {
+                        "workflow_name": {"S": "pull"},
+                    },
+                },
+            },
+            "upserted": True,
+            "description": "Insert one document",
+        },
+        {
+            "input": {
+                "eventName": "MODIFY",
+                "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+                "dynamodb": {
+                    "Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}},
+                    "NewImage": {
+                        "workflow_name": {"S": "pull"},
+                        "steps": {
+                            "L": [
+                                {
+                                    "M": {
+                                        "conclusion": {"NULL": True},
+                                        "number": {"N": "1"},
+                                        "completed_at": {"NULL": True},
+                                        "name": {"S": "Set up job"},
+                                        "started_at": {"S": "..."},
+                                        "status": {"S": "in_progress"},
+                                    }
+                                }
+                            ]
+                        },
+                    },
+                    "OldImage": {
+                        "workflow_name": {"S": "pull"},
+                        "steps": {"L": []},
+                    },
+                },
+            },
+            "upserted": True,
+            "description": "Modify one document",
+        },
+    ]
+
+    for case in cases:
+        mock_client = Mock()
+        mock_client.indices.exists.return_value = True
+        mock_client.index.return_value = "OK"
+
+        input = case["input"]
+        upsert_document(mock_client, input)
+
+        if case["upserted"]:
+            mock_client.index.assert_called_once()
+        else:
+            mock_client.index.assert_not_called()
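
The unit tests above exercise the helpers directly. For local verification, the handler itself can also be smoke-tested without AWS credentials or network access by patching the module-level dependencies. This is a minimal, hypothetical sketch (not part of the patch; the test name and event shape are illustrative, mirroring the fixtures above):

```python
# Hypothetical local smoke test for lambda_handler (editor's sketch).
# It patches boto3, the SigV4 signer, and the OpenSearch client so that
# nothing in this test talks to AWS.
from unittest.mock import MagicMock, patch

import lambda_function


def test_lambda_handler_smoke():
    event = {
        "Records": [
            {
                "eventName": "INSERT",
                "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
                "dynamodb": {
                    "Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}},
                    "NewImage": {"workflow_name": {"S": "pull"}},
                },
            }
        ]
    }

    with patch.object(lambda_function, "boto3"), patch.object(
        lambda_function, "AWSV4SignerAuth"
    ), patch.object(lambda_function, "OpenSearch") as mock_opensearch:
        mock_client = MagicMock()
        mock_opensearch.return_value = mock_client
        # Pretend the index already exists so no create call is attempted
        mock_client.indices.exists.return_value = True

        lambda_function.lambda_handler(event, None)

        # The single INSERT record should be indexed exactly once
        mock_client.index.assert_called_once()
```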