Add a lambda to index torchci-workflow-job table (#4469)
This goes with
pytorch-labs/pytorch-gha-infra#232

### Testing

`make deploy`
huydhn authored Aug 14, 2023
1 parent ed7237a commit 02689f0
Showing 6 changed files with 566 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .flake8
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 120
11 changes: 11 additions & 0 deletions aws/lambda/opensearch-gha-jobs/Makefile
@@ -0,0 +1,11 @@
prepare: clean
	mkdir -p ./packages
	pip3 install --target ./packages -r requirements.txt
	cd packages && zip -r ../opensearch-gha-jobs-deployment.zip .
	zip -g opensearch-gha-jobs-deployment.zip lambda_function.py

deploy: prepare
	aws lambda update-function-code --function-name opensearch-gha-jobs --zip-file fileb://opensearch-gha-jobs-deployment.zip

clean:
	rm -rf opensearch-gha-jobs-deployment.zip packages
15 changes: 15 additions & 0 deletions aws/lambda/opensearch-gha-jobs/README.md
@@ -0,0 +1,15 @@
This lambda indexes the content of the `torchci-workflow-job`
DynamoDB table into the OpenSearch cluster `gha-jobs`. It does this by
listening to the stream of `INSERT`, `MODIFY`, and `REMOVE` events
coming from the DynamoDB table, extracting the documents, and indexing
them on the OpenSearch cluster.
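
A trimmed stream record looks like this (adapted from the example in
`lambda_function.py`; most fields are elided):

```python
# Trimmed DynamoDB stream record (adapted from the handler's docstring)
record = {
    "eventName": "MODIFY",
    "eventSourceARN": "arn:aws:dynamodb:us-east-1:...:table/torchci-workflow-job/stream/...",
    "dynamodb": {
        "Keys": {"dynamoKey": {"S": "pytorch/pytorch/15806159447"}},
        "NewImage": {"workflow_name": {"S": "pull"}},
    },
}
```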

Because the JSON structure of a DynamoDB stream event includes a type
annotation on every attribute ([link](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_AttributeValue.html)),
the lambda performs a transformation to convert it back to a regular
JSON data structure before indexing.
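
For example (the `run_attempt` field here is hypothetical), the
transformation looks like this:

```python
# DynamoDB stream image with type annotations (hypothetical field values)
annotated = {"M": {"workflow_name": {"S": "pull"}, "run_attempt": {"N": "2"}}}
# After unmarshalling, the annotations are gone:
# {"workflow_name": "pull", "run_attempt": 2}
```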

### Deployment

A new version of the lambda can be deployed using `make deploy`, and
this is also done automatically as part of the CI.
221 changes: 221 additions & 0 deletions aws/lambda/opensearch-gha-jobs/lambda_function.py
@@ -0,0 +1,221 @@
import json
import re
from collections import defaultdict
from enum import Enum
from typing import Any, Dict, Optional, Union
from warnings import warn

import boto3
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection


OPENSEARCH_ENDPOINT = (
    "search-gha-jobs-po2dvxh7kcayevbmm6ih2vr4ka.us-east-1.es.amazonaws.com"
)
OPENSEARCH_REGION = "us-east-1"
DYNAMODB_TABLE_REGEX = re.compile(
    "arn:aws:dynamodb:.*?:.*?:table/(?P<table>[0-9a-zA-Z_-]+)/.+"
)


class EventType(Enum):
    INSERT = "INSERT"
    REMOVE = "REMOVE"
    MODIFY = "MODIFY"


def lambda_handler(event: Any, context: Any) -> None:
    credentials = boto3.Session().get_credentials()
    aws_auth = AWSV4SignerAuth(credentials, OPENSEARCH_REGION, "es")
    opensearch_client = OpenSearch(
        hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
        http_auth=aws_auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
    )

    counts = defaultdict(int)
    # The input of this lambda is a stream of DynamoDB events that we want to
    # index on OpenSearch
    for record in event["Records"]:
        event_name = record.get("eventName", "")
        try:
            if (
                event_name == EventType.INSERT.value
                or event_name == EventType.MODIFY.value
            ):
                upsert_document(opensearch_client, record)
            elif event_name == EventType.REMOVE.value:
                remove_document(opensearch_client, record)
            else:
                warn(f"Unrecognized event type {event_name} in {json.dumps(record)}")

            counts[event_name] += 1
        except Exception as error:
            warn(f"Failed to process {json.dumps(record)}: {error}")

    print(f"Finished processing {json.dumps(counts)}")


def extract_dynamodb_table(record: Any) -> Optional[str]:
    """
    Extract the DynamoDB table name from the source ARN. This will be used later as
    the index name
    """
    s = record.get("eventSourceARN", "")
    m = DYNAMODB_TABLE_REGEX.match(s)
    if not m:
        warn(f"Invalid value {s}, expecting a DynamoDB table")
        return

    return m.group("table").lower()


def extract_dynamodb_key(record: Any) -> Optional[str]:
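    """
    Build the document id by joining all the table key values with "|". For
    torchci-workflow-job, this is a single value like pytorch/pytorch/15806159447
    """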
    keys = unmarshal({"M": record.get("dynamodb", {}).get("Keys", {})})
    if not keys:
        return
    return "|".join(keys.values())


def to_number(s: str) -> Union[int, float]:
    try:
        return int(s)
    except ValueError:
        return float(s)


def unmarshal(doc: Dict[Any, Any]) -> Any:
    """
    Convert the DynamoDB stream record into a regular JSON document. This is done recursively.
    At the top level, it will be a dictionary of type M (Map). Here is the list of DynamoDB
    attributes to handle:
    https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_AttributeValue.html
    """
    for k, v in list(doc.items()):
        if k == "NULL":
            return

        if k == "S" or k == "BOOL":
            return v

        if k == "N":
            return to_number(v)

        if k == "M":
            return {sk: unmarshal(sv) for sk, sv in v.items()}

        if k == "BS" or k == "L":
            return [unmarshal(item) for item in v]

        if k == "SS":
            return v.copy()

        if k == "NS":
            return [to_number(item) for item in v]

    return {}


def upsert_document(client: OpenSearch, record: Any) -> None:
    """
    Insert a new doc or modify an existing document. The latter happens when the workflow job is
    updated (new step, finishing). A record from torchci-workflow-job looks as follows
    {
        "eventID": "...",
        "eventName": "MODIFY",
        "eventVersion": "1.1",
        "eventSource": "aws:dynamodb",
        "awsRegion": "us-east-1",
        "dynamodb": {
            "ApproximateCreationDateTime": 1691722869,
            "Keys": {
                "dynamoKey": {
                    "S": "pytorch/pytorch/15806159447"
                }
            },
            "NewImage": {
                "workflow_name": {
                    "S": "pull"
                },
                "steps": {
                    "L": [
                        {
                            "M": {
                                "conclusion": {
                                    "NULL": True
                                },
                                "number": {
                                    "N": "1"
                                },
                                "completed_at": {
                                    "NULL": True
                                },
                                "name": {
                                    "S": "Set up job"
                                },
                                "started_at": {
                                    "S": "..."
                                },
                                "status": {
                                    "S": "in_progress"
                                }
                            }
                        }
                    ]
                },
                ... all other fields ...
            },
            "OldImage": {
                "workflow_name": {
                    "S": "pull"
                },
                "steps": {
                    "L": []
                },
                ... all other fields ...
            },
            "SequenceNumber": "...",
            "SizeBytes": 1763,
            "StreamViewType": "NEW_AND_OLD_IMAGES"
        },
        "eventSourceARN": "arn:aws:dynamodb:us-east-1:...:table/torchci-workflow-job/stream/..."
    }
    """
    index = extract_dynamodb_table(record)
    if not index:
        return

    # Create index using the table name if it's not there yet
    if not client.indices.exists(index):
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/coerce.html
        client.indices.create(index, body={"settings": {"index.mapping.coerce": True}})

    body = unmarshal({"M": record.get("dynamodb", {}).get("NewImage", {})})
    if not body:
        return

    id = extract_dynamodb_key(record)
    if not id:
        return

    print(f"UPSERTING {id} INTO {index}")
    client.index(index=index, body=body, id=id, refresh=True)


def remove_document(client: OpenSearch, record: Any) -> None:
    """
    Remove a document. This is here for completeness as we don't remove records from DynamoDB
    """
    index = extract_dynamodb_table(record)
    if not index:
        return

    id = extract_dynamodb_key(record)
    if not id:
        return

    print(f"DELETING {id} FROM {index}")
    client.delete(index=index, id=id, refresh=True)
3 changes: 3 additions & 0 deletions aws/lambda/opensearch-gha-jobs/requirements.txt
@@ -0,0 +1,3 @@
boto3==1.28.24
opensearch-py==2.3.0
pytest==7.4.0