Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add aws self-managed kafka event source #686

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ serverlessExpress({
'AWS_KINESIS_DATA_STREAM': '/kinesis',
'AWS_S3': '/s3',
'AWS_STEP_FUNCTIONS': '/step-functions',
'AWS_SELF_MANAGED_KAFKA': '/self-managed-kafka',
}
})
```
Expand Down
20 changes: 20 additions & 0 deletions __tests__/unit.self-managed-kafka.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const eventSources = require('../src/event-sources')
const testUtils = require('./utils')

const selfManagedKafkaEventSource = eventSources.getEventSource({
eventSourceName: 'AWS_SELF_MANAGED_KAFKA'
})

test('request is correct', () => {
const req = getReq()
expect(typeof req).toEqual('object')
expect(req.headers).toEqual({ host: 'self-managed-kafka' })
expect(req.body).toEqual(testUtils.selfManagedKafkaEvent)
expect(req.method).toEqual('POST')
})

function getReq () {
const event = testUtils.selfManagedKafkaEvent
const request = selfManagedKafkaEventSource.getRequest({ event })
return request
}
44 changes: 43 additions & 1 deletion __tests__/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,42 @@ const kinesisDataStreamEvent = {
]
}

// Sample event from https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
const selfManagedKafkaEvent = {
eventSource: 'SelfManagedKafka',
bootstrapServers: 'b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092',
records: {
'mytopic-0': [
{
topic: 'mytopic',
partition: 0,
offset: 15,
timestamp: 1545084650987,
timestampType: 'CREATE_TIME',
key: 'abcDEFghiJKLmnoPQRstuVWXyz1234==',
value: 'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==',
headers: [
{
headerKey: [
104,
101,
97,
100,
101,
114,
86,
97,
108,
117,
101
]
}
]
}
]
}
}

describe('getEventSourceNameBasedOnEvent', () => {
test('throws error on empty event', () => {
expect(() => getEventSourceNameBasedOnEvent({ event: {} })).toThrow(
Expand Down Expand Up @@ -263,6 +299,11 @@ describe('getEventSourceNameBasedOnEvent', () => {
expect(result).toEqual('AWS_KINESIS_DATA_STREAM')
})

test('recognises self managed kafka event', () => {
const result = getEventSourceNameBasedOnEvent({ event: selfManagedKafkaEvent })
expect(result).toEqual('AWS_SELF_MANAGED_KAFKA')
})

test('recognizes eventbridge event', () => {
const result = getEventSourceNameBasedOnEvent({ event: eventbridgeEvent })
expect(result).toEqual('AWS_EVENTBRIDGE')
Expand All @@ -287,5 +328,6 @@ module.exports = {
eventbridgeEvent,
eventbridgeScheduledEvent,
eventbridgeCustomerEvent,
kinesisDataStreamEvent
kinesisDataStreamEvent,
selfManagedKafkaEvent
}
2 changes: 1 addition & 1 deletion src/configure.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Handler } from 'aws-lambda';
import { Logger } from './logger';
import Framework from './frameworks';

type EventSources = 'AWS_SNS' | 'AWS_DYNAMODB' | 'AWS_EVENTBRIDGE' | 'AWS_SQS' | 'AWS_KINESIS_DATA_STREAM' | 'AWS_S3' | 'AWS_STEP_FUNCTIONS';
type EventSources = 'AWS_SNS' | 'AWS_DYNAMODB' | 'AWS_EVENTBRIDGE' | 'AWS_SQS' | 'AWS_KINESIS_DATA_STREAM' | 'AWS_S3' | 'AWS_STEP_FUNCTIONS' | 'AWS_SELF_MANAGED_KAFKA';

interface EventSource {
getRequest?: any; // TODO:
Expand Down
17 changes: 17 additions & 0 deletions src/event-sources/aws/self-managed-kafka.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
const { emptyResponseMapper } = require('../utils')

const getRequestValuesFromSelfManagedKafka = ({ event }) => {
const method = 'POST'
const headers = { host: 'self-managed-kafka' }
const body = event

return {
method,
headers,
body
}
}
module.exports = {
getRequest: getRequestValuesFromSelfManagedKafka,
getResponse: emptyResponseMapper
}
3 changes: 3 additions & 0 deletions src/event-sources/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const awsEventBridgeEventSource = require('./aws/eventbridge')
const awsKinesisEventSource = require('./aws/kinesis')
const awsS3 = require('./aws/s3')
const awsStepFunctionsEventSource = require('./aws/step-functions')
const awsSelfManagedKafkaEventSource = require('./aws/self-managed-kafka')

function getEventSource ({ eventSourceName }) {
switch (eventSourceName) {
Expand Down Expand Up @@ -40,6 +41,8 @@ function getEventSource ({ eventSourceName }) {
return awsS3
case 'AWS_STEP_FUNCTIONS':
return awsStepFunctionsEventSource
case 'AWS_SELF_MANAGED_KAFKA':
return awsSelfManagedKafkaEventSource
default:
throw new Error('Couldn\'t detect valid event source.')
}
Expand Down
1 change: 1 addition & 0 deletions src/event-sources/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ function getEventSourceNameBasedOnEvent ({
event
}) {
if (event.requestContext && event.requestContext.elb) return 'AWS_ALB'
if (event.eventSource === 'SelfManagedKafka') return 'AWS_SELF_MANAGED_KAFKA'
if (event.Records) {
const eventSource = event.Records[0] ? event.Records[0].EventSource || event.Records[0].eventSource : undefined
if (eventSource === 'aws:sns') {
Expand Down