Skip to content

Commit

Permalink
Agentic flow imp (#1780)
Browse files Browse the repository at this point in the history
* basic agentic flow implementation

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

---------

Co-authored-by: spandan_mondal <[email protected]>
  • Loading branch information
hasinaxp and spandan_mondal authored Feb 5, 2025
1 parent f9f89a3 commit d71acd1
Show file tree
Hide file tree
Showing 11 changed files with 807 additions and 13 deletions.
2 changes: 1 addition & 1 deletion kairon/actions/definitions/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
model_to_check = llm_params['hyperparameters'].get('model')
Sysadmin.check_llm_model_exists(model_to_check, llm_type, self.bot)
llm_response, time_taken_llm_response = await llm_processor.predict(user_msg,
user=tracker.sender_id,
user= tracker.sender_id,
invocation='prompt_action',
llm_type=llm_type,
**llm_params)
Expand Down
21 changes: 20 additions & 1 deletion kairon/chat/routers/web_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from kairon.chat.utils import ChatUtils
from kairon.live_agent.live_agent import LiveAgent
from kairon.shared.auth import Authentication
from kairon.shared.chat.models import ChatRequest
from kairon.shared.chat.agent.agent_flow import AgenticFlow
from kairon.shared.chat.models import ChatRequest, AgenticFlowRequest
from kairon.shared.constants import CHAT_ACCESS
from kairon.shared.data.processor import MongoProcessor
from kairon.shared.models import User
Expand Down Expand Up @@ -91,3 +92,21 @@ async def verity_auth(current_user: User = Security(Authentication.get_current_u
"bot_id": current_user.get_bot()
}}

@router.post('/exec/flow', response_model=Response)
async def execute_flow(
request: AgenticFlowRequest,
bot: Text = Path(description="Bot id"),
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=CHAT_ACCESS)
):
"""
Retrieves chat client config of a bot.
"""
flow = AgenticFlow(bot, request.slot_vals, request.sender_id)
responses, errors = await flow.execute_rule(request.name)
return {
"data": {
"responses": responses,
"errors": errors,
},
"message": "Rule executed successfully!"
}
5 changes: 4 additions & 1 deletion kairon/shared/actions/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,10 @@ def prepare_url(http_url: str, tracker_data: dict):
if not ActionUtility.is_empty(previous_user_msg):
user_msg = previous_user_msg
http_url = http_url.replace("$SENDER_ID", tracker_data.get(ActionParameterType.sender_id.value, ""))
http_url = http_url.replace("$INTENT", tracker_data.get(ActionParameterType.intent.value, ""))
if intent := tracker_data.get(ActionParameterType.intent.value):
http_url = http_url.replace("$INTENT", intent)
else:
http_url = http_url.replace("$INTENT", "")
http_url = http_url.replace("$USER_MESSAGE", user_msg)

pattern_keyvault = r'\$\$\w+'
Expand Down
287 changes: 287 additions & 0 deletions kairon/shared/chat/agent/agent_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
import secrets
import time

from mongoengine import DoesNotExist
from pymongo import MongoClient
from rasa.core.nlg import TemplatedNaturalLanguageGenerator
from rasa.shared.core.events import UserUttered
from rasa.shared.core.slots import TextSlot, BooleanSlot, CategoricalSlot, FloatSlot, ListSlot, AnySlot
from rasa.shared.core.trackers import DialogueStateTracker
from uuid6 import uuid7

from kairon import Utility
from kairon.chat.actions import KRemoteAction
from kairon.exceptions import AppException
from kairon.shared.data.data_objects import Slots, Rules, MultiflowStories, Responses, BotSettings
from kairon.shared.data.processor import MongoProcessor
from loguru import logger


class AgenticFlow:
mongo_processor = MongoProcessor()
chat_history_client = None

SLOT_TYPE_MAP = {
"text": {
'constructor': TextSlot,
'args': ['name', 'initial_value', 'value_reset_delay', 'influence_conversation', 'mappings']
},
"boolean": {
'constructor': BooleanSlot,
'args': ['name', 'initial_value', 'value_reset_delay', 'influence_conversation']
},
"categorical": {
'constructor': CategoricalSlot,
'args': ['name', 'initial_value', 'value_reset_delay', 'influence_conversation', 'mappings']
},
"float": {
'constructor': FloatSlot,
'args': ['name', 'initial_value', 'value_reset_delay', 'influence_conversation', 'max_value', 'min_value']
},
"list": {
'constructor': ListSlot,
'args': ['name', 'initial_value', 'value_reset_delay', 'influence_conversation']
},
"any": {
'constructor': AnySlot,
'args': ['name', 'initial_value', 'value_reset_delay', 'influence_conversation', 'mappings']
},
}

def __init__(self, bot: str, slot_vals: dict[str,any] = None, sender_id: str = None):
self.bot = bot
self.max_history = 20
self.sender_id = sender_id if sender_id else str(uuid7().hex)
self.bot_settings = BotSettings.objects(bot=self.bot).first()
if not self.bot_settings:
raise AppException(f"Bot [{self.bot}] not found")
self.responses = []
self.errors = []
slots = self.load_slots(slot_vals)
self.input_slot_vals = slot_vals
self.fake_tracker = DialogueStateTracker(sender_id=self.sender_id,
slots=slots,
max_event_history=self.max_history)
endpoint = AgenticFlow.mongo_processor.get_endpoints(
bot, raise_exception=False
)
self.action_endpoint = Utility.get_action_url(endpoint)
self.domain = AgenticFlow.mongo_processor.load_domain(bot)
self.nlg = TemplatedNaturalLanguageGenerator(self.domain.responses)
self.executed_actions = []



def load_slots(self, slot_vals: dict[str,any] = None) -> list:
"""
Load slots for the bot from the database
:param slot_vals: dictionary of slot values
:return: list of slots
"""
try:
if not slot_vals:
slot_vals = {}
slots = []
for slot in Slots.objects(bot=self.bot):
slot_defination = AgenticFlow.SLOT_TYPE_MAP.get(slot.type)
if not slot_defination:
raise ValueError(f"Unknown slot type: {slot.type}")
if val := slot_vals.get(slot.name):
slot.initial_value = val
slot_args = {arg: getattr(slot, arg, None) for arg in slot_defination['args']}
slot_constructor = slot_defination['constructor']
slots.append(slot_constructor(**slot_args))

slots.append(TextSlot(name="agentic_flow_bot_user", initial_value=self.bot_settings.user, value_reset_delay=None, influence_conversation=False, mappings=[]))

return slots
except Exception as e:
logger.error(f"Error in loading slots: {e}")
raise AppException(f"Error in loading slots: {e}")

def load_rule_events(self, name: str) -> tuple:
self.fake_tracker.update(UserUttered(name,intent={'name': name, 'confidence': 1.0}))
try:

if rule := Rules.objects(bot=self.bot, block_name=name).first():
events = [{
'name': event.name,
'type': event.type,
} for event in rule.events]
return events, None
elif multiflow := MultiflowStories.objects(bot=self.bot, block_name=name).first():
events = multiflow.events
event_map, start = AgenticFlow.sanitize_multiflow_events(events)
return event_map, start
else:
raise DoesNotExist(f"[{name}] not found for this bot")
except DoesNotExist as e:
raise AppException(e)

@staticmethod
def sanitize_multiflow_events(events):
"""
convert database multiflow events into usable graph format
:param events: list of events
:return: graph representing dictionary of events, start node id
"""
start = events[0]["connections"][0]["node_id"]
def sanitize_event(ev:dict)-> dict:
node_id = ev['step']['node_id']
node_type = ev['step']['type']
if 'action' in node_type.lower():
node_type = 'action'
node_name = ev['step']['name']
connections = ev['connections']
conn = None
if len(connections) == 1:
conn = {
'type': 'jump',
'node_id': connections[0]['node_id'],
}
elif len(connections) > 1:
conn = {
'type': 'branch',
'criteria': connections[0]['type'],
'name': connections[0]['name'],
}
for c in connections:
conn[str(c['value'])] = c['node_id']

return {
'node_id': node_id,
'type': node_type,
'name': node_name,
'connections': conn,
}
new_graph = {}
for event in events:
e = sanitize_event(event)
new_graph[e['node_id']] = e
return new_graph, start

def evaluate_criteria(self, criteria: str, connections: dict):
if criteria == 'SLOT':
slot_name = connections['name']
slot_value = self.fake_tracker.get_slot(slot_name)
if slot_value and connections.get(slot_value):
return connections.get(slot_value)
else:
self.errors.append(f"Slot [{slot_name}] not set!")
return None

async def execute_rule(self, rule_name: str):
"""
Execute a rule for the bot
:param rule_name: name of the rule to be executed
:return: list of responses, list of errors
"""
self.responses = []
self.errors = []
self.executed_actions = []
events, node_id = self.load_rule_events(rule_name)
if not node_id:
for event in events:
await self.execute_event(event)
else:
while node_id:
event = events[node_id]
await self.execute_event(event)
if event.get('connections'):
jump_type = event['connections'].get('type')
if jump_type == 'jump':
node_id = event['connections']['node_id']
elif jump_type == 'branch':
criteria = event['connections']['criteria']
node_id = self.evaluate_criteria(criteria, event['connections'])
else:
node_id = None
self.log_chat_history(rule_name)
return self.responses, self.errors

async def execute_event(self, event: dict):
if event['type'] == 'action':
if event['name'] == "...":
return
elif event['name'].startswith("utter_"):
self.responses.append(self.get_utterance_response(event['name']))
self.executed_actions.append(event['name'])
return
try:
action = KRemoteAction(
name=event['name'],
action_endpoint=self.action_endpoint
)
new_events_for_tracker = await action.run(output_channel=None,
nlg=self.nlg,
tracker=self.fake_tracker,
domain=self.domain)
self.process_tracker_events(new_events_for_tracker)
self.fake_tracker.events.extend(new_events_for_tracker)
self.executed_actions.append(event['name'])
except Exception as e:
logger.error(f"Error in executing action [{event['name']}]: {e}")
raise AppException(f"Error in executing action [{event['name']}]")

def process_tracker_events(self, events:list):
for event in events:
if event.type_name == 'slot':
self.fake_tracker.slots[event.key].value = event.value
elif event.type_name == 'action':
self.fake_tracker.latest_action_name = event.key
elif event.type_name == 'user':
self.fake_tracker.latest_message = event.key
elif event.type_name == 'bot':
if event.text:
self.responses.append({'text': event.text})
else:
self.responses.append({'custom': event.data})

def get_non_empty_slots(self, all_slots:bool = False) -> dict:
slots = {}
for name in self.fake_tracker.slots.keys():
slot = self.fake_tracker.slots[name]
if slot.value or all_slots:
slots[name] = slot.value
return slots

def get_utterance_response(self, utterance: str):
responses = Responses.objects(bot=self.bot, name=utterance)
if not responses:
raise AppException(f"No response found for [{utterance}]")
responses_list = []
for response in responses:
if response.text:
responses_list.append({'text': response.text.text})
elif response.custom:
responses_list.append({'custom': response.custom.custom})

random_response = secrets.choice(responses_list)
if random_response.get('text'):
slot_vals = self.get_non_empty_slots(True)
random_response['text'] = random_response['text'].format(**slot_vals)

return random_response

def log_chat_history(self, rule_name: str):
if not AgenticFlow.chat_history_client:
AgenticFlow.chat_history_client = MongoClient(host=Utility.environment["tracker"]["url"])
db = AgenticFlow.chat_history_client.get_database()
conversations = db.get_collection(self.bot)
data = {
"type": "flattened",
"sender_id": self.sender_id,
"conversation_id": str(uuid7().hex),
"data": {
"user_input": rule_name,
"bot_response": self.responses,
"slots": self.get_non_empty_slots(),
"errors": self.errors,
"input_slot_vals": self.input_slot_vals,
"action": self.executed_actions
},
"timestamp": time.time(),
"tag": "agentic_flow"
}
conversations.insert_one(data)
14 changes: 13 additions & 1 deletion kairon/shared/chat/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from kairon.shared.chat.broadcast.constants import MessageBroadcastType
from kairon.shared.utils import Utility

from typing import List, Text, Dict
from typing import List, Text, Dict, Any
from kairon.exceptions import AppException


Expand Down Expand Up @@ -117,3 +117,15 @@ def validate_data(cls, v, values, **kwargs):
if Utility.check_empty_string(v):
raise ValueError("data cannot be empty!")
return v

class AgenticFlowRequest(BaseModel):
name: str
slot_vals: Dict[str, Any] = None
sender_id: str = None

@validator("name")
def validate_name(cls, v, values, **kwargs):
if Utility.check_empty_string(v):
raise ValueError("name cannot be empty!")
return v

1 change: 0 additions & 1 deletion kairon/shared/llm/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ async def __get_completion(self, messages, hyperparameters, user, **kwargs):
request_method="POST",
request_body=body,
timeout=timeout)

logging.info(f"LLM request completed in {elapsed_time} for bot: {self.bot}")
if status_code not in [200, 201, 202, 203, 204]:
raise Exception(HTTPStatus(status_code).phrase)
Expand Down
2 changes: 1 addition & 1 deletion system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ multilingual:

tracker:
type: ${TRACKER_TYPE:"static"}
url: ${TRACKER_URL:"mongodb://localhost:27017/rasa"}
url: ${TRACKER_URL:"mongodb://localhost:27017/test_conversations"}
collection: ${TRACKER_COLLECTION:"conversations"}
authentication:
token: ${AUTH_TOKEN:"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCIsImtleSI6ImI5ZDAxODYxMzkyNzU3YzY2ZGFhZjFmMjE0MjY4ZTI3MzlhNWJhYWM5MzUwNzFkMDZlMmVhNzFhNjZkYzViY2QifQ.eyJzdWIiOiJrYWlyb24uZGlnaXRlLmNvbSIsIm5hbWUiOiJrYWlyb24iLCJpYXQiOjE1MTYyMzkwMjJ9.3SOONqzoeX1tnMnUH5BVaOtZ7mElgwxyD1xpPvTQTrs"}
Expand Down
Loading

0 comments on commit d71acd1

Please sign in to comment.