
Commit b552c9c

Merge pull request #2543 from zalilias/zalilias-serverless-messaging
New serverless pattern - Serverless Messaging Redrive
2 parents f354293 + 59c1eef commit b552c9c


9 files changed, +795 -0 lines changed

@@ -0,0 +1,93 @@
# Serverless Message Processing Pattern

## Overview
An adaptable pattern for message processing using AWS serverless services, featuring error handling and automatic recovery mechanisms.

## Core Components
- Amazon API Gateway (message ingestion)
- Amazon SQS queues (main queue + DLQs)
- AWS Lambda functions (processing + recovery)

## Architecture Diagram

![Architecture Diagram](architecture.jpeg)

## Basic Flow
1. Messages enter through API Gateway.
2. The main queue receives the messages.
3. A Lambda function polls the main queue using an Event Source Mapping (ESM) and handles the messages. Read more about how Lambda synchronously processes queue messages in the [documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html).
4. Failed messages route to DLQs (see the sketch after this list). In this sample, a failed message is a malformed email, but this can be adapted to other use cases.
5. A decision-maker function attempts automated recovery. In this sample, it remediates common email formatting issues, including stray whitespace and typos in domain extensions.

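The queues and the redrive relationship between them are provisioned by the pattern's SAM template. Purely as an illustration of the main-queue-to-DLQ wiring described in steps 4-5, the following boto3 sketch builds an equivalent pair of queues; the queue names and the `maxReceiveCount` of 3 are placeholder assumptions, not values taken from the template.

```python
# Illustrative sketch only - the pattern's SAM template creates these resources.
import json
import boto3

sqs = boto3.client("sqs")

# Dead-letter queue that collects messages the processor repeatedly fails on
dlq_url = sqs.create_queue(QueueName="demo-processing-dlq")["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# Main queue: after 3 failed receives, SQS moves the message to the DLQ
main_url = sqs.create_queue(
    QueueName="demo-main-queue",
    Attributes={
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "3"}
        )
    },
)["QueueUrl"]
```

With a policy like this in place, any message that is received but not successfully deleted after three attempts is moved to the DLQ, where the decision-maker function picks it up.
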
## Deployment

### Clone the repository
1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
2. Change directory to the pattern directory:
```
cd serverless-patterns/serverless-message-redrive
```

### Build the SAM application
The `sam build` command prepares the application for subsequent steps in the developer workflow, such as local testing or deploying to AWS.
```
sam build
```

### Deploy the application
The `sam deploy` command deploys the application to the AWS Cloud using AWS CloudFormation. The `--guided` option has the AWS SAM CLI prompt you through the deployment.
```
sam deploy --guided
```

## Key Features
- Automatic retry mechanism
- Segregation of recoverable and fatal errors
- Processing logic with clear points of adaptation

## API Reference

### Send Message
The following is an example API call that you can try with your own endpoint.

```
curl -X POST \
  'https://${endpoint}/prod/message' \
  -H 'Content-Type: application/json' \
  -d '{
    "messageId": "test-456",
    "messageType": "TYPE_A",
    "payload": {
      "email": "[email protected]",
      "data": "some data"
    },
    "timestamp": "2023-11-22T12:00:00Z"
  }'
```

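Equivalently, a short Python sketch can send the same test message. The endpoint URL below is a placeholder for your deployed API Gateway stage, and the deliberately malformed `.con` address is an assumption used to exercise the redrive path; `requests` is assumed to be installed locally and is not part of the pattern's dependencies.

```python
# Illustration only: replace ENDPOINT with the URL output by `sam deploy`.
import requests

ENDPOINT = "https://<api-id>.execute-api.<region>.amazonaws.com"  # placeholder

message = {
    "messageId": "test-456",
    "messageType": "TYPE_A",
    "payload": {
        "email": "user@example.con",  # '.con' typo is intentional, to trigger remediation
        "data": "some data",
    },
    "timestamp": "2023-11-22T12:00:00Z",
}

response = requests.post(f"{ENDPOINT}/prod/message", json=message, timeout=10)
print(response.status_code, response.text)
```
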
## Adaptation Points
- Message validation rules
- Processing logic
- Error handling strategies
- Recovery mechanisms
- Monitoring requirements
- API design

## Cleanup
1. Delete the SAM application:
```
sam delete
```
2. Confirm the stack has been deleted:
```
aws cloudformation list-stacks --query "StackSummaries[?contains(StackName,'STACK_NAME')].StackStatus"
```
@@ -0,0 +1,56 @@
{
  "title": "Serverless Messaging Redrive",
  "description": "Automate the redrive and fixing of queue messages using AWS Lambda.",
  "language": "Python",
  "level": "200",
  "framework": "SAM",
  "introBox": {
    "headline": "How it works",
    "text": [
      "This sample project demonstrates how to use a serverless solution for processing and fixing malformed messages using SQS queues and Lambda functions"
    ]
  },
  "gitHub": {
    "template": {
      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/serverless-messaging-processing",
      "templateURL": "serverless-patterns/serverless-messaging-processing",
      "projectFolder": "serverless-messaging-processing",
      "templateFile": "template.yaml"
    }
  },
  "resources": {
    "bullets": [
      {
        "text": "Amazon SQS Docs",
        "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html"
      },
      {
        "text": "Using dead-letter queues in Amazon SQS",
        "link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html"
      }
    ]
  },
  "deploy": {
    "text": [
      "sam build",
      "sam deploy --guided"
    ]
  },
  "testing": {
    "text": [
      "See the GitHub repo for detailed testing instructions."
    ]
  },
  "cleanup": {
    "text": [
      "Delete the stack: <code>sam delete</code>."
    ]
  },
  "authors": [
    {
      "name": "Ilias Ali",
      "image": "https://avatars.githubusercontent.com/zalilias",
      "bio": "I am a Solutions Architect working at AWS based in the UK.",
      "linkedin": "ilias-ali-0849991a4"
    }
  ]
}
@@ -0,0 +1,199 @@
import json
import re
import boto3
import os
import logging

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients
sqs = boto3.client('sqs')

# Environment variables
MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL']
FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL']

def fix_email(email):
    """
    Attempt to fix common email format issues
    Args:
        email: String containing malformed email
    Returns:
        str: Fixed email or original if unfixable
    """
    try:
        logger.info(f"Starting email fix attempt for: {email}")

        # Remove whitespace
        email = email.strip()

        # Handle multiple @ symbols
        if email.count('@') > 1:
            parts = email.split('@')
            email = f"{parts[0]}@{parts[-1]}"
            logger.info(f"Fixed multiple @ symbols. Result: {email}")

        # Common domain typo fixes
        domain_fixes = {
            '.con': '.com',
            '.vom': '.com',
            '.comm': '.com',
            '.orgg': '.org',
            '.nett': '.net',
            '.ckm': '.com',
            '.cm': '.com'
        }

        original_email = email
        for wrong, right in domain_fixes.items():
            if email.endswith(wrong):
                email = email[:-len(wrong)] + right
                logger.info(f"Fixed domain from {wrong} to {right}. Before: {original_email}, After: {email}")
                break

        return email
    except Exception as e:
        logger.error(f"Error fixing email: {str(e)}")
        return None

def validate_fixed_email(email):
    """
    Validate fixed email format.
    Args:
        email: String containing fixed email
    Returns:
        bool: True if valid email format, False otherwise
    """
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(email_pattern, email))

def lambda_handler(event, context):
    """
    Processes messages from a DLQ that have failed validation,
    attempting to fix common email format issues.
    If fixed successfully, messages are sent back to the main queue.
    If unfixable, messages are sent to a fatal DLQ.

    Flow:
    1. Extract email from failed message
    2. Attempt to fix common issues
    3. If fixed → Main Queue
    4. If unfixable → Fatal DLQ

    Extension points:
    1. Add more sophisticated routing logic - including a delay queue
    2. Implement custom error handling
    3. Add message transformation
    4. Implement retry mechanisms
    5. Add monitoring and metrics

    Args:
        event: Lambda event object containing SQS messages
        context: Lambda context object
    Returns:
        dict: Processing summary with counts and batchItemFailures
    """
    processed_count = 0
    fixed_count = 0
    fatal_count = 0
    failed_message_ids = []

    logger.info(f"Starting to process batch of {len(event['Records'])} messages")

    for record in event['Records']:
        original_message_id = "unknown"
        try:
            # Parse the failed message
            message = json.loads(record['body'])
            original_message_id = message.get('messageId', 'unknown')

            # Detailed message content logging
            logger.info(f"Processing message content: {json.dumps(message, indent=2)}")

            # Check if message has already been remediated
            if 'remediation' in message:
                logger.info("Message already remediated, skipping processing")
                continue

            # Extract email from payload
            if 'payload' in message and 'email' in message['payload']:
                original_email = message['payload']['email']

                # Attempt to fix email
                fixed_email = fix_email(original_email)

                if fixed_email and validate_fixed_email(fixed_email):
                    # Update message with fixed email
                    message['payload']['email'] = fixed_email
                    message['remediation'] = {
                        'original_email': original_email,
                        'fixed_email': fixed_email,
                        'timestamp': context.invoked_function_arn
                    }

                    # Send back to main queue
                    sqs.send_message(
                        QueueUrl=MAIN_QUEUE_URL,
                        MessageBody=json.dumps(message)
                    )
                    fixed_count += 1
                else:
                    # Send to fatal DLQ if unfixable
                    message['failureReason'] = 'Email could not be remediated'
                    sqs.send_message(
                        QueueUrl=FATAL_DLQ_URL,
                        MessageBody=json.dumps(message)
                    )
                    fatal_count += 1
            else:
                # Send to fatal DLQ if message structure is invalid
                message['failureReason'] = 'Invalid message structure - missing email in payload'
                sqs.send_message(
                    QueueUrl=FATAL_DLQ_URL,
                    MessageBody=json.dumps(message)
                )
                fatal_count += 1

            processed_count += 1

        except Exception as e:
            logger.error(f"Error processing message {original_message_id}: {str(e)}")
            # Add message ID to failed messages list
            failed_message_ids.append(record['messageId'])
            try:
                error_message = {
                    'originalMessage': record['body'],
                    'failureReason': f"Remediation error: {str(e)}",
                    'timestamp': context.invoked_function_arn
                }
                sqs.send_message(
                    QueueUrl=FATAL_DLQ_URL,
                    MessageBody=json.dumps(error_message)
                )
                fatal_count += 1
            except Exception as fatal_e:
                logger.critical(f"Could not send to fatal DLQ: {str(fatal_e)}")

    # Execution summary
    logger.info(f"""
    === Execution Summary ===
    Messages Processed: {processed_count}
    Messages Fixed: {fixed_count}
    Messages Fatal: {fatal_count}
    Messages Failed: {len(failed_message_ids)}
    ========================
    """)

    # Return both the processing info and the batch failures for SQS
    result = {
        'batchItemFailures': [{"itemIdentifier": message_id} for message_id in failed_message_ids],
        'processingInfo': {
            'processed': processed_count,
            'fixed': fixed_count,
            'fatal': fatal_count
        }
    }

    return result
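
To see the remediation behavior of the functions above in isolation, here is a small local sketch (not part of the pattern). It assumes the handler file is importable as a module named `app`; the actual file path is not shown in this diff.

```python
# Local illustration only. Assumes the handler file above is saved as app.py;
# the real module/file name is not shown in this diff.
from app import fix_email, validate_fixed_email

samples = [
    "  user@example.con ",  # stray whitespace + '.con' typo -> fixable
    "user@@example.com",    # duplicate '@' -> fixable
    "not-an-email",         # no usable structure -> fails validation, goes to the fatal DLQ
]

for raw in samples:
    fixed = fix_email(raw)
    is_valid = bool(fixed) and validate_fixed_email(fixed)
    print(f"{raw!r} -> {fixed!r} (valid after fix: {is_valid})")
```

When run against the deployed stack instead, the same outcomes are driven by the Lambda event source mapping: messages that validate after fixing are re-sent to the main queue, the rest land in the fatal DLQ, and any per-record exceptions are reported back to SQS through `batchItemFailures`.
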
@@ -0,0 +1 @@
boto3==1.26.137
