Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Is this the correct approach to building a multi-agent application using Pydantic AI? #300

Open
ishswar opened this issue Dec 17, 2024 · 9 comments · May be fixed by #541
Open

Is this the correct approach to building a multi-agent application using Pydantic AI? #300

ishswar opened this issue Dec 17, 2024 · 9 comments · May be fixed by #541

Comments

@ishswar
Copy link

ishswar commented Dec 17, 2024

Hi Team,

I have been working on extending the sample provided at Pydantic AI Bank Support Example, and I’ve added a multi-agent system that includes several agents interacting with each other. Here’s an overview of what I’ve implemented:

Multi-Agent System Breakdown:

napkin-selection

  • Triage Agent: This agent receives customer queries and determines whether the query is related to support or loan. It then calls either the Support Agent or Loan Agent using tools based on the query type.
  • Support Agent: Handles customer support queries, such as balance checks and card blocking. It can call the Marketing Agent to capture customer details (e.g., their name).
  • Loan Agent: Deals with loan-related queries, such as loan status and loan balance. It can also call the Marketing Agent to capture customer details.
  • Marketing Agent: A backend agent responsible for capturing customer details (e.g., saving the customer’s name and tracking inquiries in a CSV file).

Pydantic_ai-multi-agent-bank

Each agent is called via Tools and utilizes RunContext to pass dependencies. The tools call_support_agent and call_loan_agent are used by the Triage Agent to forward the query to the appropriate agent based on the customer’s request.

Sample Output:

Here is an example of what the output looks like for various queries:

Prompt: What is my balance?
Response: Hello John, your current account balance is $123.45.


Prompt: My card is lost. Please help!
Response: I'm sorry to hear that, John. We are temporarily blocking your card to prevent unauthorized transactions. Please confirm if you need further assistance or would like to request a new card.


Prompt: What is the status of my loan?
Response: Your loan status is currently 'Active' and the remaining balance is $5000.


Prompt: How tall is the Eiffel Tower?
Response: I'm sorry, but I cannot answer questions that don't relate to loans or support. Please let me know if you have any queries regarding loans or bank support!


My Questions:

  • Is this the correct approach to building a multi-agent application using Pydantic AI?
  • Are the dependencies, tools, and agents interacting correctly in this structure?
  • Is there a more optimal way to organize or manage communication between agents in this setup?

I’m looking for feedback on whether the way I’ve implemented these agents and their interaction is aligned with best practices for multi-agent systems in Pydantic AI. Any suggestions or improvements would be greatly appreciated.

Thanks for your help!

logFire Debug lines ( nicely indented - Thanks to whoever coded that! 🙏)

********************************************************************************
Prompt: What is my balance?
********************************************************************************
06:05:07.247 triage_agent run prompt=What is my balance?
06:05:07.247   preparing model and tools run_step=1
06:05:07.248   model request
Logfire project URL: https://logfire.pydantic.dev/ishswar/weather
06:05:07.996   handle model response
06:05:07.997     running tools=['call_support_agent']
06:05:07.997     support_agent run prompt=What is the customer's account balance?
06:05:07.997       preparing model and tools run_step=1
06:05:07.998       model request
06:05:09.375       handle model response
06:05:09.376         running tools=['customer_balance', 'capture_customer_name']
06:05:09.377         marketing_agent run prompt=Save customer name John for ID 123
06:05:09.378           preparing model and tools run_step=1
06:05:09.378           model request
06:05:10.252           handle model response
06:05:10.253             running tools=['save_customer_name']
06:05:10.256           preparing model and tools run_step=2
06:05:10.257           model request
06:05:10.901           handle model response
06:05:10.905       preparing model and tools run_step=2
06:05:10.906       model request
06:05:12.569       handle model response
06:05:12.575   preparing model and tools run_step=2
06:05:12.575   model request
06:05:14.432   handle model response
Your current account balance is $123.45, including pending transactions.

Full Code

import os
import csv
import uuid
import dotenv
import logfire
from typing import Any, Optional
from dataclasses import dataclass
from pydantic import BaseModel, Field
from pydantic_ai.result import RunResult
from pydantic_ai import Agent, RunContext

# Load environment variables
dotenv.load_dotenv()
# # 'if-token-present' means nothing will be sent (and the example will work) if you don't have logfire configured
# logfire.configure()

# --- Fake Database for Loan Data ---
class LoanDB:
    """This is a fake loan database for example purposes.

    In reality, you'd be connecting to an external database
    (e.g., PostgreSQL) to manage loan information.
    """

    @classmethod
    async def customer_name(cls, *, id: int) -> str | None:
        if id == 123:
            return 'John'

    @classmethod
    async def loan_status(cls, *, id: int) -> str | None:
        """Fetch the loan status of a customer by their ID."""
        if id == 123:
            return 'Active'
        elif id == 124:
            return 'Paid off'
        elif id == 125:
            return 'Defaulted'
        else:
            return None

    @classmethod
    async def cancel_loan(cls, *, id: int) -> str:
        """Cancel a loan for a customer."""
        if id == 123:
            # Fake logic for canceling a loan
            return f"Loan for customer ID {id} has been canceled."
        else:
            raise ValueError(f"Customer with ID {id} does not have an active loan.")

    @classmethod
    async def add_loan(cls, *, id: int, amount: float, interest_rate: float) -> str:
        """Add a loan for a customer."""
        if id == 123:
            # Fake logic for adding a loan
            return f"Loan of ${amount} with an interest rate of {interest_rate}% has been added for customer ID {id}."
        else:
            raise ValueError(f"Customer with ID {id} cannot be found to add a loan.")

    @classmethod
    async def loan_balance(cls, *, id: int) -> float | None:
        """Fetch the remaining balance of a customer's loan."""
        if id == 123:
            return 5000.0  # Fake loan balance
        elif id == 124:
            return 0.0  # Loan paid off
        else:
            raise ValueError(f"Customer with ID {id} not found or no loan exists.")

# --- Fake Database for Customer Data ---
class DatabaseConn:
    """This is a fake database for example purposes.

    In reality, you'd be connecting to an external database
    (e.g. PostgreSQL) to get information about customers.
    """

    @classmethod
    async def customer_name(cls, *, id: int) -> str | None:
        if id == 123:
            return 'John'

    @classmethod
    async def customer_balance(cls, *, id: int, include_pending: bool) -> float:
        if id == 123:
            return 123.45
        else:
            raise ValueError('Customer not found')

# --- Dependencies ---

@dataclass
class SupportDependencies:
    customer_id: int
    db: DatabaseConn
    marketing_agent: Agent

@dataclass
class LoanDependencies:
    customer_id: int
    db: LoanDB
    marketing_agent: Agent

@dataclass
class TriageDependencies:
    support_agent: Agent
    loan_agent: Agent
    customer_id: int

# --- Result Models ---
class SupportResult(BaseModel):
    support_advice: str = Field(description='Advice returned to the customer')
    block_card: bool = Field(description='Whether to block their')
    risk: int = Field(description='Risk level of query', ge=0, le=10)
    customer_tracking_id : str = Field(description='Tracking ID for customer')

class LoanResult(BaseModel):
    loan_approval_status: str = Field(description='Approval status of the loan (e.g., Approved, Denied, Pending)')
    loan_balance: float = Field(description='Remaining balance of the loan')
    customer_tracking_id: str = Field(description='Tracking ID for the customer applying for the loan')

class TriageResult(BaseModel):
    department: Optional[str] = Field(description='Department to direct the customer query to')
    response: Optional[LoanResult | SupportResult] = Field(description='Response to the customer query')
    text_response: Optional[str] = Field(description='Text response to the customer query')

# --- Agents ---

# Support agent for handling customer support queries
support_agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=SupportDependencies,
    result_type=SupportResult,
    system_prompt=(
        'You are a support agent in our bank, give the '
        'customer support and judge the risk level of their query. '
        "Reply using the customer's name."
        'Additionally, always capture the customer's name in our marking system using the tool `capture_customer_name`, regardless of the query type. '
        'At the end of your response, make sure to capture the customer's name to maintain proper records. '
    ),
)

@support_agent.system_prompt
async def add_customer_name(ctx: RunContext[SupportDependencies]) -> str:
    customer_name = await ctx.deps.db.customer_name(id=ctx.deps.customer_id)
    return f"The customer's name is {customer_name!r}"

@support_agent.tool()
async def block_card(ctx: RunContext[SupportDependencies] , customer_name: str ) -> str:
    return f"I'm sorry to hear that, {customer_name}. We are temporarily blocking your card to prevent unauthorized transactions."

@support_agent.tool
async def customer_balance(
    ctx: RunContext[SupportDependencies], include_pending: bool
) -> str:
    """Returns the customer's current account balance."""
    balance = await ctx.deps.db.customer_balance(
        id=ctx.deps.customer_id,
        include_pending=include_pending,
    )
    return f'${balance:.2f}'

# Loan agent for handling loan-related queries

loan_agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=LoanDependencies,
    result_type=LoanResult,
    system_prompt=(
        'You are a support agent in our bank, assisting customers with loan-related inquiries. '
        'For every query, provide the following information: '
        '- Loan approval status (e.g., Approved, Denied, Pending) '
        '- Loan balance '
        'Please ensure that your response is clear and helpful for the customer. '
        'Always conclude by providing the customer's name and capturing their information in the marking system using the tool `capture_customer_name`. '
        'Never generate data based on your internal knowledge; always rely on the provided tools to fetch the most accurate and up-to-date information.'
    ),
)

# Add the customer's name to the response
@loan_agent.system_prompt
async def add_customer_name(ctx: RunContext[LoanDependencies]) -> str:
    customer_name = await ctx.deps.db.customer_name(id=ctx.deps.customer_id)
    return f"The customer's name is {customer_name!r}"

# Tools for the loan agent
@loan_agent.tool()
async def loan_status(ctx: RunContext[LoanDependencies]) -> str:
    status = await ctx.deps.db.loan_status(id=ctx.deps.customer_id)
    return f'The loan status is {status!r}'

@loan_agent.tool()
async def cancel_loan(ctx: RunContext[LoanDependencies]) -> str:
    return await ctx.deps.db.cancel_loan(id=ctx.deps.customer_id)

@loan_agent.tool()
async def add_loan(ctx: RunContext[LoanDependencies], amount: float, interest_rate: float) -> str:
    return await ctx.deps.db.add_loan(id=ctx.deps.customer_id, amount=amount, interest_rate=interest_rate)

@loan_agent.tool()
async def loan_balance(ctx: RunContext[LoanDependencies]) -> float:
    return await ctx.deps.db.loan_balance(id=ctx.deps.customer_id)

# End of the loan agent

# Common tool for capturing the customer's name
# Used by both the support and loan agents
@support_agent.tool
@loan_agent.tool
async def capture_customer_name(ctx: RunContext[SupportDependencies], customer_name: str) -> str:
    """Capture the customer's name for marketing purposes."""

    await ctx.deps.marketing_agent.run(f"Save customer name {customer_name} for ID {ctx.deps.customer_id}", deps=ctx.deps)

    tracking_id = str(uuid.uuid4())
    return tracking_id

# Start of the triage agent

# Triage agent to direct customer queries to the appropriate department
triage_agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=TriageDependencies,
    system_prompt=(
        'You are a triage agent in our bank, responsible for directing customer queries to the appropriate department. '
        'For each query, determine whether it is related to support (e.g., balance, card, account-related queries) or loan services (e.g., loan status, application, and loan-related inquiries). '
        'If the query is related to support, direct the customer to the support team with an appropriate response. '
        'If the query is related to loans, direct the customer to the loan department with a relevant response. '
        'If the query is unclear or does not fit into either category, politely inform the customer and suggest they ask about loans or support. '
        'Always ensure that the response is clear, concise, and provides direction to the right department for further assistance.'
        'Never generate data based on your internal knowledge; always rely on the provided tools to fetch the most accurate and up-to-date information.'
    ),
    result_type=TriageResult,
)

# Start of the tools for the triage agent

@triage_agent.tool
async def call_support_agent(ctx: RunContext[TriageDependencies], prompt: str) -> RunResult[Any]:
    # print(f"Calling support agent with prompt: {prompt}")
    support_deps = SupportDependencies(customer_id=ctx.deps.customer_id, db=DatabaseConn(), marketing_agent=marketing_agent)

    return await ctx.deps.support_agent.run(prompt, deps=support_deps)

@triage_agent.tool
async def call_loan_agent(ctx: RunContext[TriageDependencies], prompt: str) -> RunResult[Any]:
    # print(f"Calling loan agent with prompt: {prompt}")
    loan_deps = LoanDependencies(customer_id=ctx.deps.customer_id, db=LoanDB(), marketing_agent=marketing_agent)

    return await ctx.deps.loan_agent.run(prompt, deps=loan_deps)

# End of the tools for the triage agent

# Marketing agent for saving customer names
marketing_agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=SupportDependencies,
    system_prompt=(
        'You are a marketing agent in our bank'
        'For now you only save the customer name in our marking system using tool `save_customer_name`'
    ),
)

@marketing_agent.tool_plain
async def save_customer_name(customer_name: str, customer_id: int) -> None:
    """Saves the customer's name and tracks how many times their info is captured."""
    # print(f"Saving customer name {customer_name} for ID {customer_id}. in the marketing system")
    # Path to the CSV file
    csv_file_path = 'customer_name.csv'

    # If the file does not exist, create it and write the header
    if not os.path.exists(csv_file_path):
        with open(csv_file_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['customer_id', 'customer_name', 'inquiries_count'])

    # Read the existing data to check if the customer already exists
    customer_found = False
    rows = []
    with open(csv_file_path, 'r', newline='') as f:
        reader = csv.reader(f)
        rows = list(reader)

    # Check if the customer ID already exists and update the inquiry count
    for row in rows:
        if row[0] == str(customer_id):
            row[2] = str(int(row[2]) + 1)  # Increment the inquiry count
            customer_found = True
            break

    # If the customer was not found, add a new row with inquiry count starting from 0
    if not customer_found:
        rows.append([str(customer_id), customer_name, '0'])

    # Write the updated data back to the CSV file
    with open(csv_file_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerows(rows)

    # print(f"Customer data updated for ID {customer_id} with name {customer_name}.")

def print_prompt(prompt: str):
    print("*" * 80)
    print(f"Prompt: {prompt}")
    print("*" * 80)

# Main function to run the triage agent
def main():
    deps = TriageDependencies(support_agent=support_agent, loan_agent=loan_agent, customer_id=123)
    prompt = 'What is my balance?'
    print_prompt(prompt)
    result = triage_agent.run_sync(prompt, deps=deps)
    print(result.data.text_response)
    # print(result.data.model_dump_json(indent=2))
    """
    {
      "department": "support",
      "response": {
        "support_advice": "Your current account balance is $123.45.",
        "block_card": false,
        "risk": 2,
        "customer_tracking_id": "13673e99-70ff-4851-8737-d06e66151234"
      },
      "text_response": "Your current account balance is $123.45."
    }
        """

    prompt = 'My card is lost. Please help!'
    print_prompt(prompt)
    result = triage_agent.run_sync(prompt, deps=deps)
    print(result.data.text_response)
    # print(result.data.model_dump_json(indent=2))
    """
    {
      "department": "support",
      "response": {
        "support_advice": "I'm sorry to hear that, John. We are temporarily blocking your card to prevent unauthorized transactions.",
        "block_card": true,
        "risk": 8,
        "customer_tracking_id": "04ee6c84-d996-43ae-b049-466c36249042"
      },
      "text_response": "I'm sorry to hear that, John. We are temporarily blocking your card to prevent unauthorized transactions."
    }
        """
    prompt = 'What is the status of my loan?'
    print_prompt(prompt)
    result = triage_agent.run_sync(prompt, deps=deps)
    print(result.data.text_response)
    # print(result.data.model_dump_json(indent=2))

    """
    {
      "department": "loan",
      "response": {
        "loan_approval_status": "Active",
        "loan_balance": 5000.0,
        "customer_tracking_id": "3ec98579-43cc-4fb0-86eb-bd49ac66479c"
      },
      "text_response": "Your loan status is currently 'Active' and you have a remaining loan balance of $5000. If you need further assistance, feel free to reach out!"
    }
        """
    prompt = 'How tall is Eiffel tower ?'
    print_prompt(prompt)
    result = triage_agent.run_sync(prompt, deps=deps)
    print(result.data.text_response)
    #print(result.data.model_dump_json(indent=2))

if __name__ == '__main__':
    main()
@samuelcolvin
Copy link
Member

Thanks so much for this, this is really interesting,

Yes this is roughly what we're thinking of, you might also be interested in #120.

There are a few things I think we can do to make this even simpler:

  • use the same deps across all agents, so we can pass deps around freely - I don't see any downside to this
  • provide a cleaner way of registering agents as tools
  • Introduce and recommend a DOA pattern for data access - we almost have this already in these examples

@MarkusOdenthal
Copy link

Looks solid. No better idea currently on how to build multi-agents. And thanks for sharing this. I am also currently working on a kind of multi-agent system.

But I tried to use just tools so far. What I'm currently struggling with is getting the agent to always think before deciding to use a tool to answer the user's message. I haven't found a way to add a rule so the agent does this consistently.

But I think your approach with the multi-agent could work for me. I could create a thinking/planning agent and use it as a dependency.

@pietz
Copy link

pietz commented Dec 17, 2024

@ishswar really cool example!

I think there are (at least) two different types of multi-agent systems. (I'm making up terms on the spot):

  • Orchestrator: Each message is initially send to the orchestrator agent, which then forwards single tool calls to other agents as necessary. It follows an analogy of "let me ask my colleague and get back to you".
  • Handoff: The current agent can decide at any point to handoff a conversation to a different agent to lead the conversation. It follows an analogy of "let me forward you to my colleague".

The example above uses the Orchestrator approach. I have a gut feeling that this is a serious limitation for more complex systems.

How about this instead:

# --- Result Model ---
class AgentResult(BaseModel):
    response: str = Field(description="Response to the customer")
    target_agent: Optional[str] = Field(description="Agent to hand over to", default=None)

# --- Dependencies ---
@dataclass
class AgentDependencies:
    conversation_id: str
    user_id: str

# --- Agents ---
triage_agent = Agent(
    "openai:gpt-4o",
    deps_type=AgentDependencies,
    result_type=AgentResult,
    system_prompt="""You are a triage agent that directs customers to the right department.
    For technical issues (internet, service outages, connection problems), set target_agent to 'tech_support'.
    For billing issues (payments, charges, invoices), set target_agent to 'billing'.
    For general inquiries, handle them yourself without setting a target_agent.
    """
)

tech_support = Agent(
    "openai:gpt-4o",
    deps_type=AgentDependencies,
    result_type=AgentResult,
    system_prompt="""You are a technical support agent. Handle technical issues.
    If a billing question comes up, set target_agent to 'billing' in your response.
    """
)

billing_agent = Agent(
    "openai:gpt-4o",
    deps_type=AgentDependencies,
    result_type=AgentResult,
    system_prompt="""You are a billing support agent. Handle billing issues.
    If a technical question comes up, set target_agent to 'tech_support' in your response.
    """
)

# --- Router ---
class AgentRouter:
    def __init__(self):
        self.agents = {
            "triage": triage_agent,
            "tech_support": tech_support,
            "billing": billing_agent
        }

    async def handle_conversation(self, prompt: str, deps: AgentDependencies) -> str:
        current_agent = self.agents["triage"]
        
        while True:
            # Run current agent
            result = await current_agent.run(prompt, deps=deps)
            
            # Check for handover
            if result.data.target_agent:
                print(f"Handing over to {result.data.target_agent}")
                current_agent = self.agents[result.data.target_agent]
                continue
            
            # No handover needed, return response
            return result.data.response

@MarkusOdenthal
Copy link

@pietz this is an elegant approach, thanks. I will try it.

@samuelcolvin
Copy link
Member

See also:

@grassxyz
Copy link

I implemented my agents using an Orchestrator approach last week, and I found it to be more effective than the router method. In this setup, the first agent has a comprehensive view of the overall process, utilizing all other agents as tools to achieve the desired outcome. This is analogous to a manager who assigns tasks to different team members and reviews the results before presenting them to the boss.
In contrast, the handoff approach tends to miss important details, as each agent only has knowledge of specific tasks or information. The Orchestrator method ensures a more cohesive and integrated workflow. At the end of the day, it all depends on your use cases.

@Ravitwr
Copy link

Ravitwr commented Dec 21, 2024

I have a question regarding agents being used as dependencies for other agents. In what scenarios do we actually need this? For example, in @ishswar's example, the triage agent has dependencies on the support and loan agents. Why can't we directly use the support and loan agents instead of passing them as dependencies to the triage agent?
I am actually trying to create a sample pydanticai fast api backend.

@samuelcolvin
Copy link
Member

I have a question regarding agents being used as dependencies for other agents. In what scenarios do we actually need this? For example, in @ishswar's example, the triage agent has dependencies on the support and loan agents. Why can't we directly use the support and loan agents instead of passing them as dependencies to the triage agent? I am actually trying to create a sample pydanticai fast api backend.

I think there's no reason to make an agent a dependency, agents are generally global and immutable, so you can use them without making them deps.

@samuelcolvin
Copy link
Member

I've proposed #541 which I hope documents multi-agent applications pretty well.

Full support for graphs is coming too, see #528 and #539.

I'd love your feedback, particularly on #541 at this time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

7 participants
@samuelcolvin @ishswar @pietz @grassxyz @MarkusOdenthal @Ravitwr and others