Skip to content

Commit

Permalink
temp_save
Browse files Browse the repository at this point in the history
  • Loading branch information
dcstrange committed Jan 24, 2024
1 parent b55614a commit 34da1ad
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 32 deletions.
51 changes: 31 additions & 20 deletions agency_swarm/agency/agency.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, agency_chart, shared_instructions="", shared_files=None):
self._parse_agency_chart(agency_chart)
self._create_send_message_tools()
self._init_agents()
self._init_sessions()
#self._init_sessions() // No need to init sessions, cuz it is created dynamically in tasks.

self.user = User()
self.entrance_session = Session(self.user, self.ceo)
Expand All @@ -60,8 +60,11 @@ def get_completion(self, message: str, message_files=None,
Returns:
Generator or final response: Depending on the 'yield_messages' flag, this method returns either a generator yielding intermediate messages or the final response from the entrance session.
"""
gen = self.entrance_session.get_completion(message=message, message_files=message_files, yield_messages=yield_messages)

gen = self.entrance_session.get_completion(message=message,
message_files=message_files,
topic="talk_to_user",
is_persist=True,
yield_messages=yield_messages)
if not yield_messages:
while True:
try:
Expand Down Expand Up @@ -348,9 +351,9 @@ def check_caller_agent_name(cls, value):
return value

def run(self, caller_thread):
session = caller_thread.sessions[self.recipient.value]

if session is None:
if self.recipient.value in caller_thread.sessions.keys():
session = caller_thread.sessions[self.recipient.value]
else:
session = Session(caller_agent=self.caller_agent, # TODO: check this parameter if error.
recipient_agent=outer_self.get_agent_by_name(self.recipient.value),
caller_thread=caller_thread)
Expand All @@ -362,10 +365,18 @@ def run(self, caller_thread):
#===================# python.thread.create()====================================
# TODO: 创建新的Python线程执行session
caller_thread.session_as_sender = session
response = session.get_completion(message=self.message, message_files=self.message_files)
gen = session.get_completion(message=self.message, message_files=self.message_files)
try:
while True:
yield next(gen)
except StopIteration as e:
message = e.value
except Exception as e:
print(f"Exception{inspect.currentframe().f_code.co_name}{str(e)}")
raise e
#======================# python.thread.wait_to_join()=================================

return response or ""
return message or ""

# TODO: 每个Agent有自己的SendMessage对象。但是当前这个版本认为一个Agent在某一时刻只能有一个SendMessage函数被调用。
# 实际上,在Session模型中,一个Agent有多个Thread,因此可能会有多个SendMessage并行。所以需要注意全局变量的使用。
Expand Down Expand Up @@ -404,21 +415,21 @@ def _init_agents(self):

agent.init_oai()

def _init_sessions(self):
"""
Initializes sessions for communication between agents within the agency.
# def _init_sessions(self):
# """
# Initializes sessions for communication between agents within the agency.

This method creates Session objects for each pair of interacting agents as defined in the agents_and_sessions attribute of the Agency. Each session facilitates communication and task execution between an agent and its designated recipient agent.
# This method creates Session objects for each pair of interacting agents as defined in the agents_and_sessions attribute of the Agency. Each session facilitates communication and task execution between an agent and its designated recipient agent.

No input parameters.
# No input parameters.

Output Parameters:
This method does not return any value but updates the agents_and_sessions attribute with initialized Session objects.
"""
for agent_name, sessions in self.agents_and_sessions.items():
for other_agent, items in sessions.items():
self.agents_and_sessions[agent_name][other_agent] = Session(self.get_agent_by_name(items["agent"]),
self.get_agent_by_name(items["recipient_agent"]))
# Output Parameters:
# This method does not return any value but updates the agents_and_sessions attribute with initialized Session objects.
# """
# for agent_name, sessions in self.agents_and_sessions.items():
# for other_agent, items in sessions.items():
# self.agents_and_sessions[agent_name][other_agent] = Session(self.get_agent_by_name(items["agent"]),
# self.get_agent_by_name(items["recipient_agent"]))

def get_class_folder_path(self):
"""
Expand Down
23 changes: 13 additions & 10 deletions agency_swarm/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,22 @@ def get_completion(self,
recipient_thread = Thread()

recipient_thread.status = ThreadStatus.Running
recipient_thread.in_message_chain = self.caller_thread.in_message_chain
recipient_thread.session_as_recipient = self
recipient_thread.properties = ThreadProperty.Persist if is_persist else recipient_thread.properties
if isinstance(self.caller_agent, User):
recipient_thread.in_message_chain = self.caller_agent.uuid
else:
recipient_thread.in_message_chain = self.caller_thread.in_message_chain

# 向recipient thread发送消息并获取回复
gen = self._get_completion_from_thread(recipient_thread, message, message_files, yield_messages)
try:
while True:
msg = next(gen)
msg.cprint()
yield msg
except StopIteration as e:
response = e.value
print(response)
except Exception as e: # 当会话超时,不能释放Thread对象
print(f"Exception{inspect.currentframe().f_code.co_name}{str(e)}")
raise e
Expand All @@ -83,16 +86,16 @@ def get_completion(self,
return response


def _get_completion_from_thread(self, recipien_thread: Thread, message: str, message_files=None, yield_messages=True):
def _get_completion_from_thread(self, recipient_thread: Thread, message: str, message_files=None, yield_messages=True):

# Determine the sender's name based on the agent type
sender_name = "user" if isinstance(self.caller_agent, User) else self.caller_agent.name
playground_url = f'https://platform.openai.com/playground?assistant={self.recipient_agent._assistant.id}&mode=assistant&thread={recipien_thread.thread_id}'
playground_url = f'https://platform.openai.com/playground?assistant={self.recipient_agent._assistant.id}&mode=assistant&thread={recipient_thread.thread_id}'
print(f'THREAD:[ {sender_name} -> {self.recipient_agent.name} ]: URL {playground_url}')

# send message
self.client.beta.threads.messages.create(
thread_id=recipien_thread.thread_id,
thread_id=recipient_thread.thread_id,
role="user",
content=message,
file_ids=message_files if message_files else [],
Expand All @@ -103,7 +106,7 @@ def _get_completion_from_thread(self, recipien_thread: Thread, message: str, mes

# create run
run = self.client.beta.threads.runs.create(
thread_id=recipien_thread.thread_id,
thread_id=recipient_thread.thread_id,
assistant_id=self. recipient_agent.id,
)

Expand All @@ -112,7 +115,7 @@ def _get_completion_from_thread(self, recipien_thread: Thread, message: str, mes
while run.status in ['queued', 'in_progress']:
time.sleep(5)
run = self.client.beta.threads.runs.retrieve(
thread_id=recipien_thread.thread_id,
thread_id=recipient_thread.thread_id,
run_id=run.id
)
print(f"Run [{run.id}] Status: {run.status}")
Expand All @@ -127,7 +130,7 @@ def _get_completion_from_thread(self, recipien_thread: Thread, message: str, mes
str(tool_call.function))

# TODO:这里如果是SendMessage函数,后续会采用创建新Python线程来执行,需要修改处理逻辑。
output = self._execute_tool(tool_call)
output = self._execute_tool(tool_call,recipient_thread)
if inspect.isgenerator(output):
try:
while True:
Expand All @@ -149,7 +152,7 @@ def _get_completion_from_thread(self, recipien_thread: Thread, message: str, mes

# submit tool outputs
run = self.client.beta.threads.runs.submit_tool_outputs(
thread_id=recipien_thread.thread_id,
thread_id=recipient_thread.thread_id,
run_id=run.id,
tool_outputs=tool_outputs
)
Expand All @@ -160,7 +163,7 @@ def _get_completion_from_thread(self, recipien_thread: Thread, message: str, mes
# return assistant message
else:
messages = self.client.beta.threads.messages.list(
thread_id=recipien_thread.thread_id
thread_id=recipient_thread.thread_id
)
message = messages.data[0].content[0].text.value

Expand Down
2 changes: 1 addition & 1 deletion agency_swarm/threads/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Thread:
in_message_chain: str = None
status: ThreadStatus = ThreadStatus.Ready
properties: ThreadProperty = ThreadProperty.OneOff
sessions = {} # {"recipient agent", session}
sessions = {} # {"recipient agent name", session}
session_as_sender = None # 用于python线程异常挂掉后的处理
session_as_recipient= None # 用于python线程异常挂掉后的处理
instruction: str = None
Expand Down
3 changes: 2 additions & 1 deletion agency_swarm/user/user.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import uuid
class User:
name: str = "User"
id: str

def __init__(self, name: str = None):
# later, we can add more attributes to the user like bio, etc
self.uuid = uuid.uuid4()
pass
86 changes: 86 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import os
from agency_swarm import set_openai_key
from getpass import getpass


from agency_swarm import Agent
from agency_swarm import Agency

set_openai_key(getpass("Please enter your openai key: "))

agent_db_env_instructions = """ #Instructions for Database Environment Agent
### Task: Call the function or tools to interact with database environment.
### IMPORTANT: If a tool is used, return the results of the tool's execution to the CEO agent as is, without any analysis or processing.
You are a highly specialized GPT focused on SQL database operations for professional database administrators and developers.
You understand specific intents recieved from other agents and generates appropriate database actions using commands, scripts, or third-party tools.
Your primary objective is to interact effectively with SQL databases, providing accurate and advanced technical solutions.
you focuses on returning precise results and analyzing them for technical users.
You maintains a professional and straightforward tone, prioritizing technical accuracy and practicality over personalization, ensuring that its responses are detailed, relevant, and valuable to database professionals.
"""

ceo_instructions = """# Instructions for CEO Agent
- Ensure that proposal is send to the user before proceeding with task execution.
- Delegate tasks to appropriate agents, ensuring they align with their expertise and capabilities.
- Clearly define the objectives and expected outcomes for each task.
- Provide necessary context and background information for successful task completion.
- Maintain ongoing communication with agents until complete task execution.
- Review completed tasks to ensure they meet the set objectives.
- Report the results back to the user."""

CoT_instructions = """# Instruction for LLM to Create an Agent for MySQL O&M Task Planning
1. **Objective**: You're an Agent specialized in MySQL Operations & Maintenance (O&M) task planning.
2. **Input Processing**:
- Receive and interpret O&M task intents from users.
- Analyze the task requirements and context.
3. **Task Planning Using Chain of Thought (CoT)**:
- Utilize the Chain of Thought technique for planning.
- Draw on expert knowledge and previous cases to form a logical, step-by-step task chain.
- Ensure each step in the task chain is clear, actionable, and relevant to the overall O&M task.
4. **Interaction with Database Environment**:
- Dispatch specific tasks from the chain to the Agent operating within the database environment.
- Each task should be formulated to interact effectively with the database, considering current state and potential issues.
5. **Dynamic Adjustment Based on Feedback**:
- Continuously monitor the feedback from the database environment.
- Adjust the subsequent tasks in the chain based on the results and feedback received.
- Ensure the adjustment process is agile and responsive to real-time changes in the database environment.
6. **Success Criteria**:
- The task chain leads to the successful execution of the entire O&M task.
- Efficiency and accuracy in task execution.
- Minimal disruptions and optimal performance of the MySQL database during and after the O&M activities.
7. **Continual Learning and Improvement**:
- Implement mechanisms for the Agent to learn from each completed task.
- Update the knowledge base and CoT strategies based on new experiences and outcomes.
"""


agent_db_env = Agent(name="Database Environment",
tools=[],
description="responsible for taking specific tasks and executing them in the database environment (including databases, monitoring systems, etc.), planning and executing specific actions.",
instructions=agent_db_env_instructions,
files_folder=None)

ceo = Agent(name="CEO",
description="Responsible for client communication, task planning and management.",
instructions=ceo_instructions, # can be a file like ./instructions.md
files_folder=None,
tools=[])

agent_CoT = Agent(name="CoT Task Agent",
tools=[],
description="responsible for MySQL Operations & Maintenance (O&M) task planning by CoT, and send the specific task to the database environment. ",
instructions=CoT_instructions,
files_folder=None)

agency_manifesto = """You are tasked with solving O&M problems for MySQL users."""

agency = Agency([
ceo,
[ceo, agent_CoT],
[agent_CoT, agent_db_env]
], shared_instructions=agency_manifesto)

agency.demo_gradio(height=900)

0 comments on commit 34da1ad

Please sign in to comment.