+max-line-length = 120
+prepare: clean
+ mkdir -p ./packages
+ pip3 install --target ./packages -r requirements.txt
+ cd packages && zip -r ../opensearch-gha-jobs-deployment.zip .
+ zip -g opensearch-gha-jobs-deployment.zip lambda_function.py
+deploy: prepare
+ aws lambda update-function-code --function-name opensearch-gha-jobs --zip-file fileb://opensearch-gha-jobs-deployment.zip
+ rm -rf opensearch-gha-jobs-deployment.zip packages
+This lambda is used to indexed the content of `torch-workflow-job`
+DynamoDB table onto the OpenSearch cluster `gha-job`. This is done by
+listening to the stream of `INSERT`, `MODIFY`, and `REMOVE` events
+coming to the DynamoDB table, extracting the documents, and indexing
+them on the OpenSeach cluster.
+Because the JSON structure of a DynamoDB event includes some simple
+datatype annotation ([link](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_AttributeValue.html)).
+The lambda performs some transformation to convert it back to a regular
+JSON data structure.
+### Deployment
+A new version of the lambda can be deployed using `make deploy` and it
+is done so automatically as part of the CI.
+import json
+import re
+from collections import defaultdict
+from enum import Enum
+from typing import Any, Dict, Optional, Union
+from warnings import warn
+import boto3
+from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection
+ "search-gha-jobs-po2dvxh7kcayevbmm6ih2vr4ka.us-east-1.es.amazonaws.com"
+OPENSEARCH_REGION = "us-east-1"
+ "arn:aws:dynamodb:.*?:.*?:table/(?P
+class EventType(Enum):
+def lambda_handler(event: Any, context: Any) -> None:
+ credentials = boto3.Session().get_credentials()
+ aws_auth = AWSV4SignerAuth(credentials, OPENSEARCH_REGION, "es")
+ opensearch_client = OpenSearch(
+ hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
+ http_auth=aws_auth,
+ use_ssl=True,
+ verify_certs=True,
+ connection_class=RequestsHttpConnection,
+ )
+ counts = defaultdict(int)
+ # The input of this lambda is a stream of DynamoDB event that we want to
+ # indexed on OpenSearch
+ for record in event["Records"]:
+ event_name = record.get("eventName", "")
+ try:
+ if (
+ event_name == EventType.INSERT.value
+ or event_name == EventType.MODIFY.value
+ ):
+ upsert_document(opensearch_client, record)
+ elif event_name == EventType.REMOVE.value:
+ remove_document(opensearch_client, record)
+ else:
+ warn(f"Unrecognized event type {event_name} in {json.dumps(record)}")
+ counts[event_name] += 1
+ except Exception as error:
+ warn(f"Failed to process {json.dumps(record)}: {error}")
+ print(f"Finish processing {json.dumps(counts)}")
+def extract_dynamodb_table(record: Any) -> Optional[str]:
+ """
+ Extract the DynamoDB table name from the source ARN. This will be used later as
+ the index name
+ """
+ s = record.get("eventSourceARN", "")
+ if not m:
+ warn(f"Invalid value {s}, expecting a DynamoDB table")
+ return
+ return m.group("table").lower()
+def extract_dynamodb_key(record: Any) -> Optional[str]:
+ keys = unmarshal({"M": record.get("dynamodb", {}).get("Keys", {})})
+ if not keys:
+ return
+ return "|".join(keys.values())
+def to_number(s: str) -> Union[int, float]:
+ try:
+ return int(s)
+ except ValueError:
+ return float(s)
+def unmarshal(doc: Dict[Any, Any]) -> Any:
+ """
+ Convert the DynamoDB stream record into a regular JSON document. This is done recursively.
+ At the top level, it will be a dictionary of type M (Map). Here is the list of DynamoDB
+ attributes to handle:
+ https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_AttributeValue.html
+ """
+ for k, v in list(doc.items()):
+ if k == "NULL":
+ return
+ if k == "S" or k == "BOOL":
+ return v
+ if k == "N":
+ return to_number(v)
+ if k == "M":
+ return {sk: unmarshal(sv) for sk, sv in v.items()}
+ if k == "BS" or k == "L":
+ return [unmarshal(item) for item in v]
+ if k == "SS":
+ return v.copy()
+ if k == "NS":
+ return [to_number(item) for item in v]
+ return {}
+def upsert_document(client: OpenSearch, record: Any) -> None:
+ """
+ Insert a new doc or modify an existing document. The latter happens when the workflow job is
+ updated (new step, finishing). A record from torchci-workflow-job looks as follows
+ {
+ "eventID": "...",
+ "eventName": "MODIFY",
+ "eventVersion": "1.1",
+ "eventSource": "aws:dynamodb",
+ "awsRegion": "us-east-1",
+ "dynamodb": {
+ "ApproximateCreationDateTime": 1691722869,
+ "Keys": {
+ "dynamoKey": {
+ "S": "pytorch/pytorch/15806159447"
+ }
+ },
+ "NewImage": {
+ "workflow_name": {
+ "S": "pull"
+ },
+ "steps": {
+ "L": [
+ {
+ "M": {
+ "conclusion": {
+ "NULL": True
+ },
+ "number": {
+ "N": "1"
+ },
+ "completed_at": {
+ "NULL": True
+ },
+ "name": {
+ "S": "Set up job"
+ },
+ "started_at": {
+ "S": "..."
+ },
+ "status": {
+ "S": "in_progress"
+ }
+ }
+ }
+ ]
+ },
+ ... all other fields ...
+ },
+ "OldImage": {
+ "workflow_name": {
+ "S": "pull"
+ },
+ "steps": {
+ "L": []
+ },
+ ... all other fields ...
+ },
+ "SequenceNumber": "...",
+ "SizeBytes": 1763,
+ "StreamViewType": "NEW_AND_OLD_IMAGES"
+ },
+ "eventSourceARN": "arn:aws:dynamodb:us-east-1:...:table/torchci-workflow-job/stream/..."
+ }
+ """
+ index = extract_dynamodb_table(record)
+ if not index:
+ return
+ # Create index using the table name if it's not there yet
+ if not client.indices.exists(index):
+ # https://www.elastic.co/guide/en/elasticsearch/reference/current/coerce.html
+ client.indices.create(index, body={"settings": {"index.mapping.coerce": True}})
+ body = unmarshal({"M": record.get("dynamodb", {}).get("NewImage", {})})
+ if not body:
+ return
+ id = extract_dynamodb_key(record)
+ if not id:
+ return
+ print(f"UPSERTING {id} INTO {index}")
+ client.index(index=index, body=body, id=id, refresh=True)
+def remove_document(client: OpenSearch, record: Any) -> None:
+ """
+ Remove a document. This is here for completeness as we don't remove records from DynamoDB
+ """
+ index = extract_dynamodb_table(record)
+ if not index:
+ return
+ id = extract_dynamodb_key(record)
+ if not id:
+ return
+ print(f"DELETING {id} FROM {index}")
+ client.delete(index=index, id=id, refresh=True)
+from unittest import TestCase
+from unittest.mock import Mock
+from lambda_function import (
+ extract_dynamodb_key,
+ extract_dynamodb_table,
+ remove_document,
+ to_number,
+ unmarshal,
+ upsert_document,
+def test_extract_dynamodb_table():
+ cases = [
+ {
+ "arn": "",
+ "expected": None,
+ "description": "Invalid input - empty input",
+ },
+ {
+ "arn": "FOOBAR",
+ "expected": None,
+ "description": "Invalid input - not in ARN format",
+ },
+ {
+ "arn": "arn:aws:dynamodb:us-east-1:12345:table/torchci-workflow-job/stream/2022-01-14T01:31:51.775",
+ "expected": "torchci-workflow-job",
+ "description": "An event coming from DynamoDB",
+ },
+ ]
+ for case in cases:
+ arn = case["arn"]
+ expected = case["expected"]
+ description = case["description"]
+ TestCase().assertEqual(
+ expected, extract_dynamodb_table({"eventSourceARN": arn}), description
+ )
+def test_extract_dynamodb_key():
+ cases = [
+ {
+ "input": {},
+ "expected": None,
+ "description": "Invalid input - empty input",
+ },
+ {
+ "input": {"FOO": "BAR"},
+ "expected": None,
+ "description": "Invalid input - not a valid record",
+ },
+ {
+ "input": {
+ "dynamodb": {},
+ },
+ "expected": None,
+ "description": "Invalid input - no key",
+ },
+ {
+ "input": {
+ "dynamodb": {"Keys": {}},
+ },
+ "expected": None,
+ "description": "Invalid input - empty key",
+ },
+ {
+ "input": {
+ "dynamodb": {"Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}}},
+ },
+ "expected": "pytorch/pytorch/123",
+ "description": "Valid record with a dynamo key",
+ },
+ {
+ "input": {
+ "dynamodb": {
+ "Keys": {
+ "dynamoKey": {"S": "pytorch/pytorch/123"},
+ "dummyKey": {"S": "dummy"},
+ }
+ },
+ },
+ "expected": "pytorch/pytorch/123|dummy",
+ "description": "Valid record with multiple keys",
+ },
+ ]
+ for case in cases:
+ input = case["input"]
+ expected = case["expected"]
+ description = case["description"]
+ TestCase().assertEqual(expected, extract_dynamodb_key(input), description)
+def test_to_number():
+ v = to_number("3")
+ TestCase().assertEqual(3, v, "Converting DynamoDB number to int")
+ TestCase().assertTrue(isinstance(v, int), "Converting DynamoDB number to int")
+ v = to_number("3.0")
+ TestCase().assertEqual(3.0, v, "Converting DynamoDB number to float")
+ TestCase().assertTrue(isinstance(v, float), "Converting DynamoDB number to float")
+def test_unmarshal():
+ cases = [
+ {
+ "input": {
+ "runner_id": {"N": "5075952"},
+ "dynamoKey": {"S": "pytorch/pytorch/15806102004"},
+ "head_branch": {"S": "export-D48055141"},
+ "test": {"BOOL": True},
+ "runner_group_name": {"BS": [{"S": "Default"}]},
+ "runner_name": {"S": "i-0b85c433d29e0c108"},
+ "created_at": {"S": "2023-08-11T02:55:33Z"},
+ "steps": {"L": []},
+ "check_run_url": {
+ "S": "https://api.github.com/repos/pytorch/pytorch/check-runs/15806102004"
+ },
+ "head_sha": {"S": "7b34438ac2f380f68436ae2f0287054065c9837e"},
+ "url": {
+ "S": "https://api.github.com/repos/pytorch/pytorch/actions/jobs/15806102004"
+ },
+ "labels": {"L": [{"S": "linux.4xlarge.nvidia.gpu"}]},
+ "conclusion": {"NULL": True},
+ "completed_at": {"NULL": True},
+ "run_url": {
+ "S": "https://api.github.com/repos/pytorch/pytorch/actions/runs/5828283457"
+ },
+ "html_url": {
+ "S": "https://github.com/pytorch/pytorch/actions/runs/5828283457/job/15806102004"
+ },
+ "name": {
+ "S": "linux-bionic-cuda12.1-py3.10-gcc9 / test (default, 5, 5, linux.4xlarge.nvidia.gpu)"
+ },
+ "run_attempt": {"N": "1"},
+ "started_at": {"S": "2023-08-11T02:55:33Z"},
+ "id": {"N": "15806102004"},
+ "runner_group_id": {"N": "1"},
+ "node_id": {"NS": ["1", "2", "3"]},
+ "status": {"S": "queued"},
+ },
+ "expected": {
+ "runner_id": 5075952,
+ "dynamoKey": "pytorch/pytorch/15806102004",
+ "head_branch": "export-D48055141",
+ "test": True,
+ "runner_group_name": ["Default"],
+ "runner_name": "i-0b85c433d29e0c108",
+ "created_at": "2023-08-11T02:55:33Z",
+ "steps": [],
+ "check_run_url": "https://api.github.com/repos/pytorch/pytorch/check-runs/15806102004",
+ "head_sha": "7b34438ac2f380f68436ae2f0287054065c9837e",
+ "url": "https://api.github.com/repos/pytorch/pytorch/actions/jobs/15806102004",
+ "labels": ["linux.4xlarge.nvidia.gpu"],
+ "conclusion": None,
+ "completed_at": None,
+ "run_url": "https://api.github.com/repos/pytorch/pytorch/actions/runs/5828283457",
+ "html_url": "https://github.com/pytorch/pytorch/actions/runs/5828283457/job/15806102004",
+ "name": "linux-bionic-cuda12.1-py3.10-gcc9 / test (default, 5, 5, linux.4xlarge.nvidia.gpu)",
+ "run_attempt": 1,
+ "started_at": "2023-08-11T02:55:33Z",
+ "id": 15806102004,
+ "runner_group_id": 1,
+ "node_id": [1, 2, 3],
+ "status": "queued",
+ },
+ }
+ ]
+ for case in cases:
+ input = case["input"]
+ expected = case["expected"]
+ assert expected == unmarshal({"M": input})
+def test_remove_document():
+ cases = [
+ {
+ "input": {},
+ "removed": False,
+ "description": "Invalid input - empty record",
+ },
+ {
+ "input": {
+ "eventName": "REMOVE",
+ },
+ "removed": False,
+ "description": "Invalid input - no table name",
+ },
+ {
+ "input": {
+ "eventName": "REMOVE",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+ },
+ "removed": False,
+ "description": "Invalid input - no ID",
+ },
+ {
+ "input": {
+ "eventName": "REMOVE",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+ "dynamodb": {"Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}}},
+ },
+ "removed": True,
+ "description": "Remove one record",
+ },
+ ]
+ for case in cases:
+ mock_client = Mock()
+ mock_client.delete.return_value = "OK"
+ input = case["input"]
+ remove_document(mock_client, input)
+ if case["removed"]:
+ mock_client.delete.assert_called_once()
+ else:
+ mock_client.delete.assert_not_called()
+def test_upsert_document():
+ cases = [
+ {
+ "input": {},
+ "upserted": False,
+ "description": "Invalid input - empty record",
+ },
+ {
+ "input": {
+ "eventName": "INSERT",
+ },
+ "upserted": False,
+ "description": "Invalid input - no table name",
+ },
+ {
+ "input": {
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+ },
+ "upserted": False,
+ "description": "Invalid input - no ID",
+ },
+ {
+ "input": {
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+ "dynamodb": {"Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}}},
+ },
+ "upserted": False,
+ "description": "Invalid input - No document body",
+ },
+ {
+ "input": {
+ "eventName": "INSERT",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+ "dynamodb": {
+ "Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}},
+ "NewImage": {
+ "workflow_name": {"S": "pull"},
+ },
+ },
+ },
+ "upserted": True,
+ "description": "Insert one document",
+ },
+ {
+ "input": {
+ "eventName": "MODIFY",
+ "eventSourceARN": "arn:aws:dynamodb:us-east-1:123:table/torchci-workflow-job/stream/456",
+ "dynamodb": {
+ "Keys": {"dynamoKey": {"S": "pytorch/pytorch/123"}},
+ "NewImage": {
+ "workflow_name": {"S": "pull"},
+ "steps": {
+ "L": [
+ {
+ "M": {
+ "conclusion": {"NULL": True},
+ "number": {"N": "1"},
+ "completed_at": {"NULL": True},
+ "name": {"S": "Set up job"},
+ "started_at": {"S": "..."},
+ "status": {"S": "in_progress"},
+ }
+ }
+ ]
+ },
+ },
+ "OldImage": {
+ "workflow_name": {"S": "pull"},
+ "steps": {"L": []},
+ },
+ },
+ },
+ "upserted": True,
+ "description": "Modify one document",
+ },
+ ]
+ for case in cases:
+ mock_client = Mock()
+ mock_client.indices.exists.return_value = True
+ mock_client.index.return_value = "OK"
+ input = case["input"]
+ upsert_document(mock_client, input)
+ if case["upserted"]:
+ mock_client.index.assert_called_once()
+ else:
+ mock_client.index.assert_not_called()