Skip to content

docs(appsync_events): improve AppSync events documentation #6572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .cfnlintrc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ignore_templates:
- examples/event_handler_appsync_events/sam/getting_started_with_appsync_events.yaml
53 changes: 30 additions & 23 deletions docs/core/event_handler/appsync_events.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ stateDiagram-v2
* Easily handle publish and subscribe events with dedicated handler methods
* Automatic routing based on namespace and channel patterns
* Support for wildcard patterns to create catch-all handlers
* Process events in parallel corontrol aggregation for batch processing
* Support for async functions
* Aggregation for batch processing
* Graceful error handling for individual events

## Terminology
Expand All @@ -59,6 +60,12 @@ It handles connection management, message broadcasting, authentication, and moni

You must have an existing AppSync Events API with real-time capabilities enabled and IAM permissions to invoke your Lambda function. That said, there are no additional permissions required to use Event Handler as routing requires no dependency (_standard library_).

=== "getting_started_with_appsync_events.yaml"

```yaml
--8<-- "examples/event_handler_appsync_events/sam/getting_started_with_appsync_events.yaml"
```

### AppSync request and response format

AppSync Events uses a specific event format for Lambda requests and responses. In most scenarios, Powertools for AWS simplifies this interaction by automatically formatting resolver returns to match the expected AppSync response structure.
Expand Down Expand Up @@ -102,12 +109,16 @@ When processing events with Lambda, you can return errors to AppSync in three wa

You can define your handlers for different event types using the `app.on_publish()`, `app.async_on_publish()`, and `app.on_subscribe()` methods.

By default, the resolver processes messages individually. For batch processing, see the [Aggregated Processing](#aggregated-processing) section.

=== "getting_started_with_publish_events.py"

```python hl_lines="5 10 13"
--8<-- "examples/event_handler_appsync_events/src/getting_started_with_publish_events.py"
```

1. The `payload` argument is mandatory and will be passed as a dictionary.

=== "getting_started_with_subscribe_events.py"

```python hl_lines="6 7 13 17"
Expand Down Expand Up @@ -159,6 +170,8 @@ You can enable this with the `aggregate` parameter:
--8<-- "examples/event_handler_appsync_events/src/working_with_aggregated_events.py"
```

1. The `payload` argument is mandatory and will be passed as a list of dictionary.

### Handling errors

You can filter or reject events by raising exceptions in your resolvers or by formatting the payload according to the expected response structure. This instructs AppSync not to propagate that specific message, so subscribers will not receive it.
Expand Down Expand Up @@ -191,22 +204,22 @@ When processing batch of items with `aggregate=True`, you must format the payloa

=== "working_with_error_handling_response.json"

```python hl_lines="4"
```json hl_lines="4"
--8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json"
```

If instead you want to fail the entire batch, you can throw an exception. This will cause the Event Handler to return an error response to AppSync and fail the entire batch.

=== "working_with_error_handling_multiple.py"
=== "fail_entire_batch.py"

```python hl_lines="5 6 13"
--8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_multiple.py"
```python hl_lines="6 15 19 30"
--8<-- "examples/event_handler_appsync_events/src/fail_entire_batch.py"
```

=== "working_with_error_handling_response.json"
=== "fail_entire_batch_response.json"

```python hl_lines="5 6 13"
--8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json"
```json
--8<-- "examples/event_handler_appsync_events/src/fail_entire_batch_response.json"
```

#### Authorization control
Expand All @@ -218,16 +231,10 @@ You can also do content based authorization for channel by raising the `Unauthor
* **When working with publish events** Powertools for AWS stop processing messages and subscribers will not receive any message.
* **When working with subscribe events** the subscription won't be established.

=== "working_with_error_handling.py"
=== "working_with_authorization_control.py"

```python hl_lines="5 6 13"
--8<-- "examples/event_handler_appsync_events/src/working_with_error_handling.py"
```

=== "working_with_error_handling_response.json"

```python hl_lines="5 6 13"
--8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json"
```python hl_lines="6 21 31"
--8<-- "examples/event_handler_appsync_events/src/working_with_authorization_control.py"
```

### Processing events with async resolvers
Expand All @@ -241,7 +248,7 @@ We use `asyncio` module to support async functions, and we ensure reliable execu

=== "working_with_async_resolvers.py"

```python hl_lines="5 6 13"
```python hl_lines="6 14"
--8<-- "examples/event_handler_appsync_events/src/working_with_async_resolvers.py"
```

Expand All @@ -251,7 +258,7 @@ You can access to the original Lambda event or context for additional informatio

=== "accessing_event_and_context.py"

```python hl_lines="5 6 13"
```python hl_lines="17"
--8<-- "examples/event_handler_appsync_events/src/accessing_event_and_context.py"
```

Expand Down Expand Up @@ -363,26 +370,26 @@ You can test your event handlers by passing a mocked or actual AppSync Events La

=== "getting_started_with_testing_publish.py"

```python hl_lines="5 6 13"
```python
--8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_publish.py"
```

=== "getting_started_with_testing_publish_event.json"

```python hl_lines="5 6 13"
```json
--8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_publish_event.json"
```

### Testing subscribe events

=== "getting_started_with_testing_subscribe.py"

```python hl_lines="5 6 13"
```python
--8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe.py"
```

=== "getting_started_with_testing_subscribe_event.json"

```python hl_lines="5 6 13"
```json
--8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe_event.json"
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Metadata:
cfn-lint:
ignore_checks:
- E3002

Globals:
Function:
Timeout: 5
MemorySize: 256
Runtime: python3.13
Tracing: Active
Environment:
Variables:
POWERTOOLS_LOG_LEVEL: INFO
POWERTOOLS_SERVICE_NAME: hello

Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function
Properties:
Handler: index.handler
CodeUri: hello_world

WebsocketAPI:
Type: AWS::AppSync::Api
Properties:
EventConfig:
AuthProviders:
- AuthType: API_KEY
ConnectionAuthModes:
- AuthType: API_KEY
DefaultPublishAuthModes:
- AuthType: API_KEY
DefaultSubscribeAuthModes:
- AuthType: API_KEY
Name: RealTimeEventAPI

NameSpaceDataSource:
Type: AWS::AppSync::DataSource
Properties:
ApiId: !GetAtt WebsocketAPI.ApiId
LambdaConfig:
LambdaFunctionArn: !GetAtt HelloWorldFunction.Arn
Name: powertools_lambda
ServiceRoleArn: !GetAtt DataSourceIAMRole.Arn
Type: AWS_LAMBDA

WebsocketApiKey:
Type: AWS::AppSync::ApiKey
Properties:
ApiId: !GetAtt WebsocketAPI.ApiId

WebsocketAPINamespace:
Type: AWS::AppSync::ChannelNamespace
Properties:
ApiId: !GetAtt WebsocketAPI.ApiId
Name: powertools
HandlerConfigs:
OnPublish:
Behavior: DIRECT
Integration:
DataSourceName: powertools_lambda
LambdaConfig:
InvokeType: REQUEST_RESPONSE
OnSubscribe:
Behavior: DIRECT
Integration:
DataSourceName: powertools_lambda
LambdaConfig:
InvokeType: REQUEST_RESPONSE

DataSourceIAMRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: appsync.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: LambdaInvokePolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- lambda:InvokeFunction
Resource: !GetAtt HelloWorldFunction.Arn
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
app = AppSyncEventsResolver()


class ValidationError(Exception):
pass


@app.on_publish("/default/channel1")
def handle_channel1_publish(payload: dict[str, Any]):
# Access the full event and context
Expand Down
36 changes: 36 additions & 0 deletions examples/event_handler_appsync_events/src/fail_entire_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()
logger = Logger()


class ChannelException(Exception):
pass


@app.on_publish("/default/*", aggregate=True)
def handle_default_namespace_batch(payload: list[dict[str, Any]]):
results: list = []

# Process all events in the batch together
for event in payload:
try:
# Process each event
results.append({"id": event.get("id"), "payload": {"processed": True, "originalEvent": event}})
except Exception as e:
logger.error("Found and error")
raise ChannelException("An exception occurred") from e

return results


def lambda_handler(event: dict, context: LambdaContext):
return app.resolve(event, context)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"error": "ChannelException - An exception occurred"
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


@app.on_publish("/default/channel")
def handle_channel1_publish(payload: dict[str, Any]):
def handle_channel1_publish(payload: dict[str, Any]): # (1)!
# Process the payload for this specific channel
return {
"processed": True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ def marshall(item: dict[str, Any]) -> dict[str, Any]:


@app.on_publish("/default/foo/*", aggregate=True)
async def handle_default_namespace_batch(payload: list[dict[str, Any]]):
async def handle_default_namespace_batch(payload: list[dict[str, Any]]): # (1)!
write_operations: list = []

write_operations.extend({"PutRequest": {"Item": marshall(item)}} for item in payload)

# Executar operação de lote no DynamoDB
if write_operations:
dynamodb.batch_write_item(
RequestItems={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined]
from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
from aws_lambda_powertools.utilities.typing import LambdaContext
Expand All @@ -13,8 +13,7 @@

@app.async_on_publish("/default/channel1")
async def handle_channel1_publish(payload: dict[str, Any]):
result = await async_process_data(payload)
return result
return await async_process_data(payload)


async def async_process_data(payload: dict[str, Any]):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver
from aws_lambda_powertools.event_handler.events_appsync.exceptions import UnauthorizedException

if TYPE_CHECKING:
from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()


@app.on_publish("/default/foo")
def handle_specific_channel(payload: dict[str, Any]):
return payload


@app.on_publish("/*")
def handle_root_channel(payload: dict[str, Any]):
raise UnauthorizedException("You can only publish to /default/foo")


@app.on_subscribe("/default/foo")
def handle_subscription_specific_channel():
return True


@app.on_subscribe("/*")
def handle_subscription_root_channel():
raise UnauthorizedException("You can only subscribe to /default/foo")


def lambda_handler(event: dict, context: LambdaContext):
return app.resolve(event, context)
Loading