-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbedrock_agent.py
183 lines (149 loc) · 5.3 KB
/
bedrock_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from boto3.session import Session
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
import json
import os
from requests import request
import base64
import io
import sys
#For this to run on a local machine in VScode, you need to set the AWS_PROFILE environment variable to the name of the profile/credentials you want to use.
#You also need to input your model ID near the bottom of this file.
#check for credentials
#echo $AWS_ACCESS_KEY_ID
#echo $AWS_SECRET_ACCESS_KEY
#echo $AWS_SESSION_TOKEN
#os.environ["AWS_PROFILE"] = "agent-demo"
theRegion = 'us-east-1'
agentId = 'JR3RMLFNU8'
agentAliasId = 'EDC5BNXEDX'
llm_response = ""
def sigv4_request(
url,
method='GET',
body=None,
params=None,
headers=None,
service='execute-api',
region=theRegion,
credentials=Session().get_credentials().get_frozen_credentials()
):
"""Sends an HTTP request signed with SigV4
Args:
url: The request URL (e.g. 'https://www.example.com').
method: The request method (e.g. 'GET', 'POST', 'PUT', 'DELETE'). Defaults to 'GET'.
body: The request body (e.g. json.dumps({ 'foo': 'bar' })). Defaults to None.
params: The request query params (e.g. { 'foo': 'bar' }). Defaults to None.
headers: The request headers (e.g. { 'content-type': 'application/json' }). Defaults to None.
service: The AWS service name. Defaults to 'execute-api'.
region: The AWS region id. Defaults to the env var 'AWS_REGION'.
credentials: The AWS credentials. Defaults to the current boto3 session's credentials.
Returns:
The HTTP response
"""
# sign request
req = AWSRequest(
method=method,
url=url,
data=body,
params=params,
headers=headers
)
SigV4Auth(credentials, service, region).add_auth(req)
req = req.prepare()
# send request
return request(
method=req.method,
url=req.url,
headers=req.headers,
data=req.body
)
def askQuestion(question, url, endSession=False):
myobj = {
"inputText": question,
"enableTrace": True,
"endSession": endSession
}
# send request
response = sigv4_request(
url,
method='POST',
service='bedrock',
headers={
'content-type': 'application/json',
'accept': 'application/json',
},
region=theRegion,
body=json.dumps(myobj)
)
return decode_response(response)
def decode_response(response):
# Create a StringIO object to capture print statements
captured_output = io.StringIO()
sys.stdout = captured_output
# Your existing logic
string = ""
for line in response.iter_content():
try:
string += line.decode(encoding='utf-8')
except:
continue
print("Decoded response", string)
split_response = string.split(":message-type")
print(f"Split Response: {split_response}")
print(f"length of split: {len(split_response)}")
for idx in range(len(split_response)):
if "bytes" in split_response[idx]:
#print(f"Bytes found index {idx}")
encoded_last_response = split_response[idx].split("\"")[3]
decoded = base64.b64decode(encoded_last_response)
final_response = decoded.decode('utf-8')
print(final_response)
else:
print(f"no bytes at index {idx}")
print(split_response[idx])
last_response = split_response[-1]
print(f"Lst Response: {last_response}")
if "bytes" in last_response:
print("Bytes in last response")
encoded_last_response = last_response.split("\"")[3]
decoded = base64.b64decode(encoded_last_response)
final_response = decoded.decode('utf-8')
else:
print("no bytes in last response")
part1 = string[string.find('finalResponse')+len('finalResponse":'):]
part2 = part1[:part1.find('"}')+2]
final_response = json.loads(part2)['text']
final_response = final_response.replace("\"", "")
final_response = final_response.replace("{input:{value:", "")
final_response = final_response.replace(",source:null}}", "")
llm_response = final_response
# Restore original stdout
sys.stdout = sys.__stdout__
# Get the string from captured output
captured_string = captured_output.getvalue()
# Return both the captured output and the final response
return captured_string, llm_response
def lambda_handler(event, context):
sessionId = event["sessionId"]
question = event["question"]
endSession = False
print(f"Session: {sessionId} asked question: {question}")
try:
if (event["endSession"] == "true"):
endSession = True
except:
endSession = False
url = f'https://bedrock-agent-runtime.{theRegion}.amazonaws.com/agents/{agentId}/agentAliases/{agentAliasId}/sessions/{sessionId}/text'
try:
response, trace_data = askQuestion(question, url, endSession)
return {
"status_code": 200,
"body": json.dumps({"response": response, "trace_data": trace_data})
}
except Exception as e:
return {
"status_code": 500,
"body": json.dumps({"error": str(e)})
}