Skip to content

Commit

Permalink
Merge pull request #227 from risenberg-cyberark/kinesis
Browse files Browse the repository at this point in the history
feat: Add Kinesis lambda event support to Parser utility
  • Loading branch information
heitorlessa authored Dec 4, 2020
2 parents b82ea7a + 43e175d commit ee18f83
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 0 deletions.
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/parser/envelopes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
from .cloudwatch import CloudWatchLogsEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .kinesis import KinesisDataStreamEnvelope
from .sns import SnsEnvelope
from .sqs import SqsEnvelope

__all__ = [
"CloudWatchLogsEnvelope",
"DynamoDBStreamEnvelope",
"EventBridgeEnvelope",
"KinesisDataStreamEnvelope",
"SnsEnvelope",
"SqsEnvelope",
"BaseEnvelope",
Expand Down
43 changes: 43 additions & 0 deletions aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
from typing import Any, Dict, List, Optional, Union

from ..models import KinesisDataStreamModel
from ..types import Model
from .base import BaseEnvelope

logger = logging.getLogger(__name__)


class KinesisDataStreamEnvelope(BaseEnvelope):
    """Kinesis Data Stream Envelope to extract array of Records

    The record's data parameter is a base64 encoded string which is parsed into a bytes array,
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
        """Parses records found with model provided

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Model
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided

        Raises
        ------
        UnicodeDecodeError
            When a record's decoded payload is not valid UTF-8 text
        """
        logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}")
        parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data)
        # The user payload lives in each record's `kinesis.data` field (there is no `body` key in Kinesis events)
        logger.debug(f"Parsing Kinesis records in `data` with {model}")
        # `kinesis.data` is declared as bytes on the payload model; decode it to text before parsing with `model`
        return [
            self._parse(data=record.kinesis.data.decode("utf-8"), model=model)
            for record in parsed_envelope.Records
        ]
4 changes: 4 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
from .event_bridge import EventBridgeModel
from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload
from .s3 import S3Model, S3RecordModel
from .ses import SesModel, SesRecordModel
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
Expand All @@ -19,6 +20,9 @@
"EventBridgeModel",
"DynamoDBStreamChangedRecordModel",
"DynamoDBStreamRecordModel",
"KinesisDataStreamModel",
"KinesisDataStreamRecord",
"KinesisDataStreamRecordPayload",
"S3Model",
"S3RecordModel",
"SesModel",
Expand Down
41 changes: 41 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/kinesis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import base64
import logging
from binascii import Error as BinAsciiError
from typing import List

from pydantic import BaseModel, validator
from pydantic.types import PositiveInt
from typing_extensions import Literal

logger = logging.getLogger(__name__)


class KinesisDataStreamRecordPayload(BaseModel):
    """Content of the ``kinesis`` key of a single Kinesis Data Stream record."""

    kinesisSchemaVersion: str
    partitionKey: str
    sequenceNumber: PositiveInt
    data: bytes  # base64 encoded str is parsed into bytes
    approximateArrivalTimestamp: float

    @validator("data", pre=True)
    def data_base64_decode(cls, value):
        """Base64-decode the raw ``data`` value before field validation runs.

        Parameters
        ----------
        value
            Raw ``data`` value as received in the event

        Returns
        -------
        bytes
            Decoded payload

        Raises
        ------
        ValueError
            When ``value`` cannot be base64 decoded
        """
        logger.debug("Decoding base64 Kinesis data record before parsing")
        # Keep the try body limited to the call that can actually fail
        try:
            return base64.b64decode(value)
        except (BinAsciiError, TypeError) as exc:
            # Chain the original error so the root cause stays visible in tracebacks
            raise ValueError("base64 decode failed") from exc


class KinesisDataStreamRecord(BaseModel):
    """One entry of the ``Records`` list in a Kinesis Data Stream event."""

    # Only the exact literal "aws:kinesis" is accepted as the source
    eventSource: Literal["aws:kinesis"]
    eventVersion: str
    eventID: str
    # Only the exact literal "aws:kinesis:record" is accepted as the event name
    eventName: Literal["aws:kinesis:record"]
    invokeIdentityArn: str
    awsRegion: str
    eventSourceARN: str
    # Nested payload; its `data` field is base64 decoded by the payload model
    kinesis: KinesisDataStreamRecordPayload


class KinesisDataStreamModel(BaseModel):
    """Top-level Kinesis Data Stream event: a list of stream records under ``Records``."""

    Records: List[KinesisDataStreamRecord]
2 changes: 2 additions & 0 deletions docs/content/utilities/parser.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ Model name | Description
**AlbModel** | Lambda Event Source payload for Amazon Application Load Balancer
**CloudWatchLogsModel** | Lambda Event Source payload for Amazon CloudWatch Logs
**S3Model** | Lambda Event Source payload for Amazon S3
**KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams

You can extend them to include your own models, and yet have all other known fields parsed along the way.

Expand Down Expand Up @@ -296,6 +297,7 @@ Envelope name | Behaviour | Return
**EventBridgeEnvelope** | 1. Parses data using `EventBridgeModel`. <br/> 2. Parses `detail` key using your model and returns it. | `Model`
**SqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and returns them in a list. | `List[Model]`
**CloudWatchLogsEnvelope** | 1. Parses data using `CloudWatchLogsModel` which will base64 decode and decompress it. <br/> 2. Parses records in `message` key using your model and returns them in a list. | `List[Model]`
**KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it. <br/> 2. Parses records in `Records` key using your model and returns them in a list. | `List[Model]`

### Bringing your own envelope

Expand Down
5 changes: 5 additions & 0 deletions tests/functional/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ class MyAdvancedSnsBusiness(SnsModel):
Records: List[MyAdvancedSnsRecordModel]


class MyKinesisBusiness(BaseModel):
    """Sample business model used by the Kinesis parser tests."""

    message: str
    username: str


class MyCloudWatchBusiness(BaseModel):
my_message: str
user: str
106 changes: 106 additions & 0 deletions tests/functional/parser/test_kinesis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from typing import Any, List

import pytest

from aws_lambda_powertools.utilities.parser import ValidationError, envelopes, event_parser
from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamModel, KinesisDataStreamRecordPayload
from aws_lambda_powertools.utilities.typing import LambdaContext
from tests.functional.parser.schemas import MyKinesisBusiness
from tests.functional.parser.utils import load_event


@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisDataStreamEnvelope)
def handle_kinesis(event: List[MyKinesisBusiness], _: LambdaContext):
    """Handler under test: receives the records already parsed into ``MyKinesisBusiness``."""
    assert len(event) == 1
    # With the envelope, each item is the business model itself, not the Kinesis event model
    record: MyKinesisBusiness = event[0]
    assert record.message == "test message"
    assert record.username == "test"


@event_parser(model=KinesisDataStreamModel)
def handle_kinesis_no_envelope(event: KinesisDataStreamModel, _: LambdaContext):
    """Handler under test: receives the whole event parsed as ``KinesisDataStreamModel``."""
    records = event.Records
    assert len(records) == 2
    # Each item of `Records` is a single stream record, not the whole event model,
    # so the type is left to be inferred from `KinesisDataStreamModel.Records`
    record = records[0]

    assert record.awsRegion == "us-east-2"
    assert record.eventID == "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"
    assert record.eventName == "aws:kinesis:record"
    assert record.eventSource == "aws:kinesis"
    assert record.eventSourceARN == "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    assert record.eventVersion == "1.0"
    assert record.invokeIdentityArn == "arn:aws:iam::123456789012:role/lambda-role"

    kinesis: KinesisDataStreamRecordPayload = record.kinesis
    assert kinesis.approximateArrivalTimestamp == 1545084650.987
    assert kinesis.kinesisSchemaVersion == "1.0"
    assert kinesis.partitionKey == "1"
    assert kinesis.sequenceNumber == 49590338271490256608559692538361571095921575989136588898
    # `data` is exposed as already base64-decoded bytes
    assert kinesis.data == b"Hello, this is a test."


def test_kinesis_trigger_event():
    """A well-formed single-record event is parsed through the Kinesis envelope."""
    kinesis_payload = {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        # base64 of a JSON document carrying the `message` and `username` keys
        "data": "eyJtZXNzYWdlIjogInRlc3QgbWVzc2FnZSIsICJ1c2VybmFtZSI6ICJ0ZXN0In0=",
        "approximateArrivalTimestamp": 1545084650.987,
    }
    stream_record = {
        "kinesis": kinesis_payload,
        "eventSource": "aws:kinesis",
        "eventVersion": "1.0",
        "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
        "eventName": "aws:kinesis:record",
        "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
        "awsRegion": "us-east-2",
        "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
    }

    handle_kinesis({"Records": [stream_record]}, LambdaContext())


def test_kinesis_trigger_bad_base64_event():
    """A record whose `data` is not valid base64 is rejected with ValidationError."""
    kinesis_payload = {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        # deliberately not decodable as base64
        "data": "bad",
        "approximateArrivalTimestamp": 1545084650.987,
    }
    stream_record = {
        "kinesis": kinesis_payload,
        "eventSource": "aws:kinesis",
        "eventVersion": "1.0",
        "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
        "eventName": "aws:kinesis:record",
        "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
        "awsRegion": "us-east-2",
        "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
    }
    bad_event = {"Records": [stream_record]}

    with pytest.raises(ValidationError):
        handle_kinesis_no_envelope(bad_event, LambdaContext())


def test_kinesis_trigger_event_no_envelope():
    """The stored sample event parses directly with ``KinesisDataStreamModel``."""
    sample_event = load_event("kinesisStreamEvent.json")
    handle_kinesis_no_envelope(sample_event, LambdaContext())


def test_validate_event_does_not_conform_with_model_no_envelope():
    """An event lacking the expected structure fails validation without an envelope."""
    malformed_event: Any = {"hello": "s"}
    context = LambdaContext()
    with pytest.raises(ValidationError):
        handle_kinesis_no_envelope(malformed_event, context)


def test_validate_event_does_not_conform_with_model():
    """An event lacking the expected structure fails validation through the envelope."""
    malformed_event: Any = {"hello": "s"}
    context = LambdaContext()
    with pytest.raises(ValidationError):
        handle_kinesis(malformed_event, context)

0 comments on commit ee18f83

Please sign in to comment.