diff --git a/.cfnlintrc.yaml b/.cfnlintrc.yaml new file mode 100644 index 00000000000..3909b9bb437 --- /dev/null +++ b/.cfnlintrc.yaml @@ -0,0 +1,2 @@ +ignore_templates: + - examples/event_handler_appsync_events/sam/getting_started_with_appsync_events.yaml diff --git a/docs/core/event_handler/appsync_events.md b/docs/core/event_handler/appsync_events.md index 82e87f60eda..72595f9ce94 100644 --- a/docs/core/event_handler/appsync_events.md +++ b/docs/core/event_handler/appsync_events.md @@ -41,7 +41,8 @@ stateDiagram-v2 * Easily handle publish and subscribe events with dedicated handler methods * Automatic routing based on namespace and channel patterns * Support for wildcard patterns to create catch-all handlers -* Process events in parallel corontrol aggregation for batch processing +* Support for async functions +* Aggregation for batch processing * Graceful error handling for individual events ## Terminology @@ -59,6 +60,12 @@ It handles connection management, message broadcasting, authentication, and moni You must have an existing AppSync Events API with real-time capabilities enabled and IAM permissions to invoke your Lambda function. That said, there are no additional permissions required to use Event Handler as routing requires no dependency (_standard library_). +=== "getting_started_with_appsync_events.yaml" + + ```yaml + --8<-- "examples/event_handler_appsync_events/sam/getting_started_with_appsync_events.yaml" + ``` + ### AppSync request and response format AppSync Events uses a specific event format for Lambda requests and responses. In most scenarios, Powertools for AWS simplifies this interaction by automatically formatting resolver returns to match the expected AppSync response structure. @@ -102,12 +109,16 @@ When processing events with Lambda, you can return errors to AppSync in three wa You can define your handlers for different event types using the `app.on_publish()`, `app.async_on_publish()`, and `app.on_subscribe()` methods. +By default, the resolver processes messages individually. For batch processing, see the [Aggregated Processing](#aggregated-processing) section. + === "getting_started_with_publish_events.py" ```python hl_lines="5 10 13" --8<-- "examples/event_handler_appsync_events/src/getting_started_with_publish_events.py" ``` + 1. The `payload` argument is mandatory and will be passed as a dictionary. + === "getting_started_with_subscribe_events.py" ```python hl_lines="6 7 13 17" @@ -159,6 +170,8 @@ You can enable this with the `aggregate` parameter: --8<-- "examples/event_handler_appsync_events/src/working_with_aggregated_events.py" ``` + 1. The `payload` argument is mandatory and will be passed as a list of dictionary. + ### Handling errors You can filter or reject events by raising exceptions in your resolvers or by formatting the payload according to the expected response structure. This instructs AppSync not to propagate that specific message, so subscribers will not receive it. @@ -191,22 +204,22 @@ When processing batch of items with `aggregate=True`, you must format the payloa === "working_with_error_handling_response.json" - ```python hl_lines="4" + ```json hl_lines="4" --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json" ``` If instead you want to fail the entire batch, you can throw an exception. This will cause the Event Handler to return an error response to AppSync and fail the entire batch. -=== "working_with_error_handling_multiple.py" +=== "fail_entire_batch.py" - ```python hl_lines="5 6 13" - --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_multiple.py" + ```python hl_lines="6 15 19 30" + --8<-- "examples/event_handler_appsync_events/src/fail_entire_batch.py" ``` -=== "working_with_error_handling_response.json" +=== "fail_entire_batch_response.json" - ```python hl_lines="5 6 13" - --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json" + ```json + --8<-- "examples/event_handler_appsync_events/src/fail_entire_batch_response.json" ``` #### Authorization control @@ -218,16 +231,10 @@ You can also do content based authorization for channel by raising the `Unauthor * **When working with publish events** Powertools for AWS stop processing messages and subscribers will not receive any message. * **When working with subscribe events** the subscription won't be established. -=== "working_with_error_handling.py" +=== "working_with_authorization_control.py" - ```python hl_lines="5 6 13" - --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling.py" - ``` - -=== "working_with_error_handling_response.json" - - ```python hl_lines="5 6 13" - --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json" + ```python hl_lines="6 21 31" + --8<-- "examples/event_handler_appsync_events/src/working_with_authorization_control.py" ``` ### Processing events with async resolvers @@ -241,7 +248,7 @@ We use `asyncio` module to support async functions, and we ensure reliable execu === "working_with_async_resolvers.py" - ```python hl_lines="5 6 13" + ```python hl_lines="6 14" --8<-- "examples/event_handler_appsync_events/src/working_with_async_resolvers.py" ``` @@ -251,7 +258,7 @@ You can access to the original Lambda event or context for additional informatio === "accessing_event_and_context.py" - ```python hl_lines="5 6 13" + ```python hl_lines="17" --8<-- "examples/event_handler_appsync_events/src/accessing_event_and_context.py" ``` @@ -363,13 +370,13 @@ You can test your event handlers by passing a mocked or actual AppSync Events La === "getting_started_with_testing_publish.py" - ```python hl_lines="5 6 13" + ```python --8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_publish.py" ``` === "getting_started_with_testing_publish_event.json" - ```python hl_lines="5 6 13" + ```json --8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_publish_event.json" ``` @@ -377,12 +384,12 @@ You can test your event handlers by passing a mocked or actual AppSync Events La === "getting_started_with_testing_subscribe.py" - ```python hl_lines="5 6 13" + ```python --8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe.py" ``` === "getting_started_with_testing_subscribe_event.json" - ```python hl_lines="5 6 13" + ```json --8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe_event.json" ``` diff --git a/examples/event_handler_appsync_events/sam/getting_started_with_appsync_events.yaml b/examples/event_handler_appsync_events/sam/getting_started_with_appsync_events.yaml new file mode 100644 index 00000000000..ac154a47920 --- /dev/null +++ b/examples/event_handler_appsync_events/sam/getting_started_with_appsync_events.yaml @@ -0,0 +1,93 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 + +Metadata: + cfn-lint: + ignore_checks: + - E3002 + +Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: python3.13 + Tracing: Active + Environment: + Variables: + POWERTOOLS_LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + +Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: index.handler + CodeUri: hello_world + + WebsocketAPI: + Type: AWS::AppSync::Api + Properties: + EventConfig: + AuthProviders: + - AuthType: API_KEY + ConnectionAuthModes: + - AuthType: API_KEY + DefaultPublishAuthModes: + - AuthType: API_KEY + DefaultSubscribeAuthModes: + - AuthType: API_KEY + Name: RealTimeEventAPI + + NameSpaceDataSource: + Type: AWS::AppSync::DataSource + Properties: + ApiId: !GetAtt WebsocketAPI.ApiId + LambdaConfig: + LambdaFunctionArn: !GetAtt HelloWorldFunction.Arn + Name: powertools_lambda + ServiceRoleArn: !GetAtt DataSourceIAMRole.Arn + Type: AWS_LAMBDA + + WebsocketApiKey: + Type: AWS::AppSync::ApiKey + Properties: + ApiId: !GetAtt WebsocketAPI.ApiId + + WebsocketAPINamespace: + Type: AWS::AppSync::ChannelNamespace + Properties: + ApiId: !GetAtt WebsocketAPI.ApiId + Name: powertools + HandlerConfigs: + OnPublish: + Behavior: DIRECT + Integration: + DataSourceName: powertools_lambda + LambdaConfig: + InvokeType: REQUEST_RESPONSE + OnSubscribe: + Behavior: DIRECT + Integration: + DataSourceName: powertools_lambda + LambdaConfig: + InvokeType: REQUEST_RESPONSE + + DataSourceIAMRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: appsync.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: LambdaInvokePolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: !GetAtt HelloWorldFunction.Arn diff --git a/examples/event_handler_appsync_events/src/accessing_event_and_context.py b/examples/event_handler_appsync_events/src/accessing_event_and_context.py index 85d48c23d85..c1a2ebf536f 100644 --- a/examples/event_handler_appsync_events/src/accessing_event_and_context.py +++ b/examples/event_handler_appsync_events/src/accessing_event_and_context.py @@ -11,10 +11,6 @@ app = AppSyncEventsResolver() -class ValidationError(Exception): - pass - - @app.on_publish("/default/channel1") def handle_channel1_publish(payload: dict[str, Any]): # Access the full event and context diff --git a/examples/event_handler_appsync_events/src/fail_entire_batch.py b/examples/event_handler_appsync_events/src/fail_entire_batch.py new file mode 100644 index 00000000000..10cf8fce73f --- /dev/null +++ b/examples/event_handler_appsync_events/src/fail_entire_batch.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.event_handler import AppSyncEventsResolver + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncEventsResolver() +logger = Logger() + + +class ChannelException(Exception): + pass + + +@app.on_publish("/default/*", aggregate=True) +def handle_default_namespace_batch(payload: list[dict[str, Any]]): + results: list = [] + + # Process all events in the batch together + for event in payload: + try: + # Process each event + results.append({"id": event.get("id"), "payload": {"processed": True, "originalEvent": event}}) + except Exception as e: + logger.error("Found and error") + raise ChannelException("An exception occurred") from e + + return results + + +def lambda_handler(event: dict, context: LambdaContext): + return app.resolve(event, context) diff --git a/examples/event_handler_appsync_events/src/fail_entire_batch_response.json b/examples/event_handler_appsync_events/src/fail_entire_batch_response.json new file mode 100644 index 00000000000..babd5b4bf29 --- /dev/null +++ b/examples/event_handler_appsync_events/src/fail_entire_batch_response.json @@ -0,0 +1,3 @@ +{ + "error": "ChannelException - An exception occurred" +} diff --git a/examples/event_handler_appsync_events/src/getting_started_with_publish_events.py b/examples/event_handler_appsync_events/src/getting_started_with_publish_events.py index bd4fa00142f..8f40a4759a2 100644 --- a/examples/event_handler_appsync_events/src/getting_started_with_publish_events.py +++ b/examples/event_handler_appsync_events/src/getting_started_with_publish_events.py @@ -11,7 +11,7 @@ @app.on_publish("/default/channel") -def handle_channel1_publish(payload: dict[str, Any]): +def handle_channel1_publish(payload: dict[str, Any]): # (1)! # Process the payload for this specific channel return { "processed": True, diff --git a/examples/event_handler_appsync_events/src/working_with_aggregated_events.py b/examples/event_handler_appsync_events/src/working_with_aggregated_events.py index 6e59ba9718b..a5dee22da6a 100644 --- a/examples/event_handler_appsync_events/src/working_with_aggregated_events.py +++ b/examples/event_handler_appsync_events/src/working_with_aggregated_events.py @@ -20,12 +20,11 @@ def marshall(item: dict[str, Any]) -> dict[str, Any]: @app.on_publish("/default/foo/*", aggregate=True) -async def handle_default_namespace_batch(payload: list[dict[str, Any]]): +async def handle_default_namespace_batch(payload: list[dict[str, Any]]): # (1)! write_operations: list = [] write_operations.extend({"PutRequest": {"Item": marshall(item)}} for item in payload) - # Executar operação de lote no DynamoDB if write_operations: dynamodb.batch_write_item( RequestItems={ diff --git a/examples/event_handler_appsync_events/src/working_with_async_resolvers.py b/examples/event_handler_appsync_events/src/working_with_async_resolvers.py index b34645f1e74..3ed8dbe517d 100644 --- a/examples/event_handler_appsync_events/src/working_with_async_resolvers.py +++ b/examples/event_handler_appsync_events/src/working_with_async_resolvers.py @@ -3,7 +3,7 @@ import asyncio from typing import TYPE_CHECKING, Any -from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] +from aws_lambda_powertools.event_handler import AppSyncEventsResolver if TYPE_CHECKING: from aws_lambda_powertools.utilities.typing import LambdaContext @@ -13,8 +13,7 @@ @app.async_on_publish("/default/channel1") async def handle_channel1_publish(payload: dict[str, Any]): - result = await async_process_data(payload) - return result + return await async_process_data(payload) async def async_process_data(payload: dict[str, Any]): diff --git a/examples/event_handler_appsync_events/src/working_with_authorization_control.py b/examples/event_handler_appsync_events/src/working_with_authorization_control.py new file mode 100644 index 00000000000..86858b762e7 --- /dev/null +++ b/examples/event_handler_appsync_events/src/working_with_authorization_control.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver +from aws_lambda_powertools.event_handler.events_appsync.exceptions import UnauthorizedException + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncEventsResolver() + + +@app.on_publish("/default/foo") +def handle_specific_channel(payload: dict[str, Any]): + return payload + + +@app.on_publish("/*") +def handle_root_channel(payload: dict[str, Any]): + raise UnauthorizedException("You can only publish to /default/foo") + + +@app.on_subscribe("/default/foo") +def handle_subscription_specific_channel(): + return True + + +@app.on_subscribe("/*") +def handle_subscription_root_channel(): + raise UnauthorizedException("You can only subscribe to /default/foo") + + +def lambda_handler(event: dict, context: LambdaContext): + return app.resolve(event, context)