Skip to content

Commit

Permalink
Merge pull request #133 from filip-michalsky/async_server
Browse files Browse the repository at this point in the history
Async server
  • Loading branch information
filip-michalsky authored Mar 26, 2024
2 parents 2cdb631 + da89971 commit 8c0b077
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
!.github/workflows

/my-app/node_modules

examples/sales_agent_bedrock.ipynb

*.log
Expand Down
78 changes: 66 additions & 12 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ pytest-cov = "^4.1.0"
pytest-asyncio = "^0.23.1"
langchain-openai = "0.0.2"
tokenizers = "^0.15.2"
boto3 = "^1.34.70"
boto3 = ">=1.33.2,<1.34.35"
aioboto3 = "^12.3.0"

[tool.poetry.group.dev.dependencies]
black = "^23.11.0"
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ pydantic>=2.5.2
litellm>=1.10.2
ipykernel>=6.27.1
langchain-openai==0.0.2
boto3
boto3>=1.33.2,<1.34.35
aioboto3==12.3.0
2 changes: 1 addition & 1 deletion run_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async def stream_response():

return StreamingResponse(stream_response())
else:
response = sales_api.do(req.human_say)
response = await sales_api.do(req.human_say)
return response


Expand Down
111 changes: 99 additions & 12 deletions salesgpt/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,51 @@ def determine_conversation_stage(self):

print(f"Conversation Stage: {self.current_conversation_stage}")

@time_logger
async def adetermine_conversation_stage(self):
    """
    Asynchronously determine the current conversation stage.

    Feeds the joined conversation history, the current stage ID, and the
    catalogue of known stages to the stage analyzer chain, stores the
    chain's answer as the new ``conversation_stage_id``, and resolves it to
    a stage description via ``retrieve_conversation_stage``.

    Returns:
        None
    """
    print(f"Conversation Stage ID before analysis: {self.conversation_stage_id}")
    print("Conversation history:")
    print(self.conversation_history)

    # Join the history into one newline-separated string, trimming any
    # trailing newline so the prompt does not end with a blank line.
    history_text = "\n".join(self.conversation_history).rstrip("\n")
    stage_catalogue = "\n".join(
        f"{stage_id}: {description}"
        for stage_id, description in CONVERSATION_STAGES.items()
    )

    stage_analyzer_output = await self.stage_analyzer_chain.ainvoke(
        input={
            "conversation_history": history_text,
            "conversation_stage_id": self.conversation_stage_id,
            "conversation_stages": stage_catalogue,
        },
        return_only_outputs=False,
    )
    print("Stage analyzer output")
    print(stage_analyzer_output)

    self.conversation_stage_id = stage_analyzer_output.get("text")
    self.current_conversation_stage = self.retrieve_conversation_stage(
        self.conversation_stage_id
    )
    print(f"Conversation Stage: {self.current_conversation_stage}")

def human_step(self, human_input):
"""
Processes the human input and appends it to the conversation history.
Expand Down Expand Up @@ -225,28 +270,70 @@ async def astep(self, stream: bool = False):
Generator: A streaming generator object if stream is set to True. Otherwise, it returns None.
"""
if not stream:
self._acall(inputs={})
return await self.acall(inputs={})
else:
return await self._astreaming_generator()

@time_logger
def acall(self, *args, **kwargs):
async def acall(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """
    Run one asynchronous step of the sales agent.

    The supplied ``inputs`` are discarded and rebuilt from the agent's
    current state (conversation stage, history, and persona fields) before
    being handed to either the tool-using agent executor or the plain
    utterance chain. The generated utterance is prefixed with the
    salesperson's name, terminated with ``<END_OF_TURN>`` if not already
    present, and appended to ``self.conversation_history``.

    Parameters
    ----------
    inputs : Dict[str, Any]
        Initial inputs for the sales agent; currently overridden by the
        agent's internal state and therefore effectively ignored.

    Returns
    -------
    Dict[str, Any]
        The raw message dict produced by the underlying chain/executor
        (utterance under key ``"output"`` when tools are used, ``"text"``
        otherwise).
    """
    # Override the caller's inputs wholesale with the agent's own state.
    inputs = {
        "input": "",
        "conversation_stage": self.current_conversation_stage,
        "conversation_history": "\n".join(self.conversation_history),
        "salesperson_name": self.salesperson_name,
        "salesperson_role": self.salesperson_role,
        "company_name": self.company_name,
        "company_business": self.company_business,
        "company_values": self.company_values,
        "conversation_purpose": self.conversation_purpose,
        "conversation_type": self.conversation_type,
    }

    # Generate the agent's utterance, with or without tool support.
    if self.use_tools:
        ai_message = await self.sales_agent_executor.ainvoke(inputs)
        output = ai_message["output"]
    else:
        ai_message = await self.sales_conversation_utterance_chain.ainvoke(
            inputs, return_intermediate_steps=True
        )
        output = ai_message["text"]

    # Record the utterance, ensuring it carries the turn delimiter.
    agent_name = self.salesperson_name
    output = agent_name + ": " + output
    if "<END_OF_TURN>" not in output:
        output += " <END_OF_TURN>"
    self.conversation_history.append(output)

    if self.verbose:
        tool_status = "USE TOOLS INVOKE:" if self.use_tools else "WITHOUT TOOLS:"
        print(f"{tool_status}\n#\n#\n#\n#\n------------------")
        print(f"AI Message: {ai_message}")
        print()
        print(f"Output: {output.replace('<END_OF_TURN>', '')}")

    return ai_message

@time_logger
def _prep_messages(self):
Expand Down
69 changes: 69 additions & 0 deletions salesgpt/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,72 @@ def _generate(
message = AIMessage(content=content)
generation = ChatGeneration(message=message)
return ChatResult(generations=[generation])

async def _agenerate(
    self,
    messages: List[BaseMessage],
    stop: Optional[List[str]] = None,
    run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
    stream: Optional[bool] = None,
    **kwargs: Any,
) -> ChatResult:
    """
    Asynchronously generate a chat completion via Amazon Bedrock.

    Forwards only the last message (with the configured system prompt) to
    ``acompletion_bedrock`` and wraps the model's text reply in a
    ``ChatResult``. ``stop``, ``run_manager``, and extra kwargs are
    currently ignored.

    Raises
    ------
    NotImplementedError
        If streaming is requested (``stream=True`` or ``self.streaming``).
    """
    should_stream = stream if stream is not None else self.streaming
    if should_stream:
        raise NotImplementedError("Streaming not implemented")

    # Only the most recent message is sent to the model; earlier turns are
    # presumably already folded into the prompt upstream — TODO confirm.
    last_message = messages[-1]
    response = await acompletion_bedrock(
        model_id=self.model,
        system_prompt=self.system_prompt,
        messages=[{"content": last_message.content, "role": "user"}],
        max_tokens=1000,
    )
    # Anthropic Messages API shape: content is a list of blocks with "text".
    content = response["content"][0]["text"]
    message = AIMessage(content=content)
    generation = ChatGeneration(message=message)
    return ChatResult(generations=[generation])

import aioboto3
import os
import json

async def acompletion_bedrock(model_id, system_prompt, messages, max_tokens=1000):
    """
    Asynchronously invoke an Anthropic Claude model on Amazon Bedrock.

    Parameters
    ----------
    model_id : str
        Bedrock model identifier to invoke.
    system_prompt : str
        System prompt passed via the Anthropic Messages API ``system`` field.
    messages : list[dict]
        Messages in Anthropic format (``{"role": ..., "content": ...}``).
    max_tokens : int, optional
        Maximum number of tokens to generate (default 1000).

    Returns
    -------
    dict
        The decoded JSON response body returned by Bedrock.

    Notes
    -----
    The AWS region is read from the ``AWS_REGION_NAME`` environment
    variable; credentials follow the standard boto3 resolution chain.
    """
    session = aioboto3.Session()
    async with session.client(
        service_name="bedrock-runtime",
        region_name=os.environ.get("AWS_REGION_NAME"),
    ) as bedrock_runtime:
        body = json.dumps(
            {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": max_tokens,
                "system": system_prompt,
                "messages": messages,
            }
        )

        response = await bedrock_runtime.invoke_model(body=body, modelId=model_id)

        # invoke_model returns a streaming body; read it fully before decoding.
        response_body_bytes = await response["body"].read()
        return json.loads(response_body_bytes.decode("utf-8"))

Loading

0 comments on commit 8c0b077

Please sign in to comment.