feature(platform) Smart Decision Maker Block #9490

Open · wants to merge 17 commits into base: dev
Changes from 4 commits
197 changes: 197 additions & 0 deletions autogpt_platform/backend/backend/blocks/smart_decision_maker.py
@@ -0,0 +1,197 @@
import logging
from typing import Any

from autogpt_libs.utils.cache import thread_cached

import backend.blocks
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.graph import Graph
from backend.data.model import SchemaField

logger = logging.getLogger(__name__)


@thread_cached
def get_database_manager_client():
    from backend.executor import DatabaseManager
    from backend.util.service import get_service_client

    return get_service_client(DatabaseManager)


class SmartDecisionMakerBlock(Block):
    # Note: Currently proving out the concept of determining the inputs a tool takes

    class Input(BlockSchema):
        # Note: This is a placeholder for the actual input schema
        text: str = SchemaField(description="The text to print to the console.")

    class Output(BlockSchema):
        # Starting with a single tool.
        tools: dict[str, dict[str, Any]] = SchemaField(
            description="The tools that are available to use."
        )

    def __init__(self):
        super().__init__(
            id="3b191d9f-356f-482d-8238-ba04b6d18381",
            description="Uses AI to intelligently decide what tool to use.",
            categories={BlockCategory.BASIC},
            input_schema=SmartDecisionMakerBlock.Input,
            output_schema=SmartDecisionMakerBlock.Output,
            test_input={"text": "Hello, World!"},
            test_output=[
                (
                    "tools",
                    {
                        "add_to_dictionary": {
                            "key": "greeting",
                            "value": "Hello, World!",
                        },
                        "print_to_console": {
                            "text": "Hello, World!",
                        },
                    },
                )
            ],
        )

    @staticmethod
    def _create_function_signature(node_id: str, graph: Graph) -> list[dict[str, Any]]:
        """
        Creates a list of function signatures for tools linked to a specific node in a graph.

        This method identifies all tool links associated with a given node ID within a graph,
        groups them by tool name, and constructs a function signature for each tool. Each
        function signature includes the tool's name, description, and parameters required
        for its execution.

        Parameters:
        - node_id (str): The ID of the node for which tool function signatures are to be created.
        - graph (GraphModel): The graph model containing nodes and links.

        Returns:
        - list[dict[str, Any]]: A list of dictionaries, each representing a tool function signature.
          Each dictionary contains:
            - "name": The name of the tool.
            - "description": A description of the tool or its multi-step process.
            - "parameters": A dictionary detailing the parameters required by the tool, including:
                - "type": The data type of the parameter.
                - "description": A description of the parameter.

        Raises:
        - ValueError: If no tool links are found for the given node ID or if no tool sink nodes are identified.
        """
        # Filter the graph links to find those that are tools and are linked to the specified node_id
        tool_links = [
            link
            for link in graph.links
            # NOTE: Maybe we can do a specific database call to only get relevant nodes
            # async def get_connected_output_nodes(source_node_id: str) -> list[Node]:
            #     links = await AgentNodeLink.prisma().find_many(
            #         where={"agentNodeSourceId": source_node_id},
            #         include={"AgentNode": {"include": AGENT_NODE_INCLUDE}},
            #     )
            #     return [NodeModel.from_db(link.AgentNodeSink) for link in links]
            if link.source_name.startswith("tools_^_") and link.source_id == node_id
        ]

        node_block_map = {node.id: node.block_id for node in graph.nodes}

        if not tool_links:
            raise ValueError(
                f"Expected at least one tool link in the graph. Node ID: {node_id}. Graph: {graph.links}"
            )

        tool_functions = []
        grouped_tool_links = {}

        for link in tool_links:
            grouped_tool_links.setdefault(link.sink_id, []).append(link)

        logger.warning(f"Grouped tool links: {grouped_tool_links}")

        for tool_name, links in grouped_tool_links.items():
            tool_sink_nodes = {link.sink_id for link in links}
            tool_function = {"name": tool_name}

            if len(tool_sink_nodes) == 1:
                tool_block = backend.blocks.AVAILABLE_BLOCKS[
                    node_block_map[next(iter(tool_sink_nodes))]
                ]
                tool_function["name"] = tool_block().name
                tool_function["description"] = tool_block().description
            elif len(tool_sink_nodes) > 1:
                tool_blocks = [
                    backend.blocks.AVAILABLE_BLOCKS[node_block_map[node_id]]
                    for node_id in tool_sink_nodes
                ]
                tool_function["name"] = tool_blocks[0]().name
                tool_function["description"] = (
                    "This tool is a multi-step tool that can be used to perform a task. "
                    "It includes blocks that do: "
                    + ", ".join(
                        block.name
                        for block in tool_blocks
                        if isinstance(block.name, str)
                    )
                )
            else:
                raise ValueError(
                    f"Expected at least one tool link in the graph: {tool_links}"
                )

            properties = {}
            required = []

            for link in links:
                required.append(link.sink_name)
                sink_block = backend.blocks.AVAILABLE_BLOCKS[
                    node_block_map[link.sink_id]
                ]
                sink_block_input = sink_block().input_schema.__fields__[link.sink_name]
                sink_type = sink_block_input.annotation
                sink_description = sink_block_input.description

                if sink_type not in ["string", "number", "boolean", "object"]:
                    sink_type = "string"

                properties[link.sink_name] = {
                    "type": sink_type,
                    "description": sink_description,
                }

            tool_function["parameters"] = {
                "type": "object",
                "properties": properties,
                "required": required,
                "additionalProperties": False,
                "strict": True,
            }

            tool_functions.append({"type": "function", "function": tool_function})

        return tool_functions

    def run(
        self,
        input_data: Input,
        *,
        graph_id: str,
        node_id: str,
        graph_exec_id: str,
        node_exec_id: str,
        user_id: str,
        **kwargs,
    ) -> BlockOutput:
        db_client = get_database_manager_client()

        # Retrieve the current graph and node details
        graph = db_client.get_graph(graph_id)

        if not graph:
            raise ValueError(f"Graph not found {graph_id}")

        tool_functions = self._create_function_signature(node_id, graph)
Member commented:

Test this works with mem0, since it uses the kwargs based on its place in the graph.

Contributor Author commented:

@ntindle It seems that the high-level decision is that tools must be an AgentExecutionBlock.

If you feel strongly that this is a bad idea, let me know and we can discuss it with Toran.

Member commented:

Why should it require that? I can see plenty of uses for this that don't need a whole agent and the overhead that would come with it.

Contributor Author commented:

The main reason is that to make a smart decision, a certain amount of information is required about a tool. We can back-propagate this information from an agent: name, description, required params, etc.

But with a normal block, we can't really do this unless we assume that block can't be connected to another block...

There are a fair number of subtleties in this system.


yield "tools_#_add_to_dictionary_#_key", tool_functions
8 changes: 8 additions & 0 deletions autogpt_platform/backend/backend/data/block.py
@@ -351,6 +351,14 @@ def run(self, input_data: BlockSchemaInputType, **kwargs) -> BlockOutput:
        Run the block with the given input data.
        Args:
            input_data: The input data with the structure of input_schema.

        Kwargs: As of 14/02/2025, these include:
            graph_id: The ID of the graph.
            node_id: The ID of the node.
            graph_exec_id: The ID of the graph execution.
            node_exec_id: The ID of the node execution.
            user_id: The ID of the user.

        Returns:
            A Generator that yields (output_name, output_data).
            output_name: One of the output names defined in Block's output_schema.
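
As an aside on the Kwargs documented above: a minimal, hypothetical sketch in plain Python (not a block from this PR) of a run-style generator that receives the execution-context keywords and yields (output_name, output_data) pairs.

from typing import Any, Generator


def run_sketch(
    input_data: dict,
    *,
    graph_id: str,
    node_id: str,
    graph_exec_id: str,
    node_exec_id: str,
    user_id: str,
    **kwargs: Any,
) -> Generator[tuple[str, Any], None, None]:
    # Echo the execution context so the keyword names documented above are visible.
    yield "context", {
        "graph_id": graph_id,
        "node_id": node_id,
        "graph_exec_id": graph_exec_id,
        "node_exec_id": node_exec_id,
        "user_id": user_id,
    }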
69 changes: 63 additions & 6 deletions autogpt_platform/backend/backend/data/execution.py
@@ -399,7 +399,38 @@ async def get_executions_in_timerange(


def parse_execution_output(output: BlockData, name: str) -> Any | None:
    # Allow extracting partial output data by name.
    """
    Extracts partial output data by name from a given BlockData.

    The function supports extracting data from lists, dictionaries, and objects
    using specific naming conventions:
    - For lists: <output_name>_$_<index>
    - For dictionaries: <output_name>_#_<key>
    - For objects: <output_name>_@_<attribute>

    Args:
        output (BlockData): A tuple containing the output name and data.
        name (str): The name used to extract specific data from the output.

    Returns:
        Any | None: The extracted data if found, otherwise None.

    Examples:
        >>> output = ("result", [10, 20, 30])
        >>> parse_execution_output(output, "result_$_1")
        20

        >>> output = ("config", {"key1": "value1", "key2": "value2"})
        >>> parse_execution_output(output, "config_#_key1")
        'value1'

        >>> class Sample:
        ...     attr1 = "value1"
        ...     attr2 = "value2"
        >>> output = ("object", Sample())
        >>> parse_execution_output(output, "object_@_attr1")
        'value1'
    """
    output_name, output_data = output

    if name == output_name:
@@ -428,11 +459,37 @@ def parse_execution_output(output: BlockData, name: str) -> Any | None:

def merge_execution_input(data: BlockInput) -> BlockInput:
"""
Merge all dynamic input pins which described by the following pattern:
- <input_name>_$_<index> for list input.
- <input_name>_#_<index> for dict input.
- <input_name>_@_<index> for object input.
This function will construct pins with the same name into a single list/dict/object.
Merges dynamic input pins into a single list, dictionary, or object based on naming patterns.

This function processes input keys that follow specific patterns to merge them into a unified structure:
- `<input_name>_$_<index>` for list inputs.
- `<input_name>_#_<index>` for dictionary inputs.
- `<input_name>_@_<index>` for object inputs.

Args:
data (BlockInput): A dictionary containing input keys and their corresponding values.

Returns:
BlockInput: A dictionary with merged inputs.

Raises:
ValueError: If a list index is not an integer.

Examples:
>>> data = {
... "list_$_0": "a",
... "list_$_1": "b",
... "dict_#_key1": "value1",
... "dict_#_key2": "value2",
... "object_@_attr1": "value1",
... "object_@_attr2": "value2"
... }
>>> merge_execution_input(data)
{
"list": ["a", "b"],
"dict": {"key1": "value1", "key2": "value2"},
"object": <MockObject attr1="value1" attr2="value2">
}
"""

# Merge all input with <input_name>_$_<index> into a single list.
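
To make the dynamic-pin naming above concrete, here is a standalone sketch of the merge behaviour under simplified assumptions: it is not the backend implementation, and it keeps object (_@_) pins as plain dicts instead of building a mock object.

from typing import Any


def merge_dynamic_inputs(data: dict[str, Any]) -> dict[str, Any]:
    """Illustrative merge of <name>_$_<index>, <name>_#_<key>, and <name>_@_<attr> pins."""
    merged: dict[str, Any] = {}
    for key, value in data.items():
        if "_$_" in key:  # list pin: <name>_$_<index>
            name, index = key.split("_$_", 1)
            items = merged.setdefault(name, [])
            items.extend([None] * (int(index) + 1 - len(items)))
            items[int(index)] = value
        elif "_#_" in key:  # dict pin: <name>_#_<key>
            name, dict_key = key.split("_#_", 1)
            merged.setdefault(name, {})[dict_key] = value
        elif "_@_" in key:  # object pin: <name>_@_<attr>, kept as a dict in this sketch
            name, attr = key.split("_@_", 1)
            merged.setdefault(name, {})[attr] = value
        else:
            merged[key] = value
    return merged


print(merge_dynamic_inputs({"list_$_0": "a", "list_$_1": "b", "dict_#_key1": "value1"}))
# -> {'list': ['a', 'b'], 'dict': {'key1': 'value1'}}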
33 changes: 32 additions & 1 deletion autogpt_platform/backend/backend/data/graph.py
@@ -259,12 +259,43 @@ def reassign_ids(self, user_id: str, reassign_graph_id: bool = False):

    def validate_graph(self, for_run: bool = False):
        def sanitize(name):
            return name.split("_#_")[0].split("_@_")[0].split("_$_")[0]
            return name.split("_#_")[0].split("_@_")[0].split("_$_")[0].split("_^_")[0]

        # Validate smart decision maker nodes
        smart_decision_maker_nodes = set()
        # smd_unique_tool_links = {}
        agent_nodes = set()

        for node in self.nodes:
            if node.block_id == "3b191d9f-356f-482d-8238-ba04b6d18381":
                smart_decision_maker_nodes.add(node.id)
            elif node.block_id == "e189baac-8c20-45a1-94a7-55177ea42565":
                agent_nodes.add(node.id)

        input_links = defaultdict(list)
        tool_name_to_node = {}

        for link in self.links:
            input_links[link.sink_id].append(link)

            if (
Contributor commented:

Add comments here?

                link.source_id in smart_decision_maker_nodes
                and link.source_name.startswith("tools_^_")
            ):
                tool_name = link.source_name.split("_^_")[1]
                if tool_name in tool_name_to_node:
                    if tool_name_to_node[tool_name] != link.sink_id:
                        raise ValueError(
                            f"Tool name {tool_name} links to multiple nodes: {tool_name_to_node[tool_name]} and {link.sink_id}"
                        )
                else:
                    tool_name_to_node[tool_name] = link.sink_id

                # TODO: Uncomment this when I've updated the tests
                # smd_unique_tool_links[link.source_id] = set(link.sink_id)
                # if link.sink_id not in agent_nodes:
                #     raise ValueError(f"Smart decision maker node {link.source_id} cannot link to non-agent node {link.sink_id}")

        # Nodes: required fields are filled or connected and dependencies are satisfied
        for node in self.nodes:
            block = get_block(node.block_id)
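
The validation added above amounts to: every tools_^_<name> link leaving a Smart Decision Maker node must resolve to a single sink node. Below is a reduced sketch of that rule, with link objects simplified to (source_id, source_name, sink_id) tuples; it is an illustration, not the GraphModel code itself.

def check_tool_links(
    links: list[tuple[str, str, str]],
    smart_decision_maker_nodes: set[str],
) -> dict[str, str]:
    """Map each tool name to its single sink node, rejecting ambiguous wiring."""
    tool_name_to_node: dict[str, str] = {}
    for source_id, source_name, sink_id in links:
        if source_id in smart_decision_maker_nodes and source_name.startswith("tools_^_"):
            tool_name = source_name.split("_^_")[1]
            if tool_name in tool_name_to_node and tool_name_to_node[tool_name] != sink_id:
                raise ValueError(
                    f"Tool name {tool_name} links to multiple nodes: "
                    f"{tool_name_to_node[tool_name]} and {sink_id}"
                )
            tool_name_to_node[tool_name] = sink_id
    return tool_name_to_node


# Two links reusing the same tool name for different sinks should be rejected.
try:
    check_tool_links(
        [("smd-1", "tools_^_search", "node-a"), ("smd-1", "tools_^_search", "node-b")],
        {"smd-1"},
    )
except ValueError as err:
    print(err)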
11 changes: 10 additions & 1 deletion autogpt_platform/backend/backend/data/integrations.py
@@ -8,16 +8,25 @@
from backend.data.includes import INTEGRATION_WEBHOOK_INCLUDE
from backend.data.queue import AsyncRedisEventBus
from backend.integrations.providers import ProviderName
from backend.integrations.webhooks.utils import webhook_ingress_url
from backend.util.settings import Config

from .db import BaseDbModel

if TYPE_CHECKING:
    from .graph import NodeModel

app_config = Config()

logger = logging.getLogger(__name__)


def webhook_ingress_url(provider_name: ProviderName, webhook_id: str) -> str:
    return (
        f"{app_config.platform_base_url}/api/integrations/{provider_name.value}"
        f"/webhooks/{webhook_id}/ingress"
    )


class Webhook(BaseDbModel):
    user_id: str
    provider: ProviderName