Skip to content

Commit

Permalink
Improve Kinesis event recort type
Browse files Browse the repository at this point in the history
Make the encryption type an enum.
Make sequence number and partition key non-optional.

Signed-off-by: David Calavera <[email protected]>
  • Loading branch information
calavera committed Jun 1, 2024
1 parent 31250f3 commit 6b92db9
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
29 changes: 26 additions & 3 deletions lambda-events/src/event/kinesis/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,24 @@ pub struct KinesisEventRecord {
pub struct KinesisRecord {
pub approximate_arrival_timestamp: SecondTimestamp,
pub data: Base64Data,
pub encryption_type: Option<String>,
#[serde(default)]
pub partition_key: Option<String>,
pub encryption_type: KinesisEncryptionType,
#[serde(default)]
pub sequence_number: Option<String>,
pub partition_key: String,
#[serde(default)]
pub sequence_number: String,
#[serde(default)]
pub kinesis_schema_version: Option<String>,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum KinesisEncryptionType {
#[default]
None,
Kms,
}

#[cfg(test)]
mod test {
use super::*;
Expand All @@ -80,6 +89,20 @@ mod test {
fn example_kinesis_event() {
let data = include_bytes!("../../fixtures/example-kinesis-event.json");
let parsed: KinesisEvent = serde_json::from_slice(data).unwrap();
assert_eq!(KinesisEncryptionType::None, parsed.records[0].kinesis.encryption_type);

let output: String = serde_json::to_string(&parsed).unwrap();
let reparsed: KinesisEvent = serde_json::from_slice(output.as_bytes()).unwrap();
assert_eq!(parsed, reparsed);
}

#[test]
#[cfg(feature = "kinesis")]
fn example_kinesis_event_encrypted() {
let data = include_bytes!("../../fixtures/example-kinesis-event-encrypted.json");
let parsed: KinesisEvent = serde_json::from_slice(data).unwrap();
assert_eq!(KinesisEncryptionType::Kms, parsed.records[0].kinesis.encryption_type);

let output: String = serde_json::to_string(&parsed).unwrap();
let reparsed: KinesisEvent = serde_json::from_slice(output.as_bytes()).unwrap();
assert_eq!(parsed, reparsed);
Expand Down
37 changes: 37 additions & 0 deletions lambda-events/src/fixtures/example-kinesis-event-encrypted.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "s1",
"sequenceNumber": "49568167373333333333333333333333333333333333333333333333",
"data": "SGVsbG8gV29ybGQ=",
"approximateArrivalTimestamp": 1480641523.477,
"encryptionType": "KMS"
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49568167373333333333333333333333333333333333333333333333",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "s1",
"sequenceNumber": "49568167373333333334444444444444444444444444444444444444",
"data": "SGVsbG8gV29ybGQ=",
"approximateArrivalTimestamp": 1480841523.477
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49568167373333333334444444444444444444444444444444444444",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
}
]
}

0 comments on commit 6b92db9

Please sign in to comment.