diff --git a/pyproject.toml b/pyproject.toml index 11c357f823fd..abe737d34738 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ repository = "https://github.com/rasahq/rasa" documentation = "https://rasa.com/docs" classifiers = [ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Topic :: Software Development :: Libraries",] keywords = [ "nlp", "machine-learning", "machine-learning-library", "bot", "bots", "botkit", "rasa conversational-agents", "conversational-ai", "chatbot", "chatbot-framework", "bot-framework",] -include = [ "LICENSE.txt", "README.md", "rasa/shared/core/training_data/visualization.html", "rasa/cli/default_config.yml", "rasa/shared/importers/*", "rasa/utils/schemas/*", "rasa/keys", "rasa/core/channels/chat.html", "rasa/core/policies/detectors/prompt_sensitive_topic.jinja2"] +include = [ "LICENSE.txt", "README.md", "rasa/shared/core/training_data/visualization.html", "rasa/cli/default_config.yml", "rasa/shared/importers/*", "rasa/utils/schemas/*", "rasa/keys", "rasa/core/channels/chat.html", "rasa/core/policies/detectors/prompt_sensitive_topic.jinja2", "rasa/nlu/classifiers/flow_prompt_template.jinja2"] readme = "README.md" license = "Apache-2.0" [[tool.poetry.source]] diff --git a/rasa/core/actions/action.py b/rasa/core/actions/action.py index 02a64f278044..f444b6279fba 100644 --- a/rasa/core/actions/action.py +++ b/rasa/core/actions/action.py @@ -98,7 +98,6 @@ def default_actions(action_endpoint: Optional[EndpointConfig] = None) -> List["Action"]: """List default actions.""" from rasa.core.actions.two_stage_fallback import TwoStageFallbackAction - from rasa.core.actions.flows import ActionFlowContinueInterupted return [ ActionListen(), @@ -114,7 +113,6 @@ def default_actions(action_endpoint: Optional[EndpointConfig] = None) -> List["A ActionSendText(), ActionBack(), ActionExtractSlots(action_endpoint), - ActionFlowContinueInterupted(), ] @@ -314,6 +312,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Simple run implementation uttering a (hopefully defined) response.""" response_ids_for_response = domain.response_ids_per_response.get( @@ -372,6 +371,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action (see parent class for full docstring).""" message = {"text": self.action_text} @@ -460,6 +460,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Query the appropriate response and create a bot utterance with that.""" latest_message = tracker.latest_message @@ -520,6 +521,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. Please see parent class for the full docstring.""" # only utter the response if it is available @@ -545,6 +547,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. 
Please see parent class for the full docstring.""" return [] @@ -570,6 +573,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. Please see parent class for the full docstring.""" # only utter the response if it is available @@ -606,6 +610,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. Please see parent class for the full docstring.""" _events: List[Event] = [SessionStarted()] @@ -635,6 +640,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. Please see parent class for the full docstring.""" # only utter the response if it is available @@ -655,6 +661,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. Please see parent class for the full docstring.""" return [ActiveLoop(None), SlotSet(REQUESTED_SLOT, None)] @@ -769,6 +776,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. Please see parent class for the full docstring.""" json_body = self._action_call_format(tracker, domain) @@ -895,6 +903,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. Please see parent class for the full docstring.""" from rasa.core.policies.two_stage_fallback import has_user_rephrased @@ -925,6 +934,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. Please see parent class for the full docstring.""" return [] @@ -1005,6 +1015,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. Please see parent class for the full docstring.""" latest_message = tracker.latest_message @@ -1273,6 +1284,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Runs action. Please see parent class for the full docstring.""" slot_events: List[Event] = [] @@ -1354,6 +1366,16 @@ def extract_slot_value_from_predefined_mapping( tracker: "DialogueStateTracker", ) -> List[Any]: """Extracts slot value if slot has an applicable predefined mapping.""" + + if tracker.has_bot_message_after_latest_user_message(): + # TODO: this needs further validation - not sure if this breaks something!!! + + # If the bot sent a message after the user sent a message, we can't + # extract any slots from the user message. We assume that the user + # message was already processed by the bot and the slot value was + # already extracted (e.g. for a prior form slot). 
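For illustration, a minimal sketch of the guard behind this early return, using the `has_bot_message_after_latest_user_message` helper added to the tracker later in this diff (the example events are made up):

    from rasa.shared.core.events import BotUttered, UserUttered
    from rasa.shared.core.trackers import DialogueStateTracker

    tracker = DialogueStateTracker.from_events(
        "sender",
        [UserUttered("send 50$ to Joe"), BotUttered("Who is the recipient?")],
    )
    # A bot message followed the latest user message, so that message is treated as
    # already processed and no further slot values are extracted from it.
    assert tracker.has_bot_message_after_latest_user_message()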
+ return [] + should_fill_entity_slot = ( mapping_type == SlotMappingType.FROM_ENTITY and SlotMapping.entity_is_desired(mapping, tracker) diff --git a/rasa/core/actions/flows.py b/rasa/core/actions/flows.py index 3fc98989da0c..f16ffa628bd4 100644 --- a/rasa/core/actions/flows.py +++ b/rasa/core/actions/flows.py @@ -5,10 +5,7 @@ from rasa.core.policies.flow_policy import FlowStack, FlowStackFrame, StackFrameType from rasa.shared.constants import FLOW_PREFIX -from rasa.shared.core.constants import ( - ACTION_FLOW_CONTINUE_INERRUPTED_NAME, - FLOW_STACK_SLOT, -) +from rasa.shared.core.constants import FLOW_STACK_SLOT from rasa.shared.core.domain import Domain from rasa.shared.core.events import ( ActiveLoop, @@ -47,10 +44,13 @@ async def run( metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: """Trigger the flow.""" - stack = FlowStack.from_tracker(tracker) if tracker.active_loop_name and not stack.is_empty(): frame_type = StackFrameType.INTERRUPT + elif self._flow_name == "pattern_continue_interrupted": + frame_type = StackFrameType.RESUME + elif self._flow_name == "pattern_correction": + frame_type = StackFrameType.CORRECTION else: frame_type = StackFrameType.REGULAR @@ -61,43 +61,15 @@ async def run( ) ) - events: List[Event] = [SlotSet(FLOW_STACK_SLOT, stack.as_dict())] + slots_to_be_set = metadata.get("slots", {}) if metadata else {} + slot_set_events: List[Event] = [ + SlotSet(key, value) for key, value in slots_to_be_set.items() + ] + + events: List[Event] = [ + SlotSet(FLOW_STACK_SLOT, stack.as_dict()) + ] + slot_set_events if tracker.active_loop_name: events.append(ActiveLoop(None)) return events - - -UTTER_FLOW_CONTINUE_INTERRUPTED = "utter_flow_continue_interrupted" - - -class ActionFlowContinueInterupted(action.Action): - """Action triggered when an interrupted flow is continued.""" - - def name(self) -> Text: - """Return the flow name.""" - return ACTION_FLOW_CONTINUE_INERRUPTED_NAME - - async def run( - self, - output_channel: "OutputChannel", - nlg: "NaturalLanguageGenerator", - tracker: "DialogueStateTracker", - domain: "Domain", - metadata: Optional[Dict[Text, Any]] = None, - ) -> List[Event]: - """Trigger the flow.""" - - fallback = {"text": "Let's return to the previous topic."} - flow_name = metadata.get("flow_name") if metadata else None - - generated = await nlg.generate( - UTTER_FLOW_CONTINUE_INTERRUPTED, - tracker, - output_channel.name(), - flow_name=flow_name, - ) - - utterance: Event = action.create_bot_utterance(generated or fallback) - - return [utterance] diff --git a/rasa/core/actions/loops.py b/rasa/core/actions/loops.py index ccaa704a7cb0..ea6e3868978b 100644 --- a/rasa/core/actions/loops.py +++ b/rasa/core/actions/loops.py @@ -1,5 +1,5 @@ from abc import ABC -from typing import List, TYPE_CHECKING +from typing import Any, Dict, List, TYPE_CHECKING, Optional, Text from rasa.core.actions.action import Action from rasa.shared.core.events import Event, ActiveLoop @@ -18,6 +18,7 @@ async def run( nlg: "NaturalLanguageGenerator", tracker: "DialogueStateTracker", domain: "Domain", + metadata: Optional[Dict[Text, Any]] = None, ) -> List[Event]: events: List[Event] = [] diff --git a/rasa/core/policies/default_flows.yml b/rasa/core/policies/default_flows.yml new file mode 100644 index 000000000000..5d3f49a06c8b --- /dev/null +++ b/rasa/core/policies/default_flows.yml @@ -0,0 +1,68 @@ +responses: + utter_flow_continue_interrupted: + - text: Let's continue with the topic {rasa_previous_flow}. 
+ metadata: + rephrase: True + + utter_ask_confirm_correction: + - text: "Do you want to update your information?" + buttons: + - payload: /affirm + title: Yes + - payload: /deny + title: No, please keep the previous information + metadata: + rephrase: True + + utter_corrected_previous_input: + - text: "Ok, I corrected the previous input." + metadata: + rephrase: True + + utter_not_corrected_previous_input: + - text: "Ok, I did not correct the previous input." + metadata: + rephrase: True + +slots: + confirm_correction: + type: text + mappings: + - intent: affirm + type: from_intent + value: "True" + conditions: + - active_loop: question_confirm_correction + - intent: deny + type: from_intent + value: "False" + conditions: + - active_loop: question_confirm_correction + +flows: + pattern_continue_interrupted: + description: A meta flow that should be started to continue an interrupted flow. + + steps: + - id: "0" + action: utter_flow_continue_interrupted + + pattern_correction: + description: A meta flow that should be started to correct a previous user input. + + steps: + - id: "0" + question: confirm_correction + next: + - if: confirm_correction + then: "1" + - else: "2" + - id: "1" + action: utter_corrected_previous_input + - id: "2" + set_slots: + - rasa_corrected_slots: None + next: "3" + - id: "3" + action: utter_not_corrected_previous_input + \ No newline at end of file diff --git a/rasa/core/policies/detectors/sensitive_topic.py b/rasa/core/policies/detectors/sensitive_topic.py index 12bb20e89f12..6b8b25168764 100644 --- a/rasa/core/policies/detectors/sensitive_topic.py +++ b/rasa/core/policies/detectors/sensitive_topic.py @@ -43,7 +43,8 @@ def check(self, user_msg: Text) -> bool: user_msg: user message to check Returns: - True if the message contains sensitive topic, False otherwise""" + True if the message contains sensitive topic, False otherwise + """ ... def action(self) -> Text: @@ -104,7 +105,7 @@ def check(self, user_msg: Optional[Text]) -> bool: if self._use_stub: return self._stub.check(user_msg) try: - resp = openai.Completion.create( + resp = openai.Completion.create( # type: ignore[no-untyped-call] model=self._model_name, prompt=self._make_prompt(user_msg), temperature=0.0, @@ -125,7 +126,8 @@ def _make_prompt(self, user_message: Text) -> Text: def _parse_response(text: Text) -> bool: """Parse response from OpenAI ChatGPT model. - Expected responses are "YES" and "NO" (case-insensitive).""" + Expected responses are "YES" and "NO" (case-insensitive). 
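As a side note, a hedged sketch of how the detector might be enabled through the `FlowPolicy` configuration. Only the top-level "sensitive_topic_detector" key (`SENSITIVE_TOPIC_DETECTOR_CONFIG_KEY`) is defined in this diff; the nested keys are assumptions inferred from the detector's attributes:

    # assumed config shape; the real keys come from SensitiveTopicDetector.get_default_config()
    flow_policy_config = {
        "sensitive_topic_detector": {
            "use_stub": False,                 # assumed key: toggle the stubbed detector
            "model_name": "text-davinci-003",  # assumed key: completion model used by check()
        },
    }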
+ """ return "YES" in text.upper() diff --git a/rasa/core/policies/flow_policy.py b/rasa/core/policies/flow_policy.py index df80861d3915..e22c5b11b1f8 100644 --- a/rasa/core/policies/flow_policy.py +++ b/rasa/core/policies/flow_policy.py @@ -3,7 +3,6 @@ from dataclasses import dataclass from enum import Enum from typing import Any, Dict, Text, List, Optional, Union -import logging from rasa.core.constants import ( DEFAULT_POLICY_PRIORITY, @@ -12,13 +11,19 @@ ) from pypred import Predicate from rasa.shared.constants import FLOW_PREFIX -from rasa.shared.nlu.constants import ENTITY_ATTRIBUTE_TYPE, INTENT_NAME_KEY +from rasa.shared.nlu.constants import ( + ACTION_NAME, + ENTITY_ATTRIBUTE_TYPE, + INTENT_NAME_KEY, + CORRECTION_INTENT, +) from rasa.shared.core.constants import ( - ACTION_FLOW_CONTINUE_INERRUPTED_NAME, ACTION_LISTEN_NAME, + CORRECTED_SLOTS_SLOT, FLOW_STACK_SLOT, + PREVIOUS_FLOW_SLOT, ) -from rasa.shared.core.events import ActiveLoop, Event, SlotSet +from rasa.shared.core.events import ActiveLoop, Event, SlotSet, UserUttered from rasa.shared.core.flows.flow import ( END_STEP, START_STEP, @@ -49,9 +54,9 @@ DialogueStateTracker, ) from rasa.core.policies.detectors import SensitiveTopicDetector +import structlog - -logger = logging.getLogger(__name__) +structlogger = structlog.get_logger() SENSITIVE_TOPIC_DETECTOR_CONFIG_KEY = "sensitive_topic_detector" @@ -112,7 +117,9 @@ def __init__( # if the detector is configured, we need to load it full_config = SensitiveTopicDetector.get_default_config() full_config.update(detector_config) - self._sensitive_topic_detector = SensitiveTopicDetector(full_config) + self._sensitive_topic_detector: Optional[ + SensitiveTopicDetector + ] = SensitiveTopicDetector(full_config) else: self._sensitive_topic_detector = None @@ -139,22 +146,6 @@ def train( # or do some preprocessing here. return self.resource - @staticmethod - def _is_first_prediction_after_user_message(tracker: DialogueStateTracker) -> bool: - """Checks whether the tracker ends with an action listen. - - If the tracker ends with an action listen, it means that we've just received - a user message. - - Args: - tracker: The tracker. - - Returns: - `True` if the tracker is the first one after a user message, `False` - otherwise. - """ - return tracker.latest_action_name == ACTION_LISTEN_NAME - def predict_action_probabilities( self, tracker: DialogueStateTracker, @@ -179,7 +170,7 @@ def predict_action_probabilities( predicted_action = None if ( self._sensitive_topic_detector - and self._is_first_prediction_after_user_message(tracker) + and not tracker.has_action_after_latest_user_message() and (latest_message := tracker.latest_message) ): if self._sensitive_topic_detector.check(latest_message.text): @@ -188,32 +179,22 @@ def predict_action_probabilities( # sure that the input isn't used in any following flow # steps. At the same time, we can't completely skip flows # as we want to guide the user to the next step of the flow. 
- logger.info( - "Sensitive topic detected, predicting action %s", predicted_action + structlogger.info( + "sensitive.topic.detected", predicted_action=predicted_action ) else: - logger.info("No sensitive topic detected: %s", latest_message.text) + structlogger.debug( + "sensitive.topic.notdetected", message=latest_message.text + ) # if detector predicted an action, we don't want to predict a flow if predicted_action is not None: return self._create_prediction_result(predicted_action, domain, 1.0, []) - executor = FlowExecutor.from_tracker(tracker, flows or FlowsList([])) - if tracker.active_loop: - # we are in a loop - likely answering a question - we need to check - # if the user responded with a trigger intent for another flow rather - # than answering the question - prediction = executor.consider_flow_switch(tracker) - return self._create_prediction_result( - action_name=prediction.action_name, - domain=domain, - score=prediction.score, - events=[], - action_metadata=prediction.metadata, - ) + executor = FlowExecutor.from_tracker(tracker, flows or FlowsList([]), domain) # create executor and predict next action - prediction = executor.advance_flows(tracker, domain) + prediction = executor.advance_flows(tracker) return self._create_prediction_result( prediction.action_name, domain, @@ -339,11 +320,12 @@ def top_flow_step(self, flows: FlowsList) -> Optional[FlowStep]: """Get the current flow step. Returns: - The current flow step or `None` if no flow is active.""" + The current flow step or `None` if no flow is active. + """ if not (top := self.top()) or not (top_flow := self.top_flow(flows)): return None - return top_flow.step_for_id(top.step_id) + return top_flow.step_by_id(top.step_id) def is_empty(self) -> bool: """Checks if the stack is empty. @@ -389,7 +371,16 @@ class StackFrameType(str, Enum): LINK = "link" """The frame is a link frame. + This means that the previous flow linked to this flow.""" + RESUME = "resume" + """The frame is a resume frame. + + This means that the previous flow was resumed by this flow.""" + CORRECTION = "correction" + """The frame is a correction frame. + + This means that the previous flow was corrected by this flow.""" REGULAR = "regular" """The frame is a regular frame. @@ -406,6 +397,10 @@ def from_str(typ: Optional[Text]) -> "StackFrameType": return StackFrameType.LINK elif typ == StackFrameType.REGULAR.value: return StackFrameType.REGULAR + elif typ == StackFrameType.RESUME.value: + return StackFrameType.RESUME + elif typ == StackFrameType.CORRECTION.value: + return StackFrameType.CORRECTION else: raise NotImplementedError @@ -471,7 +466,9 @@ def __repr__(self) -> Text: class FlowExecutor: """Executes a flow.""" - def __init__(self, flow_stack: FlowStack, all_flows: FlowsList) -> None: + def __init__( + self, flow_stack: FlowStack, all_flows: FlowsList, domain: Domain + ) -> None: """Initializes the `FlowExecutor`. Args: @@ -480,9 +477,12 @@ def __init__(self, flow_stack: FlowStack, all_flows: FlowsList) -> None: """ self.flow_stack = flow_stack self.all_flows = all_flows + self.domain = domain @staticmethod - def from_tracker(tracker: DialogueStateTracker, flows: FlowsList) -> FlowExecutor: + def from_tracker( + tracker: DialogueStateTracker, flows: FlowsList, domain: Domain + ) -> FlowExecutor: """Creates a `FlowExecutor` from a tracker. Args: @@ -490,16 +490,16 @@ def from_tracker(tracker: DialogueStateTracker, flows: FlowsList) -> FlowExecuto flows: The flows to use. Returns: - The created `FlowExecutor`.""" + The created `FlowExecutor`. 
+ """ flow_stack = FlowStack.from_tracker(tracker) - return FlowExecutor(flow_stack, flows or FlowsList([])) + return FlowExecutor(flow_stack, flows or FlowsList([]), domain) def find_startable_flow(self, tracker: DialogueStateTracker) -> Optional[Flow]: """Finds a flow which can be started. Args: tracker: The tracker containing the conversation history up to now. - domain: The model's domain. flows: The flows to use. Returns: @@ -525,9 +525,8 @@ def find_startable_flow(self, tracker: DialogueStateTracker) -> Optional[Flow]: return flow return None - @staticmethod def is_condition_satisfied( - predicate: Text, domain: Domain, tracker: "DialogueStateTracker" + self, predicate: Text, tracker: "DialogueStateTracker" ) -> bool: """Evaluate a predicate condition.""" @@ -551,14 +550,17 @@ def get_value( return initial_value text_slots = dict( - {slot.name: get_value(tracker.get_slot(slot.name)) for slot in domain.slots} + { + slot.name: get_value(tracker.get_slot(slot.name)) + for slot in self.domain.slots + } ) p = Predicate(predicate) evaluation, _ = p.analyze(text_slots) return evaluation def _select_next_step_id( - self, current: FlowStep, domain: Domain, tracker: "DialogueStateTracker" + self, current: FlowStep, tracker: "DialogueStateTracker" ) -> Optional[Text]: """Selects the next step id based on the current step.""" next = current.next @@ -568,7 +570,7 @@ def _select_next_step_id( # evaluate if conditions for link in next.links: if isinstance(link, IfFlowLink) and link.condition: - if self.is_condition_satisfied(link.condition, domain, tracker): + if self.is_condition_satisfied(link.condition, tracker): return link.target # evaluate else condition @@ -592,20 +594,18 @@ def _select_next_step_id( def _select_next_step( self, tracker: "DialogueStateTracker", - domain: Domain, current_step: FlowStep, - flow_id: Text, + flow: Flow, ) -> Optional[FlowStep]: """Get the next step to execute.""" - next_id = self._select_next_step_id(current_step, domain, tracker) - if next_id is None: - return None - - return self.all_flows.step_by_id(next_id, flow_id) + next_id = self._select_next_step_id(current_step, tracker) + step = flow.step_by_id(next_id) + structlogger.debug("flow.step.next", next=step, current=current_step, flow=flow) + return step - def _slot_for_question(self, question: Text, domain: Domain) -> Slot: + def _slot_for_question(self, question: Text) -> Slot: """Find the slot for a question.""" - for slot in domain.slots: + for slot in self.domain.slots: if slot.name == question: return slot else: @@ -622,6 +622,17 @@ def _is_step_completed( else: return True + def _find_earliest_updated_question( + self, current_step: FlowStep, flow: Flow, updated_slots: List[Text] + ) -> Optional[FlowStep]: + """Find the question that was updated.""" + asked_question_steps = flow.previously_asked_questions(current_step.id) + + for question_step in reversed(asked_question_steps): + if question_step.question in updated_slots: + return question_step + return None + def consider_flow_switch(self, tracker: DialogueStateTracker) -> ActionPrediction: """Consider switching to a new flow. @@ -629,19 +640,18 @@ def consider_flow_switch(self, tracker: DialogueStateTracker) -> ActionPredictio tracker: The tracker to get the next action for. Returns: - The predicted action and the events to run.""" + The predicted action and the events to run. 
+ """ if new_flow := self.find_startable_flow(tracker): # there are flows available, but we are not in a flow # it looks like we can start a flow, so we'll predict the trigger action - logger.debug(f"Found startable flow: {new_flow.id}") + structlogger.debug("flow.startable", flow_id=new_flow.id) return ActionPrediction(FLOW_PREFIX + new_flow.id, 1.0) else: - logger.debug("No startable flow found.") + structlogger.debug("flow.nostartable") return ActionPrediction(None, 0.0) - def advance_flows( - self, tracker: DialogueStateTracker, domain: Domain - ) -> ActionPrediction: + def advance_flows(self, tracker: DialogueStateTracker) -> ActionPrediction: """Advance the flows. Either start a new flow or advance the current flow. @@ -651,8 +661,8 @@ def advance_flows( domain: The domain to get the next action for. Returns: - The predicted action and the events to run.""" - + The predicted action and the events to run. + """ prediction = self.consider_flow_switch(tracker) if prediction.action_name: @@ -662,7 +672,7 @@ def advance_flows( # if there are no flows, there is nothing to do return ActionPrediction(None, 0.0) else: - prediction = self._select_next_action(tracker, domain) + prediction = self._select_next_action(tracker) if FlowStack.from_tracker(tracker).as_dict() != self.flow_stack.as_dict(): # we need to update the flow stack to persist the state of the executor if not prediction.events: @@ -675,10 +685,50 @@ def advance_flows( ) return prediction + def _slot_sets_after_latest_message( + self, tracker: DialogueStateTracker + ) -> List[SlotSet]: + """Get all slot sets after the latest message.""" + if not tracker.latest_message: + return [] + + slot_sets = [] + + for event in reversed(tracker.applied_events()): + if isinstance(event, UserUttered): + break + elif isinstance(event, SlotSet): + slot_sets.append(event) + return slot_sets + + def _is_correction(self, tracker: DialogueStateTracker) -> bool: + return ( + tracker.latest_action_name == ACTION_LISTEN_NAME + and tracker.latest_message is not None + and tracker.latest_message.intent.get("name") == CORRECTION_INTENT + ) + + def _correct_flow_position( + self, + newly_set_slots: List[Text], + step: FlowStep, + flow: Flow, + tracker: DialogueStateTracker, + ) -> None: + reset_point = self._find_earliest_updated_question(step, flow, newly_set_slots) + + if reset_point: + structlogger.info( + "flow.reset.slotupdate", + step=step, + flow=flow, + reset_point=reset_point.id, + ) + self.flow_stack.advance_top_flow(reset_point.id) + def _select_next_action( self, tracker: DialogueStateTracker, - domain: Domain, ) -> ActionPrediction: """Select the next action to execute. @@ -693,10 +743,13 @@ def _select_next_action( Returns: The next action to execute, the events that should be applied to the - tracker and the confidence of the prediction.""" - + tracker and the confidence of the prediction. + """ predicted_action: Optional[ActionPrediction] = None - gathered_events = [] + + tracker = tracker.copy() + + number_of_initial_events = len(tracker.events) while not predicted_action or predicted_action.score == 0.0: if not (current_flow := self.flow_stack.top_flow(self.all_flows)): @@ -713,21 +766,20 @@ def _select_next_action( "to __start__ if it ended it should be popped from the stack." ) - if not self._is_step_completed(previous_step, tracker): - # TODO: figure out - raise FlowException( - f"Not quite sure what to do here yet. 
{previous_step}" - ) - - current_step = self._select_next_step( - tracker, domain, previous_step, current_flow.id + structlogger.debug("flow.action.loop", previous_step) + predicted_action = self._wrap_up_previous_step( + current_flow, previous_step, tracker ) - if current_step is None: - frame = self.flow_stack.pop() - if frame.frame_type == StackFrameType.INTERRUPT: - # if the previous frame got interrupted, we need to run the step - # that got interrupted again - current_step = self.flow_stack.top_flow_step(self.all_flows) + tracker.update_with_events(predicted_action.events or [], self.domain) + + if predicted_action.action_name: + # if the previous step predicted an action, we'll stop here + # the step is not completed yet and we need to predict the + # action first before we can try again to wrap up this step and + # advance to the next one + break + + current_step = self._select_next_step(tracker, previous_step, current_flow) if current_step: # this can't be an else, because the previous if might change @@ -735,8 +787,9 @@ def _select_next_action( self.flow_stack.advance_top_flow(current_step.id) predicted_action = self._run_step(current_flow, current_step, tracker) - gathered_events.extend(predicted_action.events or []) + tracker.update_with_events(predicted_action.events or [], self.domain) + gathered_events = list(tracker.events)[number_of_initial_events:] predicted_action.events = gathered_events return predicted_action @@ -753,6 +806,76 @@ def _reset_scoped_slots( events.append(SlotSet(step.question, initial_value)) return events + @staticmethod + def _predict_question_loop( + tracker: DialogueStateTracker, loop_name: Text + ) -> Optional[Text]: + + is_finished = ( + tracker.latest_action + and tracker.latest_action.get(ACTION_NAME) == loop_name + and not tracker.active_loop + ) + + if is_finished: + return None + + active_loop_rejected = tracker.is_active_loop_rejected + should_predict_loop = ( + not active_loop_rejected + and tracker.latest_action + and tracker.latest_action.get(ACTION_NAME) != loop_name + ) + + if should_predict_loop: + structlogger.debug("flow.question.loop", loop=loop_name) + return loop_name + else: + structlogger.debug("flow.question.noloop") + return ACTION_LISTEN_NAME + + def _wrap_up_previous_step( + self, + flow: Flow, + step: FlowStep, + tracker: DialogueStateTracker, + ) -> ActionPrediction: + """Try to wrap up the previous step. + + Args: + current_flow: The current flow. + step: The previous step. + tracker: The tracker to run the step on. + + Returns: + The predicted action and the events to run. + """ + structlogger.debug("flow.step.wrapup", step=step, flow=flow) + if isinstance(step, QuestionFlowStep): + if self._is_correction(tracker): + updated_slots = self._slot_sets_after_latest_message(tracker) + return ActionPrediction( + FLOW_PREFIX + "pattern_correction", + 1.0, + metadata={ + "slots": { + CORRECTED_SLOTS_SLOT: [s.as_dict() for s in updated_slots] + } + }, + ) + # the question is only finished once the slot is set and the loop + # is finished + loop_name = "question_" + step.question + action_name = self._predict_question_loop(tracker, loop_name) + + if action_name: + # loop is not yet done + return ActionPrediction(action_name, 1.0) + else: + return ActionPrediction(None, 0.0) + else: + return ActionPrediction(None, 0.0) + def _run_step( self, flow: Flow, @@ -774,23 +897,28 @@ def _run_step( tracker: The tracker to run the step on. 
Returns: - A tuple of the predicted action and a list of events.""" + A tuple of the predicted action and a list of events. + """ if isinstance(step, QuestionFlowStep): + structlogger.debug("flow.step.run.question", step=step, flow=flow) slot = tracker.slots.get(step.question, None) initial_value = slot.initial_value if slot else None - if step.skip_if_filled and slot.value != initial_value: + slot_value = slot.value if slot else None + if step.skip_if_filled and slot_value != initial_value: return ActionPrediction(None, 0.0) question_action = ActionPrediction("question_" + step.question, 1.0) - if slot.value != initial_value: + if slot_value != initial_value: question_action.events = [SlotSet(step.question, initial_value)] return question_action elif isinstance(step, ActionFlowStep): + structlogger.debug("flow.step.run.action", step=step, flow=flow) if not step.action: raise FlowException(f"Action not specified for step {step}") return ActionPrediction(step.action, 1.0) elif isinstance(step, LinkFlowStep): + structlogger.debug("flow.step.run.link", step=step, flow=flow) self.flow_stack.push( FlowStackFrame( flow_id=step.link, @@ -803,6 +931,7 @@ def _run_step( else: return ActionPrediction(None, 0.0) elif isinstance(step, SetSlotsFlowStep): + structlogger.debug("flow.step.run.slot", step=step, flow=flow) return ActionPrediction( None, 0.0, @@ -813,21 +942,41 @@ def _run_step( elif isinstance(step, EndFlowStep): # this is the end of the flow, so we'll pop it from the stack events = self._reset_scoped_slots(flow, tracker) - if len(self.flow_stack.frames) >= 2: - previous_frame = self.flow_stack.frames[-2] - current_frame = self.flow_stack.frames[-1] - + structlogger.debug("flow.step.run.flowend", flow=flow) + if current_frame := self.flow_stack.pop(): + previous_flow = self.flow_stack.top_flow(self.all_flows) + previous_flow_step = self.flow_stack.top_flow_step(self.all_flows) if current_frame.frame_type == StackFrameType.INTERRUPT: # get stack frame that is below the current one and which will # be continued now that this one has ended. 
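In short, a sketch of the hand-off implemented just below; the flow name "transfer_money" is a made-up example:

    # 1. FlowPolicy predicts the trigger action for the resume pattern and carries
    #    the name of the flow to resume in the action metadata:
    prediction = ActionPrediction(
        FLOW_PREFIX + "pattern_continue_interrupted",
        1.0,
        metadata={"slots": {PREVIOUS_FLOW_SLOT: "transfer_money"}},
    )
    # 2. The flow trigger action in rasa/core/actions/flows.py turns metadata["slots"]
    #    into SlotSet events, so "rasa_previous_flow" is filled and the pattern flow
    #    can utter "Let's continue with the topic transfer_money."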
- previous_flow = self.all_flows.flow_by_id(previous_frame.flow_id) - previous_flow_name = previous_flow.name if previous_flow else None + previous_flow_name = ( + previous_flow.name or previous_flow.id + if previous_flow + else None + ) + return ActionPrediction( - ACTION_FLOW_CONTINUE_INERRUPTED_NAME, + FLOW_PREFIX + "pattern_continue_interrupted", 1.0, - metadata={"flow_name": previous_flow_name}, + metadata={"slots": {PREVIOUS_FLOW_SLOT: previous_flow_name}}, events=events, ) + elif ( + previous_flow + and previous_flow_step + and current_frame.frame_type == StackFrameType.CORRECTION + ): + # TODO: we need to figure out how to actually + # "undo" the changed slots + corrected_slots = tracker.get_slot(CORRECTED_SLOTS_SLOT) + if corrected_slots: + self._correct_flow_position( + corrected_slots, previous_flow_step, previous_flow, tracker + ) + else: + # TODO: we need to figure out how to actually "undo" the + # changed slots + pass return ActionPrediction(None, 0.0, events=events) else: raise FlowException(f"Unknown flow step type {type(step)}") diff --git a/rasa/core/policies/test_flow_policy.py b/rasa/core/policies/test_flow_policy.py index 9b979a3c9f3e..4bfcf918237b 100644 --- a/rasa/core/policies/test_flow_policy.py +++ b/rasa/core/policies/test_flow_policy.py @@ -18,7 +18,7 @@ def _run_flow_until_listen( events = [] actions = [] while True: - action_prediction = executor.advance_flows(tracker, domain) + action_prediction = executor.advance_flows(tracker) if not action_prediction: break @@ -60,7 +60,7 @@ def test_select_next_action() -> None: ], ) domain = Domain.empty() - executor = FlowExecutor.from_tracker(tracker, flows) + executor = FlowExecutor.from_tracker(tracker, flows, domain) actions, events = _run_flow_until_listen(executor, tracker, domain) diff --git a/rasa/core/processor.py b/rasa/core/processor.py index 2876a4a098a3..7d23c983b2c6 100644 --- a/rasa/core/processor.py +++ b/rasa/core/processor.py @@ -713,8 +713,6 @@ async def parse_message( if self.http_interpreter: parse_data = await self.http_interpreter.parse(message) else: - if tracker is None: - tracker = DialogueStateTracker.from_events(message.sender_id, []) parse_data = self._parse_message_with_graph( message, tracker, only_output_properties ) @@ -733,7 +731,7 @@ async def parse_message( def _parse_message_with_graph( self, message: UserMessage, - tracker: DialogueStateTracker, + tracker: Optional[DialogueStateTracker] = None, only_output_properties: bool = True, ) -> Dict[Text, Any]: """Interprets the passed message. 
diff --git a/rasa/engine/recipes/default_components.py b/rasa/engine/recipes/default_components.py index 5ddbea3e45d0..2b14f466aa75 100644 --- a/rasa/engine/recipes/default_components.py +++ b/rasa/engine/recipes/default_components.py @@ -1,6 +1,7 @@ from rasa.nlu.classifiers.diet_classifier import DIETClassifier from rasa.nlu.classifiers.fallback_classifier import FallbackClassifier from rasa.nlu.classifiers.keyword_intent_classifier import KeywordIntentClassifier +from rasa.nlu.classifiers.llm_flow_classifier import LLMFlowClassifier from rasa.nlu.classifiers.logistic_regression_classifier import ( LogisticRegressionClassifier, ) @@ -45,6 +46,7 @@ MitieIntentClassifier, SklearnIntentClassifier, LogisticRegressionClassifier, + LLMFlowClassifier, # Response Selectors ResponseSelector, # Message Entity Extractors diff --git a/rasa/graph_components/validators/default_recipe_validator.py b/rasa/graph_components/validators/default_recipe_validator.py index 1d2aab15f702..9aa18c32794a 100644 --- a/rasa/graph_components/validators/default_recipe_validator.py +++ b/rasa/graph_components/validators/default_recipe_validator.py @@ -31,7 +31,7 @@ DOCS_URL_POLICIES, DOCS_URL_RULES, ) -from rasa.shared.core.domain import Domain, InvalidDomain +from rasa.shared.core.domain import Domain from rasa.shared.core.constants import ( ACTION_BACK_NAME, ACTION_RESTART_NAME, @@ -383,7 +383,7 @@ def _validate_core(self, story_graph: StoryGraph, domain: Domain) -> None: if not self._policy_schema_nodes: return self._warn_if_no_rule_policy_is_contained() - self._raise_if_domain_contains_form_names_but_no_rule_policy_given(domain) + self._warn_if_domain_contains_form_names_but_no_rule_policy_given(domain) self._raise_if_a_rule_policy_is_incompatible_with_domain(domain) self._validate_policy_priorities() self._warn_if_rule_based_data_is_unused_or_missing(story_graph=story_graph) @@ -400,14 +400,10 @@ def _warn_if_no_rule_policy_is_contained(self) -> None: docs=DOCS_URL_DEFAULT_ACTIONS, ) - def _raise_if_domain_contains_form_names_but_no_rule_policy_given( + def _warn_if_domain_contains_form_names_but_no_rule_policy_given( self, domain: Domain ) -> None: - """Validates that there exists a rule policy if forms are defined. - - Raises: - `InvalidConfigException` if domain and rule policies do not match - """ + """Validates that there exists a rule policy if forms are defined.""" contains_rule_policy = any( schema_node for schema_node in self._graph_schema.nodes.values() @@ -415,7 +411,7 @@ def _raise_if_domain_contains_form_names_but_no_rule_policy_given( ) if domain.form_names and not contains_rule_policy: - raise InvalidDomain( + rasa.shared.utils.io.raise_warning( "You have defined a form action, but have not added the " f"'{RulePolicy.__name__}' to your policy ensemble. " f"Either remove all forms from your domain or add the " diff --git a/rasa/nlu/classifiers/flow_prompt_template.jinja2 b/rasa/nlu/classifiers/flow_prompt_template.jinja2 new file mode 100644 index 000000000000..df3b484c6f49 --- /dev/null +++ b/rasa/nlu/classifiers/flow_prompt_template.jinja2 @@ -0,0 +1,39 @@ +Your task is to analyze the current conversation context and start new business processes that we call flows and to extract slots to advance active flows. 
+
+These are the flows that can be started, with their description and slots:
+{% for flow in available_flows %}
+- {{ flow.name }}: {{ flow.description }} (slots: {{ flow.slots }})
+{%- endfor %}
+
+Here is what happened previously in the conversation:
+{{ current_conversation }}
+
+{% if current_flow != None %}
+You are currently in the flow "{{ current_flow }}".
+You have just asked the user for the slot "{{ question }}".
+
+{% if flow_slots|length > 0 %}
+Here are the slots of the currently active flow with their names and values:
+{% for slot in flow_slots %}
+- {{ slot.name }}: {{ slot.value }}
+{% endfor %}
+{% endif %}
+{% else %}
+You are currently not in any flow and so there are no active slots.
+{% endif %}
+If you start a flow, you can already fill that flow's slots with information the user provided for starting the flow.
+
+The user just said """{{ user_message }}""".
+
+Based on this information, generate a list of actions you want to take. Your job is to start flows and to fill slots where appropriate. Any logic of what happens afterwards is handled by the flow engine. These are your available actions:
+* Slot setting, described by "SetSlot(slot_name, slot_value)". An example would be "SetSlot(recipient, Freddy)"
+* Starting another flow, described by "StartFlow(flow_name)". An example would be "StartFlow(transfer_money)"
+* Cancelling the current flow, described by "CancelFlow()"
+
+Write out the actions you want to take for the last user message, one per line.
+Do not prematurely fill slots with abstract values.
+Only use information provided by the user.
+Strictly adhere to the provided action types above for starting flows and setting slots.
+Focus on the last message and take it one step at a time.
+Use the previous conversation steps only to aid understanding.
+The action list: diff --git a/rasa/nlu/classifiers/llm_flow_classifier.py b/rasa/nlu/classifiers/llm_flow_classifier.py new file mode 100644 index 000000000000..e8bd9a04449a --- /dev/null +++ b/rasa/nlu/classifiers/llm_flow_classifier.py @@ -0,0 +1,324 @@ +import importlib.resources +import re +import logging +from typing import Dict, Any, Optional, List, Tuple + +from jinja2 import Template + +from rasa.core.policies.flow_policy import FlowStack +from rasa.engine.graph import GraphComponent, ExecutionContext +from rasa.engine.recipes.default_recipe import DefaultV1Recipe +from rasa.engine.storage.resource import Resource +from rasa.engine.storage.storage import ModelStorage +from rasa.nlu.classifiers.classifier import IntentClassifier +from rasa.nlu.extractors.extractor import EntityExtractorMixin +from rasa.shared.core.flows.flow import FlowsList, QuestionFlowStep +from rasa.shared.core.trackers import DialogueStateTracker +from rasa.shared.nlu.constants import ( + INTENT, + EXTRACTOR, + ENTITY_ATTRIBUTE_TYPE, + ENTITY_ATTRIBUTE_VALUE, + ENTITIES, + TEXT, + ENTITY_ATTRIBUTE_START, + ENTITY_ATTRIBUTE_END, + ENTITY_ATTRIBUTE_TEXT, + ENTITY_ATTRIBUTE_CONFIDENCE, + CORRECTION_INTENT, +) +from rasa.shared.nlu.training_data.message import Message +from rasa.shared.nlu.training_data.training_data import TrainingData +from rasa.utils.llm import ( + DEFAULT_OPENAI_CHAT_MODEL_NAME, + tracker_as_readable_transcript, + generate_text_openai_chat, + sanitize_message_for_prompt, +) + +DEFAULT_FLOW_PROMPT_TEMPLATE = importlib.resources.read_text( + "rasa.nlu.classifiers", "flow_prompt_template.jinja2" +) + +logger = logging.getLogger(__name__) + + +@DefaultV1Recipe.register( + [ + DefaultV1Recipe.ComponentType.INTENT_CLASSIFIER, + DefaultV1Recipe.ComponentType.ENTITY_EXTRACTOR, + ], + is_trainable=True, +) +class LLMFlowClassifier(GraphComponent, IntentClassifier, EntityExtractorMixin): + @staticmethod + def get_default_config() -> Dict[str, Any]: + """The component's default config (see parent class for full docstring).""" + return { + "prompt": DEFAULT_FLOW_PROMPT_TEMPLATE, + "temperature": 0.0, + "model_name": DEFAULT_OPENAI_CHAT_MODEL_NAME, + } + + def __init__( + self, + config: Dict[str, Any], + model_storage: ModelStorage, + resource: Resource, + ) -> None: + self.config = {**self.get_default_config(), **config} + self.model = self.config["model_name"] + self.prompt_template = self.config["prompt"] + self.temperature = self.config["temperature"] + self._model_storage = model_storage + self._resource = resource + + @classmethod + def create( + cls, + config: Dict[str, Any], + model_storage: ModelStorage, + resource: Resource, + execution_context: ExecutionContext, + ) -> "LLMFlowClassifier": + """Creates a new untrained component (see parent class for full docstring).""" + return cls(config, model_storage, resource) + + def persist(self) -> None: + pass + + @classmethod + def load( + cls, + config: Dict[str, Any], + model_storage: ModelStorage, + resource: Resource, + execution_context: ExecutionContext, + **kwargs: Any, + ) -> "LLMFlowClassifier": + """Loads trained component (see parent class for full docstring).""" + return cls(config, model_storage, resource) + + def train(self, training_data: TrainingData) -> Resource: + """Train the intent classifier on a data set.""" + self.persist() + return self._resource + + def process( + self, + messages: List[Message], + tracker: Optional[DialogueStateTracker] = None, + flows: Optional[FlowsList] = None, + ) -> List[Message]: + """Return intent and 
entities for a message.""" + return [self.process_single(msg, tracker, flows) for msg in messages] + + def process_single( + self, + message: Message, + tracker: Optional[DialogueStateTracker] = None, + flows: Optional[FlowsList] = None, + ) -> Message: + if flows is None or tracker is None: + # cannot do anything if there are no flows or no tracker + return message + flows_without_patterns = FlowsList( + [f for f in flows.underlying_flows if not f.is_handling_pattern()] + ) + flow_prompt = self.render_template(message, tracker, flows_without_patterns) + logger.info(flow_prompt) + action_list = generate_text_openai_chat( + flow_prompt, self.model, self.temperature + ) + logger.info(action_list) + intent_name, entities = self.parse_action_list( + action_list, tracker, flows_without_patterns + ) + intent = {"name": intent_name, "confidence": 0.90} + message.set(INTENT, intent, add_to_output=True) + if len(entities) > 0: + formatted_entities = [ + { + ENTITY_ATTRIBUTE_START: 0, + ENTITY_ATTRIBUTE_END: 0, + ENTITY_ATTRIBUTE_TYPE: e[0], + ENTITY_ATTRIBUTE_VALUE: e[1], + ENTITY_ATTRIBUTE_TEXT: e[1], + ENTITY_ATTRIBUTE_CONFIDENCE: 0.9, + EXTRACTOR: self.__class__.__name__, + } + for e in entities + ] + message.set(ENTITIES, formatted_entities, add_to_output=True) + return message + + @classmethod + def parse_action_list( + cls, actions: str, tracker: DialogueStateTracker, flows: FlowsList + ) -> Tuple[str, List[Tuple[str, str]]]: + """Parse the actions returned by the llm into intent and entities.""" + start_flow_actions = [] + slot_sets = [] + cancel_flow = False + slot_set_re = re.compile( + r"""SetSlot\(([a-zA-Z_][a-zA-Z0-9_-]*?), ?\"?([^)]*?)\"?\)""" + ) + start_flow_re = re.compile(r"StartFlow\(([a-zA-Z_][a-zA-Z0-9_-]*?)\)") + cancel_flow_re = re.compile(r"CancelFlow") + for action in actions.strip().splitlines(): + if m := slot_set_re.search(action): + slot_name = m.group(1).strip() + slot_value = m.group(2).strip() + if slot_value == "undefined": + continue + if slot_name == "flow_name": + start_flow_actions.append(slot_value) + else: + # most likely some hallucinated variable + if "_" in slot_value: + continue + slot_sets.append((slot_name, slot_value)) + elif m := start_flow_re.search(action): + start_flow_actions.append(m.group(1).strip()) + elif cancel_flow_re.search(action): + cancel_flow = True + + # case 1 + # "I want to send some money" + # starting a flow -> intent = flow name + + # case 2 + # "I want to send some money to Joe" + # starting a flow with entities mentioned -> intent = flow name, + # entities only those that are valid for the flow + + # case 3 + # "50$" + # giving information for the current slot -> intent = inform, + # entity only that of the current slot + + # case 4 + # "Sorry I meant, Joe, not John" + # correcting a previous slot from the flow -> intent = correction, + # entity of the previous slot + + # everything else is too complex for now: + # case 5 + # "50$, how much money do I still have btw?" + # giving information about current flow and starting new flow + # right away -> intent = complex + + # TODO: check that we have a valid flow name if any, reprompt if mistake? 
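A worked illustration of the cases listed above as they are handled below (flow and slot names are made up):

    # LLM output                          -> parse_action_list(...) result
    # "StartFlow(transfer_money)
    #  SetSlot(recipient, Joe)"           -> ("transfer_money", [("recipient", "Joe")])
    #                                        (only slots defined by transfer_money are kept)
    # "SetSlot(amount, 50$)"              -> ("inform", [("amount", "50$")])
    #                                        (amount is the question currently being asked)
    # "SetSlot(recipient, Joe)"           -> ("correction", [("recipient", "Joe")])
    #                                        (recipient was asked earlier in the active flow)
    # "StartFlow(check_balance)
    #  StartFlow(transfer_money)"         -> ("too_complex", [])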
+ # TODO: assign slot sets to current flow, new flow if any, and other + + flow_stack = FlowStack.from_tracker(tracker) + top_flow = flow_stack.top_flow(flows) + top_flow_step = flow_stack.top_flow_step(flows) + if top_flow_step is not None and top_flow is not None: + slots_so_far = top_flow.slots_up_to_step(top_flow_step.id) + other_slots = [ + slot_set for slot_set in slot_sets if slot_set[0] not in slots_so_far + ] + else: + slots_so_far = [] + other_slots = slot_sets + + if len(start_flow_actions) == 0: + if len(slot_sets) == 0 and not cancel_flow: + return "comment", [] + elif len(slot_sets) == 0 and cancel_flow: + return "cancel_flow", [] + elif ( + len(slot_sets) == 1 + and isinstance(top_flow_step, QuestionFlowStep) + and top_flow_step.question == slot_sets[0][0] + ): + return "inform", slot_sets + elif ( + len(slot_sets) == 1 + and isinstance(top_flow_step, QuestionFlowStep) + and top_flow_step.question != slot_sets[0][0] + and slot_sets[0][0] in slots_so_far + ): + return CORRECTION_INTENT, slot_sets + elif ( + len(slot_sets) == 1 + and top_flow_step is not None + and slot_sets[0][0] in other_slots + ): + # trying to set a slot from another flow + return "too_complex", [] + elif len(slot_sets) > 1: + return "too_complex", [] + elif len(start_flow_actions) == 1: + if cancel_flow: + return "too_complex", [] + new_flow_id = start_flow_actions[0] + potential_new_flow = flows.flow_by_id(new_flow_id) + if potential_new_flow is not None: + valid_slot_sets = [ + slot_set + for slot_set in slot_sets + if slot_set[0] in potential_new_flow.slots() + ] + return start_flow_actions[0], valid_slot_sets + else: + return "mistake", [] + # TODO: potentially re-prompt or ask for correction on invalid flow name + elif len(start_flow_actions) > 1: + return "too_complex", [] + + return "too_complex", [] + + @classmethod + def create_template_inputs(cls, flows: FlowsList) -> List[Dict[str, Any]]: + result = [] + for flow in flows.underlying_flows: + result.append( + { + "name": flow.id, + "description": flow.description, + "slots": flow.slots(), + } + ) + return result + + def render_template( + self, message: Message, tracker: DialogueStateTracker, flows: FlowsList + ) -> str: + flow_stack = FlowStack.from_tracker(tracker) + top_flow = flow_stack.top_flow(flows) if flow_stack is not None else None + current_step = ( + flow_stack.top_flow_step(flows) if flow_stack is not None else None + ) + if top_flow is not None: + flow_slots = [ + { + "name": k, + "value": (tracker.get_slot(k) or "undefined"), + "type": tracker.slots[k].type_name, + } + for k in top_flow.slots() + ] + else: + flow_slots = [] + + question = ( + current_step.question + if current_step is not None and isinstance(current_step, QuestionFlowStep) + else None + ) + current_conversation = tracker_as_readable_transcript(tracker) + latest_user_message = sanitize_message_for_prompt(message.get(TEXT)) + current_conversation += f"\nUSER: {latest_user_message}" + + inputs = { + "available_flows": self.create_template_inputs(flows), + "current_conversation": current_conversation, + "flow_slots": flow_slots, + "current_flow": top_flow.id if top_flow is not None else None, + "question": question, + "user_message": latest_user_message, + } + + return Template(self.prompt_template).render(**inputs) diff --git a/rasa/shared/constants.py b/rasa/shared/constants.py index 9e437b9561ec..d139fbb2dc0c 100644 --- a/rasa/shared/constants.py +++ b/rasa/shared/constants.py @@ -107,3 +107,5 @@ RESPONSE_CONDITION = "condition" CHANNEL = "channel" + 
+OPENAI_API_KEY_ENV_VAR = "OPENAI_API_KEY" diff --git a/rasa/shared/core/constants.py b/rasa/shared/core/constants.py index 5b2aef13eaf9..cc5b8854ab69 100644 --- a/rasa/shared/core/constants.py +++ b/rasa/shared/core/constants.py @@ -37,7 +37,6 @@ RULE_SNIPPET_ACTION_NAME = "..." ACTION_EXTRACT_SLOTS = "action_extract_slots" ACTION_VALIDATE_SLOT_MAPPINGS = "action_validate_slot_mappings" -ACTION_FLOW_CONTINUE_INERRUPTED_NAME = "action_flow_continue_interrupted" DEFAULT_ACTION_NAMES = [ @@ -55,7 +54,6 @@ ACTION_SEND_TEXT, RULE_SNIPPET_ACTION_NAME, ACTION_EXTRACT_SLOTS, - ACTION_FLOW_CONTINUE_INERRUPTED_NAME, ] ACTION_SHOULD_SEND_DOMAIN = "send_domain" @@ -82,6 +80,14 @@ REQUESTED_SLOT = "requested_slot" FLOW_STACK_SLOT = "flow_stack" +PREVIOUS_FLOW_SLOT = "rasa_previous_flow" +CORRECTED_SLOTS_SLOT = "rasa_corrected_slots" + +FLOW_SLOT_NAMES = [ + FLOW_STACK_SLOT, + PREVIOUS_FLOW_SLOT, + CORRECTED_SLOTS_SLOT, +] # slots for knowledge base SLOT_LISTED_ITEMS = "knowledge_base_listed_objects" @@ -92,6 +98,8 @@ DEFAULT_SLOT_NAMES = { REQUESTED_SLOT, FLOW_STACK_SLOT, + PREVIOUS_FLOW_SLOT, + CORRECTED_SLOTS_SLOT, SESSION_START_METADATA_SLOT, SLOT_LISTED_ITEMS, SLOT_LAST_OBJECT, diff --git a/rasa/shared/core/domain.py b/rasa/shared/core/domain.py index 5dbe4e194311..5ed7c019c43c 100644 --- a/rasa/shared/core/domain.py +++ b/rasa/shared/core/domain.py @@ -978,20 +978,21 @@ def _add_flow_slots(self) -> None: Add a slot called `flow_stack_slot` to the list of slots. The value of this slot will be a call stack of the flow ids. """ - from rasa.shared.core.constants import FLOW_STACK_SLOT + from rasa.shared.core.constants import FLOW_SLOT_NAMES slot_names = [slot.name for slot in self.slots] - if FLOW_STACK_SLOT not in slot_names: - self.slots.append( - AnySlot(FLOW_STACK_SLOT, mappings=[], influence_conversation=False) - ) - else: - # TODO: figure out what to do here. - logger.warning( - f"Slot {FLOW_STACK_SLOT} is reserved for the next step slot, " - f"but it already exists. 🤔" - ) + for flow_slot in FLOW_SLOT_NAMES: + if flow_slot not in slot_names: + self.slots.append( + AnySlot(flow_slot, mappings=[], influence_conversation=False) + ) + else: + # TODO: figure out what to do here. + logger.warning( + f"Slot {flow_slot} is reserved for Rasa internal usage, " + f"but it already exists. 🤔" + ) def _add_requested_slot(self) -> None: """Add a slot called `requested_slot` to the list of slots. 
diff --git a/rasa/shared/core/events.py b/rasa/shared/core/events.py index e95e109340c9..0602efbad81d 100644 --- a/rasa/shared/core/events.py +++ b/rasa/shared/core/events.py @@ -1740,6 +1740,10 @@ def __str__(self) -> Text: """Returns text representation of event.""" return f"Loop({self.name})" + def __repr__(self) -> Text: + """Returns event as string for debugging.""" + return f"ActiveLoop({self.name}, {self.timestamp}, {self.metadata})" + def __hash__(self) -> int: """Returns unique hash for event.""" return hash(self.name) diff --git a/rasa/shared/core/flows/flow.py b/rasa/shared/core/flows/flow.py index 9593140bd10d..3afeb9fef59d 100644 --- a/rasa/shared/core/flows/flow.py +++ b/rasa/shared/core/flows/flow.py @@ -7,6 +7,8 @@ import rasa.shared.utils.io +HANDLING_PATTERN_PREFIX = "pattern_" + class UnreachableFlowStepException(RasaException): """Raised when a flow step is unreachable.""" @@ -150,7 +152,7 @@ def step_by_id(self, step_id: Text, flow_id: Text) -> FlowStep: if not flow: raise UnresolvedFlowException(flow_id) - step = flow.step_for_id(step_id) + step = flow.step_by_id(step_id) if not step: raise UnresolvedFlowStepIdException(step_id, flow, referenced_from=None) @@ -240,7 +242,7 @@ def _reachable_steps( reached_steps.add(step.id) for link in step.next.links: reached_steps = _reachable_steps( - self.step_for_id(link.target), reached_steps + self.step_by_id(link.target), reached_steps ) return reached_steps @@ -250,7 +252,7 @@ def _reachable_steps( if step.id not in reached_steps: raise UnreachableFlowStepException(step, self) - def step_for_id(self, step_id: Optional[Text]) -> Optional[FlowStep]: + def step_by_id(self, step_id: Optional[Text]) -> Optional[FlowStep]: """Returns the step with the given id.""" if not step_id: return None @@ -274,6 +276,73 @@ def first_step_in_flow(self) -> Optional[FlowStep]: return None return self.steps[0] + @staticmethod + def slots_from_steps(steps: List[FlowStep]) -> List[str]: + """Return the names of the slots used in the given steps.""" + result = [] + for step in steps: + if isinstance(step, QuestionFlowStep) and step.question not in result: + result.append(step.question) + return result + + def slots(self) -> List[str]: + """Return the names of the slots used in the flow.""" + return self.slots_from_steps(self.steps) + + def slots_up_to_step(self, id: str) -> List[str]: + """Returns the names of the slots used in this flow up to a step.""" + step_ids = [step.id for step in self.steps] + try: + idx = step_ids.index(id) + except ValueError: + idx = -1 + steps = self.steps[: idx + 1] + return self.slots_from_steps(steps) + + def previously_asked_questions(self, step_id: Text) -> List[QuestionFlowStep]: + """Returns the questions asked before the given step. + + Questions are returned roughly in reverse order, i.e. the first + question in the list is the one asked last. But due to circles + in the flow the order is not guaranteed to be exactly reverse. + """ + + def _previously_asked_questions( + current_step_id: Text, visited_steps: Set[Text] + ) -> List[QuestionFlowStep]: + """Returns the questions asked before the given step. + + Keeps track of the steps that have been visited to avoid circles. 
+ """ + current_step = self.step_by_id(current_step_id) + + questions: List[QuestionFlowStep] = [] + + if not current_step: + return questions + + if isinstance(current_step, QuestionFlowStep): + questions.append(current_step) + + visited_steps.add(current_step.id) + + for previous_step in self.steps: + for next_link in previous_step.next.links: + if next_link.target != current_step_id: + continue + if previous_step.id in visited_steps: + continue + questions.extend( + _previously_asked_questions(previous_step.id, visited_steps) + ) + return questions + + return _previously_asked_questions(step_id, set()) + + def is_handling_pattern(self) -> bool: + """Returns whether the flow is handling a pattern.""" + return self.id.startswith(HANDLING_PATTERN_PREFIX) + def step_from_json(flow_step_config: Dict[Text, Any]) -> FlowStep: """Used to read flow steps from parsed YAML. @@ -713,8 +782,10 @@ class SetSlotsFlowStep(FlowStep): @classmethod def from_json(cls, flow_step_config: Dict[Text, Any]) -> SetSlotsFlowStep: """Used to read flow steps from parsed YAML. + Args: flow_step_config: The parsed YAML as a dictionary. + Returns: The parsed flow step. """ @@ -731,6 +802,7 @@ def from_json(cls, flow_step_config: Dict[Text, Any]) -> SetSlotsFlowStep: def as_json(self) -> Dict[Text, Any]: """Returns the flow step as a dictionary. + Returns: The flow step as a dictionary. """ diff --git a/rasa/shared/core/trackers.py b/rasa/shared/core/trackers.py index 8b3c6b8ffbb7..26ef5b80d4d6 100644 --- a/rasa/shared/core/trackers.py +++ b/rasa/shared/core/trackers.py @@ -387,6 +387,32 @@ def get_slot(self, key: Text) -> Optional[Any]: logger.info(f"Tried to access non existent slot '{key}'") return None + def has_bot_message_after_latest_user_message(self) -> bool: + """Checks if there is a bot message after the most recent user message. + + Returns: + `True` if there is an action after the most recent user message. + """ + for event in reversed(self.applied_events()): + if isinstance(event, BotUttered): + return True + elif isinstance(event, UserUttered): + return False + return False + + def has_action_after_latest_user_message(self) -> bool: + """Check if there is an action after the most recent user message. + + Returns: + `True` if there is an action after the most recent user message. 
+ """ + for event in reversed(self.applied_events()): + if isinstance(event, ActionExecuted): + return True + elif isinstance(event, UserUttered): + return False + return False + def get_latest_entity_values( self, entity_type: Text, diff --git a/rasa/shared/importers/importer.py b/rasa/shared/importers/importer.py index 0938f89f61c1..6b06233de2cb 100644 --- a/rasa/shared/importers/importer.py +++ b/rasa/shared/importers/importer.py @@ -3,6 +3,8 @@ from typing import Text, Optional, List, Dict, Set, Any, Tuple, Type, Union, cast import logging +import pkg_resources + import rasa.shared.constants from rasa.shared.core.flows.flow import FlowsList, QuestionFlowStep import rasa.shared.utils.common @@ -10,7 +12,6 @@ import rasa.shared.utils.io from rasa.shared.core.domain import ( KEY_FORMS, - KEY_RESPONSES_TEXT, Domain, KEY_E2E_ACTIONS, KEY_INTENTS, @@ -379,39 +380,69 @@ def get_nlu_data(self, language: Optional[Text] = "en") -> TrainingData: return self._importer.get_nlu_data(language) +DEFAULT_FLOW_FILE_NAME = "default_flows.yml" + + +def load_default_flows() -> FlowsList: + """Loads the default flows from the file system.""" + from rasa.shared.core.flows.yaml_flows_io import YAMLFlowsReader + + default_flows_file = pkg_resources.resource_filename( + "rasa.core.policies", DEFAULT_FLOW_FILE_NAME + ) + + return YAMLFlowsReader.read_from_file(default_flows_file) + + +def load_default_flows_domain() -> Domain: + """Loads the default flows from the file system.""" + default_flows_file = pkg_resources.resource_filename( + "rasa.core.policies", DEFAULT_FLOW_FILE_NAME + ) + + return Domain.from_path(default_flows_file) + + class FlowSyncImporter(PassThroughImporter): """Importer that syncs `flows` between Domain and flow training data.""" + @rasa.shared.utils.common.cached_method + def get_flows(self) -> FlowsList: + flows = self._importer.get_flows() + + if flows.is_empty(): + # if there are no flows, we don't need to add the default flows either + return flows + + default_flows = load_default_flows() + + user_flow_ids = [flow.id for flow in flows.underlying_flows] + missing_default_flows = [ + default_flow + for default_flow in default_flows.underlying_flows + if default_flow.id not in user_flow_ids + ] + + return flows.merge(FlowsList(missing_default_flows)) + @rasa.shared.utils.common.cached_method def get_domain(self) -> Domain: """Merge existing domain with properties of flows.""" - from rasa.core.actions.flows import UTTER_FLOW_CONTINUE_INTERRUPTED - from rasa.shared.core.training_data.story_reader.yaml_story_reader import ( - KEY_METADATA, - ) - domain = self._importer.get_domain() flows = self.get_flows() + if flows.is_empty(): + # if there are no flows, we don't need to add the default flows either + return domain + + default_flows_domain = load_default_flows_domain() + flow_names = [ rasa.shared.constants.FLOW_PREFIX + flow.id for flow in flows.underlying_flows ] - if UTTER_FLOW_CONTINUE_INTERRUPTED not in domain.responses: - text = "Let's continue with the previous topic off {flow_name}." 
- responses = { - UTTER_FLOW_CONTINUE_INTERRUPTED: [ - { - KEY_RESPONSES_TEXT: text, - KEY_METADATA: {"allow_variation": True}, - } - ] - } - else: - responses = {} - all_question_steps = [ step for flow in flows.underlying_flows @@ -425,11 +456,8 @@ def get_domain(self) -> Domain: rasa.shared.constants.REQUIRED_SLOTS_KEY: [step.question] } - return domain.merge( - Domain.from_dict( - {KEY_ACTIONS: flow_names, KEY_FORMS: forms, KEY_RESPONSES: responses} - ) - ) + flow_domain = Domain.from_dict({KEY_ACTIONS: flow_names, KEY_FORMS: forms}) + return domain.merge(flow_domain.merge(default_flows_domain)) class ResponsesSyncImporter(PassThroughImporter): diff --git a/rasa/shared/nlu/constants.py b/rasa/shared/nlu/constants.py index 1f0b9b865c36..6d83f0aba32a 100644 --- a/rasa/shared/nlu/constants.py +++ b/rasa/shared/nlu/constants.py @@ -40,3 +40,5 @@ SPLIT_ENTITIES_BY_COMMA = "split_entities_by_comma" SPLIT_ENTITIES_BY_COMMA_DEFAULT_VALUE = True SINGLE_ENTITY_ALLOWED_INTERLEAVING_CHARSET = {".", ",", " ", ";"} + +CORRECTION_INTENT = "correction" diff --git a/rasa/utils/llm.py b/rasa/utils/llm.py new file mode 100644 index 000000000000..f9dd42a81c1e --- /dev/null +++ b/rasa/utils/llm.py @@ -0,0 +1,93 @@ +from typing import Optional +import openai +import logging + +from rasa.shared.core.events import BotUttered, UserUttered + +from rasa.shared.core.trackers import DialogueStateTracker + +logger = logging.getLogger(__name__) + +USER = "USER" + +AI = "AI" + +DEFAULT_OPENAI_GENERATE_MODEL_NAME = "text-davinci-003" + +DEFAULT_OPENAI_CHAT_MODEL_NAME = "gpt-3.5-turbo" + +DEFAULT_OPENAI_TEMPERATURE = 0.7 + + +def generate_text_openai_chat( + prompt: str, + model: str = DEFAULT_OPENAI_CHAT_MODEL_NAME, + temperature: float = DEFAULT_OPENAI_TEMPERATURE, +) -> Optional[str]: + """Generates text using the OpenAI chat API. + + Args: + prompt: the prompt to send to the API + model: the model to use for generation + temperature: the temperature to use for generation + + Returns: + The generated text. + """ + # TODO: exception handling + chat_completion = openai.ChatCompletion.create( # type: ignore[no-untyped-call] + model=model, + messages=[{"role": "user", "content": prompt}], + temperature=temperature, + ) + return chat_completion.choices[0].message.content + + +def tracker_as_readable_transcript( + tracker: DialogueStateTracker, human_prefix: str = USER, ai_prefix: str = AI +) -> str: + """Creates a readable dialogue from a tracker. + + Args: + tracker: the tracker to convert + human_prefix: the prefix to use for human utterances + ai_prefix: the prefix to use for ai utterances + + Example: + >>> tracker = Tracker( + ... sender_id="test", + ... slots=[], + ... events=[ + ... UserUttered("hello"), + ... BotUttered("hi"), + ... ], + ... ) + >>> tracker_as_readable_transcript(tracker) + USER: hello + AI: hi + + Returns: + A string representing the transcript of the tracker + """ + transcript = [] + + for event in tracker.events: + if isinstance(event, UserUttered): + transcript.append( + f"{human_prefix}: {sanitize_message_for_prompt(event.text)}" + ) + if isinstance(event, BotUttered): + transcript.append(f"{ai_prefix}: {sanitize_message_for_prompt(event.text)}") + return "\n".join(transcript) + + +def sanitize_message_for_prompt(text: Optional[str]) -> str: + """Removes new lines from a string. + + Args: + text: the text to sanitize + + Returns: + A string with new lines removed. 
+ """ + return text.replace("\n", " ") if text else "" diff --git a/tests/nlu/test_train.py b/tests/nlu/test_train.py index 72211b89341d..f44697ae7e2a 100644 --- a/tests/nlu/test_train.py +++ b/tests/nlu/test_train.py @@ -90,6 +90,7 @@ def pipelines_for_tests() -> List[Tuple[Text, List[Dict[Text, Any]]]]: ), ), ("fallback", as_pipeline("KeywordIntentClassifier", "FallbackClassifier")), + ("dm2", as_pipeline("LLMFlowClassifier")), ]