From bf4e83added67b01f167a96f3f6265e88d7e79e6 Mon Sep 17 00:00:00 2001 From: Phil Callister Date: Wed, 23 Oct 2024 16:05:49 -0500 Subject: [PATCH 1/6] Initial Commit... --- .../.gitignore | 10 + .../README.md | 108 +++++++ .../cdk/.npmignore | 6 + .../cdk/bin/bedrock-streamer.ts | 21 ++ .../cdk/cdk.json | 71 ++++ .../lib/bedrock-streamer-lambda/.gitignore | 1 + .../lib/bedrock-streamer-lambda/Cargo.toml | 24 ++ .../lib/bedrock-streamer-lambda/src/main.rs | 306 ++++++++++++++++++ .../cdk/lib/bedrock-streamer-stack.ts | 69 ++++ .../cdk/package.json | 24 ++ .../cdk/tsconfig.json | 31 ++ .../example-pattern.json | 68 ++++ 12 files changed, 739 insertions(+) create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/.gitignore create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/README.md create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/.npmignore create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/bin/bedrock-streamer.ts create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/cdk.json create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/.gitignore create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/Cargo.toml create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/src/main.rs create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-stack.ts create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/package.json create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/tsconfig.json create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/example-pattern.json diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/.gitignore b/apigw-websocket-api-bedrock-streaming-rust-cdk/.gitignore new file mode 100644 index 000000000..2cdcf27b8 --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/.gitignore @@ -0,0 +1,10 @@ +*.js +!jest.config.js +*.d.ts +node_modules +.DS_Store + +# CDK asset staging directory +.cdk.staging +cdk.out + diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md b/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md new file mode 100644 index 000000000..2e944149c --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md @@ -0,0 +1,108 @@ +# API Gateway WebSocket API to Lambda to Bedrock Streamed Response + +This CDK application demonstrates a simple, serverless approach to integrating Amazon Bedrock with AWS Lambda and Amazon API Gateway. Written in Rust, it showcases how to efficiently stream responses from Amazon Bedrock to a client via WebSocket connections. The example serves as a practical illustration of implementing real-time, serverless communication between Bedrock's GenAI capabilities and a client application. + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [Node and NPM](https://nodejs.org/en/download/) installed +* [AWS Cloud Development Kit](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) (AWS CDK) installed +* [Docker](https://docs.docker.com/engine/install/) installed and running locally (needed for Rust cross-platform Lambda build) +* [Rust](https://www.rust-lang.org/) 🦀 installed with v1.79.0 or higher +* [Cargo Lambda](https://www.cargo-lambda.info/) installed +* [wscat](https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-how-to-call-websocket-api-wscat.html) installed for CLI WebSocket capabilities + + +## Amazon Bedrock Setup Instructions + +You must request access to the Bedrock LLM model before you can use it. This example uses `Claude 3 Sonnet`, so make sure you have `Access granted` to this model. For more information, see [Model access](https://docs.aws.amazon.com/bedrock/latest/userguide/model-access.html). + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ```bash + git clone https://github.com/aws-samples/serverless-patterns + ``` +2. Change directory to the pattern's CDK directory: + ```bash + cd apigw-websocket-api-bedrock-streaming-rust-cdk/cdk + ``` +3. From the command line, use npm to install the development dependencies: + ```bash + npm install + ``` +4. If you haven't done so previously for this account, run this command to bootstrap CDK: + ```bash + cdk bootstrap + ``` +5. Review the CloudFormation template that CDK generates for your stack using the following AWS CDK CLI command: + ```bash + cdk synth + ``` +6. Use AWS CDK to deploy your AWS resources + ```bash + cdk deploy + ``` + + After the deployment completes, note the URL in the `Outputs` section at the end. The `BedrockStreamerStack.WebSocketURL` followed by the WebSocket URL will be used to connect to API Gateway. It should look something like `wss://{YOUR_API_ID_HERE}.execute-api.{YOUR_REGION_HERE}.amazonaws.com/prod` + +## How it works + +This pattern establishes a WebSocket connection to Amazon API Gateway. When requests are made to this API, API Gateway routes them to an AWS Lambda function. The Lambda function then initiates a streaming request to Amazon Bedrock using the [ConverseStream](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) API. This allows the response from the LLM in Bedrock to start streaming back to the Lambda function as soon as generation begins, without waiting for the entire response to be ready. + +Generating Bedrock responses often takes a long time. By using a streaming approach, the Lambda function can immediately start processing the incoming response and writing the data chunks to the API Gateway WebSocket. The WebSocket then delivers these chunks in real-time to the connected client, providing an extremely interactive user experience. + +## Testing + +1. From your terminal, use `wscat` to connect to API Gateway using the WebSocket API and generate a short story about `CATS` by entering the `{"storyType": "CATS"}` line after `wscat` startup. + ```bash + # Connect to the API Gateway via WebSocket + wscat -c + + Connected (press CTRL+C to quit) + > {"storyType": "CATS"} <--- ENTER THIS...PRESS RETURN + < {"type":"other","message":null} + < {"type":"text","message":"Here"} + < {"type":"text","message":" is"} + < {"type":"text","message":" a"} + < {"type":"text","message":" very"} + < {"type":"text","message":" short"} + < {"type":"text","message":" story"} + < {"type":"text","message":" about"} + < {"type":"text","message":" cats"} + < {"type":"text","message":":"} + < {"type":"text","message":"\n\nMitt"} + < {"type":"text","message":"ens"} + < {"type":"text","message":" cur"} + < {"type":"text","message":"le"} + < {"type":"text","message":"d up"} + < {"type":"text","message":" on"} + < {"type":"text","message":" the"} + < {"type":"text","message":" window"} + < {"type":"text","message":"s"} + < {"type":"text","message":"ill"} + < {"type":"text","message":","} + . + . + < {"type":"other","message":null} + < {"type":"other","message":null} + < {"type":"other","message":null} + > ⏎ <--- CTRL+C HERE> + ``` +2. As the `wscat` CLI says, press `CTRL+C` to disconnect + +## Cleanup + +You can use the following commands to destroy the AWS resources created during deployment. This assumes you're currently at the `apigw-websocket-api-bedrock-streaming-rust-cdk/cdk` directory in your terminal: + +```bash +cdk destroy +``` +---- +Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/.npmignore b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/.npmignore new file mode 100644 index 000000000..c1d6d45dc --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/.npmignore @@ -0,0 +1,6 @@ +*.ts +!*.d.ts + +# CDK asset staging directory +.cdk.staging +cdk.out diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/bin/bedrock-streamer.ts b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/bin/bedrock-streamer.ts new file mode 100644 index 000000000..3ac6983dd --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/bin/bedrock-streamer.ts @@ -0,0 +1,21 @@ +#!/usr/bin/env node +import 'source-map-support/register'; +import * as cdk from 'aws-cdk-lib'; +import { BedrockStreamerStack } from '../lib/bedrock-streamer-stack'; + +const app = new cdk.App(); +new BedrockStreamerStack(app, 'BedrockStreamerStack', { + /* If you don't specify 'env', this stack will be environment-agnostic. + * Account/Region-dependent features and context lookups will not work, + * but a single synthesized template can be deployed anywhere. */ + + /* Uncomment the next line to specialize this stack for the AWS Account + * and Region that are implied by the current CLI configuration. */ + // env: { account: process.env.CDK_DEFAULT_ACCOUNT, region: process.env.CDK_DEFAULT_REGION }, + + /* Uncomment the next line if you know exactly what Account and Region you + * want to deploy the stack to. */ + // env: { account: '123456789012', region: 'us-east-1' }, + + /* For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html */ +}); \ No newline at end of file diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/cdk.json b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/cdk.json new file mode 100644 index 000000000..18218baed --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/cdk.json @@ -0,0 +1,71 @@ +{ + "app": "npx ts-node --prefer-ts-exts bin/bedrock-streamer.ts", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "**/*.d.ts", + "**/*.js", + "tsconfig.json", + "package*.json", + "yarn.lock", + "node_modules", + "test" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true, + "@aws-cdk/aws-kms:aliasNameRef": true, + "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true, + "@aws-cdk/core:includePrefixInUniqueNameGeneration": true, + "@aws-cdk/aws-efs:denyAnonymousAccess": true, + "@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true, + "@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true, + "@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true, + "@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true, + "@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true, + "@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true, + "@aws-cdk/aws-codepipeline-actions:useNewDefaultBranchForCodeCommitSource": true, + "@aws-cdk/aws-cloudwatch-actions:changeLambdaPermissionLogicalIdForLambdaAction": true, + "@aws-cdk/aws-codepipeline:crossAccountKeysDefaultValueToFalse": true, + "@aws-cdk/aws-codepipeline:defaultPipelineTypeToV2": true, + "@aws-cdk/aws-kms:reduceCrossAccountRegionPolicyScope": true, + "@aws-cdk/aws-eks:nodegroupNameAttribute": true, + "@aws-cdk/aws-ec2:ebsDefaultGp3Volume": true, + "@aws-cdk/aws-ecs:removeDefaultDeploymentAlarm": true, + "@aws-cdk/custom-resources:logApiResponseDataPropertyTrueDefault": false + } +} diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/.gitignore b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/.gitignore new file mode 100644 index 000000000..c41cc9e35 --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/.gitignore @@ -0,0 +1 @@ +/target \ No newline at end of file diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/Cargo.toml b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/Cargo.toml new file mode 100644 index 000000000..823f21cac --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "bedrock-streamer" +version = "0.1.0" +edition = "2021" + +[dependencies] +aws_lambda_events = { version = "0.11.1", default-features = false, features = ["apigw"] } +aws-config = "1.1.1" +aws-sdk-bedrockruntime = "1.53.0" +aws-sdk-apigatewaymanagement = "1.39.0" +bytes = "1.5.0" +http = "1.1.0" +lambda_runtime = "0.9.1" +serde = { version = "1.0.193", features = ["derive"] } +serde_json = "1.0.108" +tokio = { version = "1.34.0", features = ["full"] } +tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } + +[profile.release] +opt-level = "z" +strip = true +lto = true +codegen-units = 1 \ No newline at end of file diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/src/main.rs b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/src/main.rs new file mode 100644 index 000000000..bcc7775a8 --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/src/main.rs @@ -0,0 +1,306 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use aws_config::{meta::region::RegionProviderChain, BehaviorVersion, SdkConfig}; +use aws_lambda_events::apigw::ApiGatewayWebsocketProxyRequest; +use aws_sdk_apigatewaymanagement::{ + config::Builder, primitives::Blob as ApiGatewayBlob, Client as ApiGatewayManagementClient, +}; +use aws_sdk_bedrockruntime::{ + types::{ContentBlock, ConversationRole, ConverseStreamOutput, Message}, + Client, +}; +use http::Uri; +use lambda_runtime::{service_fn, Error as LambdaError, LambdaEvent}; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; +use tracing::{error, info}; + +const MODEL_ID: &str = "anthropic.claude-3-haiku-20240307-v1:0"; + +/// Bedrock story +#[derive(Debug, Deserialize)] +struct StoryRequest { + #[serde(rename = "storyType")] + story_type: String, +} + +/// Amazon API Gateway response +#[derive(serde::Serialize)] +struct ApiGatewayResponse { + #[serde(rename = "statusCode")] + status_code: u16, + body: String, +} + +/// Response for each stream record sent from Amazon Bedrock +#[derive(Debug, Serialize)] +struct BedrockResponse { + #[serde(rename = "type")] + response_type: String, + message: Option, +} + +/// Main Lambda handler here... +async fn function_handler( + event: LambdaEvent, +) -> Result { + let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); + let config = aws_config::defaults(BehaviorVersion::latest()) + .region(region_provider) + .load() + .await; + + let connection_id = event + .payload + .request_context + .connection_id + .as_deref() + .ok_or_else(|| LambdaError::from("Missing connection_id"))?; + let domain_name = event + .payload + .request_context + .domain_name + .as_deref() + .ok_or_else(|| LambdaError::from("Missing domain_name"))?; + let stage = event + .payload + .request_context + .stage + .as_deref() + .ok_or_else(|| LambdaError::from("Missing stage"))?; + + match event.payload.request_context.route_key.as_deref() { + Some("$connect") => handle_connect(connection_id).await, + Some("$disconnect") => handle_disconnect(connection_id).await, + Some("$default") => { + let bedrock_client = Client::new(&config); + let apigw_client = api_gateway_config(domain_name, stage, &config)?; + let request_body = event.payload.body; + + handle_default(bedrock_client, apigw_client, connection_id, request_body).await + } + Some(_) | None => Ok(ApiGatewayResponse { + status_code: 400, + body: "Unknown route".into(), + }), + } +} + +fn api_gateway_config( + domain_name: &str, + stage: &str, + config: &SdkConfig, +) -> Result { + let endpoint = format!("https://{}/{}", domain_name, stage); + let uri = match endpoint.parse::() { + Ok(uri) => uri, + Err(e) => { + return Err(LambdaError::from(format!( + "Failed to parse endpoint URI: {:?}", + e + ))) + } + }; + let apigw_client = ApiGatewayManagementClient::from_conf( + Builder::from(config).endpoint_url(uri.to_string()).build(), + ); + Ok(apigw_client) +} + +/// Handle WebSocket connection +async fn handle_connect(connection_id: &str) -> Result { + info!("Handle $connect: {}", connection_id); + Ok(ApiGatewayResponse { + status_code: 200, + body: "Connected...: $connect".into(), + }) +} + +/// Handle WebSocket disconnect +async fn handle_disconnect(connection_id: &str) -> Result { + info!("Handle $disconnect: {}", connection_id); + Ok(ApiGatewayResponse { + status_code: 200, + body: "Disconnected...: $disconnect".into(), + }) +} + +/// Handle WebSocket default message +async fn handle_default( + bedrock_client: Client, + apigw_client: ApiGatewayManagementClient, + connection_id: &str, + request_body: Option, +) -> Result { + info!("Handle $default: {}", connection_id); + + // Parse the incoming JSON payload + let story_request: StoryRequest = match request_body { + Some(body) => serde_json::from_str(&body) + .map_err(|e| LambdaError::from(format!("Failed to parse request body: {:?}", e)))?, + None => Err(LambdaError::from("Missing request body"))?, + }; + + // Construct the prompt based on the type of story to create + let prompt: String = format!( + "Tell me a very short story about: {}", + story_request.story_type + ); + info!("Bedrock story prompt...: {}", prompt); + + // Start reading from Bedrock & writing the API GW + bedrock_websocket_pipeline( + prompt, + bedrock_client, + apigw_client, + connection_id.to_string(), + ) + .await?; + + Ok(ApiGatewayResponse { + status_code: 200, + body: "Message processed...: $default".into(), + }) +} + +/// Start the Bedrock + Websocket threads +async fn bedrock_websocket_pipeline( + prompt: String, + bedrock_client: Client, + apigw_client: ApiGatewayManagementClient, + connection_id: String, +) -> Result<(), LambdaError> { + info!("Starting Bedrock + WebSocket pipeline..."); + + let (sender, receiver) = mpsc::channel(32); // Adjust buffer size as needed + + let bedrock_task = + tokio::spawn(async move { process_bedrock_stream(sender, prompt, bedrock_client).await }); + + let websocket_task = + tokio::spawn(async move { send_to_websocket(receiver, apigw_client, connection_id).await }); + + // Wait for both tasks to complete + let (bedrock_result, websocket_result) = tokio::try_join!(bedrock_task, websocket_task) + .map_err(|e| LambdaError::from(format!("Task join error: {:?}", e)))?; + + // Propagate errors from the tasks + bedrock_result?; + websocket_result?; + + Ok(()) +} + +/// Process the Bedrock stream +#[tracing::instrument] +async fn process_bedrock_stream( + sender: mpsc::Sender, + prompt: String, + bedrock_client: Client, +) -> Result<(), LambdaError> { + let bedrock_response = bedrock_client + .converse_stream() + .model_id(MODEL_ID) + .messages( + Message::builder() + .role(ConversationRole::User) + .content(ContentBlock::Text(prompt)) + .build() + .map_err(|_| LambdaError::from("failed to build message"))?, + ) + .send() + .await; + + let mut converse_stream = match bedrock_response { + Ok(output) => Ok(output.stream), + Err(e) => { + error!("Error in Bedrock response: {:?}", e); + Err(LambdaError::from("Error in Bedrock response")) + } + }?; + + loop { + let token = converse_stream.recv().await; + match token { + Ok(Some(output)) => { + info!("Bedrock response: {:?}", output); + let response = get_response(output).map_err(LambdaError::from)?; + if let Err(e) = sender.send(response).await { + error!("Receiver dropped error: {:?}", e); + return Err(LambdaError::from( + "Receiver dropped error. Bedrock proccessing stopped.", + )); + } + } + Ok(None) => break, + Err(e) => { + error!("Error receiving stream: {:?}", e); + return Err(LambdaError::from("Error receiving stream")); + } + } + } + + info!("Bedrock stream processing complete..."); + Ok(()) +} + +fn get_response(output: ConverseStreamOutput) -> Result { + match output { + ConverseStreamOutput::ContentBlockDelta(event) => match event.delta() { + Some(delta) => { + let text = delta.as_text().map_err(|e| { + LambdaError::from(format!("Failed to get text from delta: {:?}", e)) + })?; + Ok(BedrockResponse { + response_type: "text".into(), + message: Some(text.to_string()), + }) + } + None => Ok(BedrockResponse { + response_type: "message".into(), + message: Some("".into()), + }), + }, + _ => Ok(BedrockResponse { + response_type: "other".into(), + message: None, + }), + } +} + +/// Process incoming Bedrock messages and send to WebSocket +#[tracing::instrument] +async fn send_to_websocket( + mut reciever: mpsc::Receiver, + apigw_client: ApiGatewayManagementClient, + connection_id: String, +) -> Result<(), LambdaError> { + while let Some(response) = reciever.recv().await { + info!("Sending to WebSocket: {:?}", response); + apigw_client + .post_to_connection() + .connection_id(&connection_id) + .data(ApiGatewayBlob::new( + serde_json::to_vec(&response).map_err(|e| LambdaError::from(e.to_string()))?, + )) + .send() + .await + .map_err(LambdaError::from)?; + } + + info!("WebSocket sender complete..."); + Ok(()) +} + +/// Lambda Entry +#[tokio::main] +async fn main() -> Result<(), LambdaError> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_target(false) + .without_time() + .init(); + + lambda_runtime::run(service_fn(function_handler)).await +} diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-stack.ts b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-stack.ts new file mode 100644 index 000000000..07ab3cece --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-stack.ts @@ -0,0 +1,69 @@ +import * as cdk from 'aws-cdk-lib'; +import * as apigatewayv2 from 'aws-cdk-lib/aws-apigatewayv2'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import * as iam from 'aws-cdk-lib/aws-iam'; +import { WebSocketLambdaIntegration } from 'aws-cdk-lib/aws-apigatewayv2-integrations'; +import { Construct } from 'constructs'; +import { RustFunction } from 'cargo-lambda-cdk'; + +export class BedrockStreamerStack extends cdk.Stack { + constructor(scope: Construct, id: string, props?: cdk.StackProps) { + super(scope, id, props); + + // Rust Lambda Functions + const streamingLambda = new RustFunction(this, 'BedrockStreamer', { + manifestPath: 'lib/bedrock-streamer-lambda/Cargo.toml', + bundling: { + architecture: lambda.Architecture.ARM_64, + cargoLambdaFlags: [ + '--compiler', + 'cross', + '--release', + ], + }, + timeout: cdk.Duration.seconds(60), + }); + + // Grant Lambda permission to invoke Bedrock + streamingLambda.addToRolePolicy(new iam.PolicyStatement({ + actions: ['bedrock:InvokeModelWithResponseStream'], + resources: ['*'], + })); + + // Create the WebSocket API + const webSocketApi = new apigatewayv2.WebSocketApi(this, 'WebSocketApi', { + connectRouteOptions: { + integration: new WebSocketLambdaIntegration('ConnectIntegration', streamingLambda), + }, + disconnectRouteOptions: { + integration: new WebSocketLambdaIntegration('DisconnectIntegration', streamingLambda), + }, + defaultRouteOptions: { + integration: new WebSocketLambdaIntegration('DefaultIntegration', streamingLambda), + }, + }); + + // Create a stage for the WebSocket API + const stage = new apigatewayv2.WebSocketStage(this, 'Stage', { + webSocketApi, + stageName: 'prod', + autoDeploy: true, + }); + + // Grant the Lambda function permission to manage WebSocket connections + streamingLambda.addToRolePolicy(new iam.PolicyStatement({ + actions: [ + 'execute-api:ManageConnections', + ], + resources: [ + `arn:aws:execute-api:${this.region}:${this.account}:${webSocketApi.apiId}/${stage.stageName}/*`, + ], + })); + + // Output the WebSocket URL + new cdk.CfnOutput(this, 'WebSocketURL', { + value: stage.url, + description: 'WebSocket URL', + }); + } +} \ No newline at end of file diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/package.json b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/package.json new file mode 100644 index 000000000..9730c4825 --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/package.json @@ -0,0 +1,24 @@ +{ + "name": "bedrock-streamer", + "version": "0.1.0", + "bin": { + "bedrock-streamer": "bin/bedrock-streamer.js" + }, + "scripts": { + "build": "tsc", + "watch": "tsc -w", + "cdk": "cdk" + }, + "devDependencies": { + "@types/node": "20.14.9", + "aws-cdk": "2.152.0", + "ts-node": "^10.9.2", + "typescript": "~5.5.3" + }, + "dependencies": { + "aws-cdk-lib": "2.152.0", + "cargo-lambda-cdk": "^0.0.22", + "constructs": "^10.0.0", + "source-map-support": "^0.5.21" + } +} diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/tsconfig.json b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/tsconfig.json new file mode 100644 index 000000000..aaa7dc510 --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/tsconfig.json @@ -0,0 +1,31 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "lib": [ + "es2020", + "dom" + ], + "declaration": true, + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "noImplicitThis": true, + "alwaysStrict": true, + "noUnusedLocals": false, + "noUnusedParameters": false, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": false, + "inlineSourceMap": true, + "inlineSources": true, + "experimentalDecorators": true, + "strictPropertyInitialization": false, + "typeRoots": [ + "./node_modules/@types" + ] + }, + "exclude": [ + "node_modules", + "cdk.out" + ] +} diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/example-pattern.json b/apigw-websocket-api-bedrock-streaming-rust-cdk/example-pattern.json new file mode 100644 index 000000000..f4ca755b9 --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/example-pattern.json @@ -0,0 +1,68 @@ +{ + "title": "API Gateway WebSocket API to Lambda to Bedrock Streamed Response", + "description": "Create an API Gateway WebSocket API to Lambda function that streams a response from an Amazon Bedrock LLM.", + "language": "Rust", + "level": "200", + "framework": "CDK", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern establishes a WebSocket connection to Amazon API Gateway. When requests are made to this API, API Gateway", + "routes them to an AWS Lambda function. The Lambda function then initiates a streaming request to Amazon Bedrock using", + "the ConverseStream API. This allows the response from the LLM in Bedrock to start streaming back to the Lambda function", + "as soon as generation begins, without waiting for the entire response to be ready." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/apigw-websocket-api-bedrock-streaming-rust-cdk", + "templateURL": "serverless-patterns/apigw-websocket-api-bedrock-streaming-rust-cdk", + "projectFolder": "apigw-websocket-api-bedrock-streaming-rust-cdk", + "templateFile": "cdk/lib/bedrock-streamer-stack.ts" + } + }, + "resources": { + "bullets": [ + { + "text": "Working with WebSocket APIs", + "link": "https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api.html" + }, + { + "text": "Amazon Bedrock", + "link": "https://aws.amazon.com/bedrock/" + }, + { + "text": "Cloud Development Kit", + "link": "https://docs.aws.amazon.com/cdk/v2/guide/home.html" + }, + { + "text": "AWS SDK for Rust", + "link": "https://aws.amazon.com/sdk-for-rust/" + } + ] + }, + "deploy": { + "text": [ + "cdk deploy" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: cdk delete." + ] + }, + "authors": [ + { + "name": "Phil Callister", + "image": "link-to-your-photo.jpg", + "bio": "Your bio.", + "linkedin": "linked-in-ID", + "twitter": "twitter-handle" + } + ] +} From fb5bb37b08cc199e623aaf1bb4a6192163fb4966 Mon Sep 17 00:00:00 2001 From: Phil Callister Date: Tue, 29 Oct 2024 15:55:35 -0500 Subject: [PATCH 2/6] Template update + WS constants --- .../cdk/lib/bedrock-streamer-lambda/src/main.rs | 10 +++++++--- .../example-pattern.json | 9 ++++----- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/src/main.rs b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/src/main.rs index bcc7775a8..7386297a8 100644 --- a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/src/main.rs +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/lib/bedrock-streamer-lambda/src/main.rs @@ -18,6 +18,10 @@ use tracing::{error, info}; const MODEL_ID: &str = "anthropic.claude-3-haiku-20240307-v1:0"; +const WS_CONNECT: &str = "$connect"; +const WS_DISCONNECT: &str = "$disconnect"; +const WS_DEFAULT: &str = "$default"; + /// Bedrock story #[derive(Debug, Deserialize)] struct StoryRequest { @@ -71,9 +75,9 @@ async fn function_handler( .ok_or_else(|| LambdaError::from("Missing stage"))?; match event.payload.request_context.route_key.as_deref() { - Some("$connect") => handle_connect(connection_id).await, - Some("$disconnect") => handle_disconnect(connection_id).await, - Some("$default") => { + Some(WS_CONNECT) => handle_connect(connection_id).await, + Some(WS_DISCONNECT) => handle_disconnect(connection_id).await, + Some(WS_DEFAULT) => { let bedrock_client = Client::new(&config); let apigw_client = api_gateway_config(domain_name, stage, &config)?; let request_body = event.payload.body; diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/example-pattern.json b/apigw-websocket-api-bedrock-streaming-rust-cdk/example-pattern.json index f4ca755b9..ed874b6e7 100644 --- a/apigw-websocket-api-bedrock-streaming-rust-cdk/example-pattern.json +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/example-pattern.json @@ -53,16 +53,15 @@ }, "cleanup": { "text": [ - "Delete the stack: cdk delete." + "Delete the stack: cdk destroy." ] }, "authors": [ { "name": "Phil Callister", - "image": "link-to-your-photo.jpg", - "bio": "Your bio.", - "linkedin": "linked-in-ID", - "twitter": "twitter-handle" + "image": "https://media.licdn.com/dms/image/v2/D5603AQHElNV-QFIGQw/profile-displayphoto-shrink_200_200/profile-displayphoto-shrink_200_200/0/1725218566874?e=1735171200&v=beta&t=gr82L1aMjvNX8CZn1Nitg3fViTUYoPK_xnXNc8CdohQ", + "bio": "I'm an Enterprise Solutions Architect at AWS, with a focus on Financial Services. As a passionate builder, I enjoy helping customers create innovative solutions to achieve their business objectives.", + "linkedin": "https://www.linkedin.com/in/philcallister/" } ] } From 9acf4dbd16b0483110248f9a7ed5c92aafb6576f Mon Sep 17 00:00:00 2001 From: Phil Callister Date: Mon, 2 Dec 2024 12:27:39 -0600 Subject: [PATCH 3/6] Update README to include 'cross' dependency --- apigw-websocket-api-bedrock-streaming-rust-cdk/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md b/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md index 2e944149c..32e63d98b 100644 --- a/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md @@ -14,6 +14,7 @@ Important: this application uses various AWS services and there are costs associ * [Docker](https://docs.docker.com/engine/install/) installed and running locally (needed for Rust cross-platform Lambda build) * [Rust](https://www.rust-lang.org/) 🦀 installed with v1.79.0 or higher * [Cargo Lambda](https://www.cargo-lambda.info/) installed +* [cross](https://github.com/cross-rs/cross) compilation installed for Cargo Lambda: `cargo install cross --git https://github.com/cross-rs/cross` * [wscat](https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-how-to-call-websocket-api-wscat.html) installed for CLI WebSocket capabilities From 859da41284d52960437275f3d2c5dcc586752c1c Mon Sep 17 00:00:00 2001 From: Phil Callister Date: Tue, 3 Dec 2024 07:33:30 -0600 Subject: [PATCH 4/6] Update README with new title --- apigw-websocket-api-bedrock-streaming-rust-cdk/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md b/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md index 32e63d98b..d19ef8eb4 100644 --- a/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/README.md @@ -1,4 +1,4 @@ -# API Gateway WebSocket API to Lambda to Bedrock Streamed Response +# Streaming Amazon Bedrock response with Amazon API Gateway WebSocket API and AWS Lambda This CDK application demonstrates a simple, serverless approach to integrating Amazon Bedrock with AWS Lambda and Amazon API Gateway. Written in Rust, it showcases how to efficiently stream responses from Amazon Bedrock to a client via WebSocket connections. The example serves as a practical illustration of implementing real-time, serverless communication between Bedrock's GenAI capabilities and a client application. From 0c7b76447506a02b74b1b752c59464e0ee7595f3 Mon Sep 17 00:00:00 2001 From: ellisms <114107920+ellisms@users.noreply.github.com> Date: Tue, 3 Dec 2024 06:36:36 -0800 Subject: [PATCH 5/6] tagging --- .../cdk/bin/bedrock-streamer.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/bin/bedrock-streamer.ts b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/bin/bedrock-streamer.ts index 3ac6983dd..c0285ec44 100644 --- a/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/bin/bedrock-streamer.ts +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/cdk/bin/bedrock-streamer.ts @@ -4,7 +4,8 @@ import * as cdk from 'aws-cdk-lib'; import { BedrockStreamerStack } from '../lib/bedrock-streamer-stack'; const app = new cdk.App(); -new BedrockStreamerStack(app, 'BedrockStreamerStack', { +const description = "Serverlessland Bedrock streamining pattern. (uksb-1tthgi812) (tag:apigw-websocket-api-bedrock-streamining-rust-cdk)" +new BedrockStreamerStack(app, 'BedrockStreamerStack', {description:description /* If you don't specify 'env', this stack will be environment-agnostic. * Account/Region-dependent features and context lookups will not work, * but a single synthesized template can be deployed anywhere. */ @@ -18,4 +19,4 @@ new BedrockStreamerStack(app, 'BedrockStreamerStack', { // env: { account: '123456789012', region: 'us-east-1' }, /* For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html */ -}); \ No newline at end of file +}); From 6f4c7a0bba29d8b74c3be0682679d374982f8eb1 Mon Sep 17 00:00:00 2001 From: ellisms <114107920+ellisms@users.noreply.github.com> Date: Tue, 3 Dec 2024 07:00:31 -0800 Subject: [PATCH 6/6] publishing file --- ...socket-api-bedrock-streaming-rust-cdk.json | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 apigw-websocket-api-bedrock-streaming-rust-cdk/apigw-websocket-api-bedrock-streaming-rust-cdk.json diff --git a/apigw-websocket-api-bedrock-streaming-rust-cdk/apigw-websocket-api-bedrock-streaming-rust-cdk.json b/apigw-websocket-api-bedrock-streaming-rust-cdk/apigw-websocket-api-bedrock-streaming-rust-cdk.json new file mode 100644 index 000000000..e85112130 --- /dev/null +++ b/apigw-websocket-api-bedrock-streaming-rust-cdk/apigw-websocket-api-bedrock-streaming-rust-cdk.json @@ -0,0 +1,93 @@ +{ + "title": "Streaming Amazon Bedrock response with Amazon API Gateway", + "description": "Stream an Amazon Bedrock LLM response with API Gateway WebSocket API and AWS Lambda function .", + "language": "Rust", + "level": "200", + "framework": "CDK", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern establishes a WebSocket connection to Amazon API Gateway. When requests are made to this API, API Gateway routes them to an AWS Lambda function.", + "The Lambda function then initiates a streaming request to Amazon Bedrock using the ConverseStream API. This allows the response from the LLM in Bedrock to start streaming back to the Lambda function as soon as generation begins, without waiting for the entire response to be ready." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/apigw-websocket-api-bedrock-streaming-rust-cdk", + "templateURL": "serverless-patterns/apigw-websocket-api-bedrock-streaming-rust-cdk", + "projectFolder": "apigw-websocket-api-bedrock-streaming-rust-cdk", + "templateFile": "cdk/lib/bedrock-streamer-stack.ts" + } + }, + "resources": { + "bullets": [ + { + "text": "Working with WebSocket APIs", + "link": "https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api.html" + }, + { + "text": "Amazon Bedrock", + "link": "https://aws.amazon.com/bedrock/" + }, + { + "text": "Cloud Development Kit", + "link": "https://docs.aws.amazon.com/cdk/v2/guide/home.html" + }, + { + "text": "AWS SDK for Rust", + "link": "https://aws.amazon.com/sdk-for-rust/" + } + ] + }, + "deploy": { + "text": [ + "cdk deploy" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: cdk destroy." + ] + }, + "authors": [ + { + "name": "Phil Callister", + "image": "https://media.licdn.com/dms/image/v2/D5603AQHElNV-QFIGQw/profile-displayphoto-shrink_200_200/profile-displayphoto-shrink_200_200/0/1725218566874?e=1735171200&v=beta&t=gr82L1aMjvNX8CZn1Nitg3fViTUYoPK_xnXNc8CdohQ", + "bio": "I'm an Enterprise Solutions Architect at AWS, with a focus on Financial Services. As a passionate builder, I enjoy helping customers create innovative solutions to achieve their business objectives.", + "linkedin": "philcallister" + } + ], + "patternArch": { + "icon1": { + "x": 20, + "y": 50, + "service": "apigw", + "label": "API Gateway Websocket API" + }, + "icon2": { + "x": 50, + "y": 50, + "service": "lambda", + "label": "AWS Lambda" + }, + "icon3": { + "x": 80, + "y": 50, + "service": "bedrock", + "label": "Amazon Bedrock" + }, + "line1": { + "from": "icon1", + "to": "icon2" + }, + "line3": { + "from": "icon2", + "to": "icon3" + } + } +}