Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 增加 AWS SQS 消息队列功能 #85

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .aws/petercat-preview.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ region = "ap-northeast-1"
confirm_changeset = true
capabilities = "CAPABILITY_IAM"
disable_rollback = true
image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/fastapifunctionead79d0drepo"]
image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/petercatapipreview49199518/fastapifunctionead79d0drepo", "SQSSubscriptionFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/petercatapipreview49199518/sqssubscriptionfunctiona2fc8b7drepo"]
2 changes: 1 addition & 1 deletion .aws/petercat-prod.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ region = "ap-northeast-1"
confirm_changeset = true
capabilities = "CAPABILITY_IAM"
disable_rollback = true
image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/fastapifunctionead79d0drepo"]
image_repositories = ["FastAPIFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/fastapifunctionead79d0drepo", "SQSSubscriptionFunction=654654285942.dkr.ecr.ap-northeast-1.amazonaws.com/samapp7427b055/sqssubscriptionfunctiona2fc8b7drepo"]
12 changes: 0 additions & 12 deletions server/Dockerfile

This file was deleted.

5 changes: 5 additions & 0 deletions server/data_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ class Message(BaseModel):
class ChatData(BaseModel):
messages: list[Message] = []
prompt: str = None

class ExecuteMessage(BaseModel):
type: str
repo: str
path: str
26 changes: 14 additions & 12 deletions server/main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import os
import uvicorn

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from data_class import DalleData, ChatData
from openai_api import dalle
from langchain_api import chat

from agent import stream
from uilts.env import get_env_variable
import uvicorn
from data_class import ChatData, ExecuteMessage
from message_queue.queue_wrapper import delete_messages, get_queue, receive_messages, send_message, unpack_message

open_api_key = get_env_variable("OPENAI_API_KEY")
sqs_queue_name = get_env_variable("PETERCAT_EX_SQS")

app = FastAPI(
title="Bo-meta Server",
Expand All @@ -30,15 +32,15 @@
def read_root():
return {"Hello": "World"}

@app.post("/api/dall-e")
def run_img_generator(input_data: DalleData):
result = dalle.img_generator(input_data, open_api_key)
return result
@app.post("/api/message")
def send_sqs_message(message: ExecuteMessage):
queue = get_queue(sqs_queue_name)
return send_message(queue=queue, message=message)

@app.post("/api/chat")
def run_langchain_chat(input_data: ChatData):
result = chat.langchain_chat(input_data, open_api_key)
return result
@app.get("/api/message/receive")
def receive_sqs_message():
queue = get_queue(sqs_queue_name)
return StreamingResponse(receive_messages(queue), media_type="text/event-stream")


@app.post("/api/chat/stream", response_class=StreamingResponse)
Expand Down
102 changes: 102 additions & 0 deletions server/message_queue/queue_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import json
import boto3
from botocore.exceptions import ClientError
import logging

from data_class import ExecuteMessage

logger = logging.getLogger(__name__)
sqs = boto3.resource("sqs")

def get_queue(name):
try:
queue = sqs.get_queue_by_name(QueueName=name)
except ClientError as error:
logger.exception("Couldn't get queue named %s.", name)
raise error
else:
return queue

def send_message(queue, message: ExecuteMessage, message_attributes=None):
if not message_attributes:
message_attributes = {
"type": { "StringValue": message.type, "DataType": "String" },
"repo": { "StringValue": message.repo, "DataType": "String" },
"path": { "StringValue": message.path, "DataType": "String" },
}

message_body = encode_message(message=message)

try:
response = queue.send_message(
MessageBody=message_body, MessageAttributes=message_attributes
)

except ClientError as error:
logger.exception("Send message failed: %s", message_body)
raise error
else:
return response

async def receive_messages(queue, max_number = 10, wait_time = 2):
try:
messages = queue.receive_messages(
MessageAttributeNames=["All"],
MaxNumberOfMessages=max_number,
WaitTimeSeconds=wait_time,
)
for msg in messages:
logger.info("Received message: %s: %s", msg.message_id, msg.body)
type, repo, path = unpack_message(msg)
yield json.dumps({ "type": type, "repo": repo, "path": path })
delete_messages(queue, messages)

except ClientError as error:
logger.exception("Couldn't receive messages from queue: %s", queue)
raise error


def delete_messages(queue, messages):
"""
Delete a batch of messages from a queue in a single request.

:param queue: The queue from which to delete the messages.
:param messages: The list of messages to delete.
:return: The response from SQS that contains the list of successful and failed
message deletions.
"""
try:
entries = [
{"Id": str(ind), "ReceiptHandle": msg.receipt_handle}
for ind, msg in enumerate(messages)
]
response = queue.delete_messages(Entries=entries)
if "Successful" in response:
for msg_meta in response["Successful"]:
logger.info("Deleted %s", messages[int(msg_meta["Id"])].receipt_handle)
if "Failed" in response:
for msg_meta in response["Failed"]:
logger.warning(
"Could not delete %s", messages[int(msg_meta["Id"])].receipt_handle
)
except ClientError:
logger.exception("Couldn't delete messages from queue %s", queue)
else:
return response

def encode_message(message: ExecuteMessage):
return json.dumps({
"type": message.type,
"repo": message.repo,
"path": message.path,
})

def unpack_message(msg):
if (msg is None):
return (f"", f"", f"")
else:
return (
msg.message_attributes["type"]["StringValue"],
msg.message_attributes["repo"]["StringValue"],
msg.message_attributes["path"]["StringValue"],
)
1 change: 1 addition & 0 deletions server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ PyGithub
python-multipart
httpx[socks]
load_dotenv
boto3>=1.26.79
13 changes: 13 additions & 0 deletions subscriber/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM public.ecr.aws/lambda/python:3.12

# Copy requirements.txt
COPY requirements.txt ${LAMBDA_TASK_ROOT}

# Install the specified packages
RUN pip install -r requirements.txt

# Copy function code
COPY sqs_subscriber.py ${LAMBDA_TASK_ROOT}

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "sqs_subscriber.lambda_handler" ]
Empty file added subscriber/requirements.txt
Empty file.
16 changes: 16 additions & 0 deletions subscriber/sqs_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import json

def lambda_handler(event, context):
if event:
batch_item_failures = []
sqs_batch_response = {}

for record in event["Records"]:
try:
# process message
print(f"receive message here")
except Exception as e:
batch_item_failures.append({"itemIdentifier": record['messageId']})

sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
29 changes: 28 additions & 1 deletion template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,37 @@ Resources:
DockerContext: server
DockerTag: v1

SQSSubscriptionFunction:
Type: AWS::Serverless::Function
Properties:
PackageType: Image
MemorySize: 512
FunctionUrlConfig:
AuthType: NONE
Policies:
- Statement:
- Sid: BedrockInvokePolicy
Effect: Allow
Action:
- bedrock:InvokeModelWithResponseStream
Resource: '*'
Tracing: Active
Metadata:
Dockerfile: Dockerfile
DockerContext: subscriber
DockerTag: v1

Outputs:
FastAPIFunctionUrl:
Description: "Function URL for FastAPI function"
Value: !GetAtt FastAPIFunctionUrl.FunctionUrl
FastAPIFunction:
Description: "FastAPI Lambda Function ARN"
Value: !GetAtt FastAPIFunction.Arn
Value: !GetAtt FastAPIFunction.Arn

SQSSubscriptionFunctionUrl:
Description: "Function URL for SQS Subscriptio function"
Value: !GetAtt FastAPIFunctionUrl.FunctionUrl
SQSSubscriptionFunction:
Description: "SQS Subscription Function Lambda Function ARN"
Value: !GetAtt SQSSubscriptionFunction.Arn