From 2fd0b71d97295b5350b993615fa543178a486fdf Mon Sep 17 00:00:00 2001 From: Leon van Bokhorst Date: Sat, 16 Nov 2024 11:21:53 +0100 Subject: [PATCH 1/2] As a CASA researcher, I want to store conversation metrics in CSV format for easy analysis while making AI conversations more natural. Fixes #57 --- casa/01_politeness.py | 220 ++++++++++++++++++++++++++++++++---------- 1 file changed, 170 insertions(+), 50 deletions(-) diff --git a/casa/01_politeness.py b/casa/01_politeness.py index d85091b..73ee191 100644 --- a/casa/01_politeness.py +++ b/casa/01_politeness.py @@ -2,21 +2,33 @@ import json import uuid from datetime import datetime -from typing import Dict, List, Optional +from typing import Dict, List import ollama from dataclasses import dataclass -from tqdm import tqdm # For progress bars +from tqdm import tqdm +import pandas as pd +import os +from pathlib import Path + +MODEL_NAME = "llama3.2:latest" + @dataclass class ExperimentParams: """Parameters for Monte Carlo experiments.""" + temperature: float top_p: float model_name: str num_iterations: int = 100 + metrics_file: str = "casa_metrics_{experiment_id}_{datetime}.csv" + results_dir: str = "casa/results" + class LLMAgent: - def __init__(self, name: str, role: str, personality: str, params: ExperimentParams): + def __init__( + self, name: str, role: str, personality: str, params: ExperimentParams + ): self.name = name self.role = role self.personality = personality @@ -40,9 +52,11 @@ def _format_history(self) -> str: """Format the conversation history for context.""" if not self.conversation_history: return "No previous conversation." - + history = [] - for entry in self.conversation_history[-3:]: # Only include last 3 turns for context + for entry in self.conversation_history[ + -3: + ]: # Only include last 3 turns for context history.append(f"Input: {entry['input']}") history.append(f"Response: {entry['response']}\n") return "\n".join(history) @@ -50,6 +64,15 @@ def _format_history(self) -> str: async def send_message(self, message: str) -> Dict: """Send message to Ollama and get response with metadata.""" start_time = datetime.now() + + # Count social markers in the message + social_markers = self._extract_social_markers(message) + contains_question = "?" 
in message + references_previous = any( + ref in message.lower() + for ref in ["earlier", "previous", "before", "you said"] + ) + try: response = ollama.generate( model=self.params.model_name, @@ -57,9 +80,9 @@ async def send_message(self, message: str) -> Dict: options={ "temperature": self.params.temperature, "top_p": self.params.top_p, - } + }, ) - + response_text = response["response"] success = True error = None @@ -80,53 +103,142 @@ async def send_message(self, message: str) -> Dict: "parameters": { "temperature": self.params.temperature, "top_p": self.params.top_p, - "model": self.params.model_name - } + "model": self.params.model_name, + }, + "prompt_length": len(message), + "response_length": len(response_text), + "ratio": len(response_text) / len(message) if len(message) > 0 else 0, + "contains_question": contains_question, + "references_previous": references_previous, + "social_markers": social_markers, } - + self.conversation_history.append(response_data) return response_data + def _extract_social_markers(self, text: str) -> List[str]: + """Extract social markers like emotions and gestures from text.""" + markers = [] + # Simple regex or keyword matching could be expanded + if "*" in text: # Check for emotes like *smiles* + markers.extend(word.strip("*") for word in text.split("*")[1::2]) + return markers + + class CASAExperiment: def __init__(self, params: ExperimentParams): self.experiment_id = str(uuid.uuid4()) self.params = params + # Create results directory if it doesn't exist + self.results_dir = Path(params.results_dir) + self.results_dir.mkdir(parents=True, exist_ok=True) + + # Update metrics file path to be in results directory + self.metrics_file = self.results_dir / params.metrics_file.format( + experiment_id=self.experiment_id, + datetime=datetime.now().strftime("%Y%m%d_%H%M%S"), + ) + # Initialize all required agents self.tutor = LLMAgent( "Tutor_AI", "academic tutor", "Professional, supportive, and slightly formal.", - params + params, ) self.student = LLMAgent( - "Student_AI", - "student", - "Curious, engaged, and respectful.", - params + "Student_AI", "student", "Curious, engaged, and respectful.", params ) self.evaluator = LLMAgent( "Evaluator_AI", "social interaction evaluator", "Objective, analytical, and detail-oriented.", - params + params, ) + # Initialize DataFrame with explicit dtypes + self.metrics_df = pd.DataFrame( + columns=[ + "timestamp", + "experiment_id", + "turn_number", + "speaker_id", + "prompt_length", + "response_length", + "ratio", + "contains_question", + "references_previous", + "social_markers", + "response_time", + "temperature", + "top_p", + ] + ).astype( + { + "timestamp": str, + "experiment_id": str, + "turn_number": int, + "speaker_id": str, + "prompt_length": int, + "response_length": int, + "ratio": float, + "contains_question": bool, + "references_previous": bool, + "social_markers": str, + "response_time": float, + "temperature": float, + "top_p": float, + } + ) + + def _log_metrics(self, turn_data: Dict, turn_number: int) -> None: + """Log conversation metrics to DataFrame and save to CSV.""" + new_row = pd.DataFrame( + [ + { + "timestamp": turn_data["timestamp"], + "experiment_id": self.experiment_id, + "turn_number": turn_number, + "speaker_id": turn_data["agent_id"], + "prompt_length": turn_data["prompt_length"], + "response_length": turn_data["response_length"], + "ratio": turn_data["ratio"], + "contains_question": turn_data["contains_question"], + "references_previous": turn_data["references_previous"], + 
"social_markers": json.dumps(turn_data["social_markers"]), + "response_time": turn_data["response_time"], + "temperature": turn_data["parameters"]["temperature"], + "top_p": turn_data["parameters"]["top_p"], + } + ] + ) + + self.metrics_df = pd.concat([self.metrics_df, new_row], ignore_index=True) + # Save after each update to prevent data loss + self.metrics_df.to_csv(self.metrics_file, index=False) + async def run_direct_evaluation(self, topic: str) -> Dict: """Run a direct interaction between tutor and student.""" # Initial question from student about the topic student_query = f"Could you please explain {topic} to me?" student_response = await self.student.send_message(student_query) - + # Tutor's explanation tutor_response = await self.tutor.send_message(student_response["response"]) - + # Student's follow-up question follow_up = await self.student.send_message(tutor_response["response"]) - + # Tutor's final response final_response = await self.tutor.send_message(follow_up["response"]) - + + # Add metrics logging for each turn + for turn_number, response in enumerate( + [student_response, tutor_response, follow_up, final_response], 1 + ): + self._log_metrics(response, turn_number) + return { "interaction_type": "direct", "topic": topic, @@ -134,34 +246,36 @@ async def run_direct_evaluation(self, topic: str) -> Dict: student_response, tutor_response, follow_up, - final_response + final_response, ], "timestamp": datetime.now().isoformat(), - "experiment_id": self.experiment_id + "experiment_id": self.experiment_id, } async def run_indirect_evaluation(self, topic: str) -> Dict: """Run an indirect evaluation where evaluator assesses the interaction.""" # First, run a direct interaction direct_result = await self.run_direct_evaluation(topic) - + # Format the conversation for evaluation - conversation_text = self._format_conversation_for_evaluation(direct_result["conversation"]) - + conversation_text = self._format_conversation_for_evaluation( + direct_result["conversation"] + ) + # Get evaluator's assessment evaluation_prompt = ( f"Please evaluate the following conversation about {topic} " f"in terms of politeness, clarity, and effectiveness:\n\n{conversation_text}" ) evaluation = await self.evaluator.send_message(evaluation_prompt) - + return { "interaction_type": "indirect", "topic": topic, "base_conversation": direct_result["conversation"], "evaluation": evaluation, "timestamp": datetime.now().isoformat(), - "experiment_id": self.experiment_id + "experiment_id": self.experiment_id, } def _format_conversation_for_evaluation(self, conversation: List[Dict]) -> str: @@ -175,58 +289,64 @@ def _format_conversation_for_evaluation(self, conversation: List[Dict]) -> str: async def run_monte_carlo(self, topics: List[str]) -> List[Dict]: """Run Monte Carlo simulation across multiple topics and iterations.""" all_results = [] - + for topic in tqdm(topics, desc="Topics"): - for iteration in tqdm(range(self.params.num_iterations), desc=f"Iterations for {topic}"): + for iteration in tqdm( + range(self.params.num_iterations), desc=f"Iterations for {topic}" + ): try: direct_result = await self.run_direct_evaluation(topic) indirect_result = await self.run_indirect_evaluation(topic) - + # Add Monte Carlo metadata for result in [direct_result, indirect_result]: - result.update({ - "monte_carlo_metadata": { - "iteration": iteration, - "parameters": self.params.__dict__ + result.update( + { + "monte_carlo_metadata": { + "iteration": iteration, + "parameters": self.params.__dict__, + } } - }) - + ) + 
all_results.extend([direct_result, indirect_result])
-
-                    # Save intermediate results periodically
-                    #if iteration % 10 == 0:
-                    self._save_results(all_results, topic, "intermediate")
-
+
+                    # Remove intermediate saves
+                    # self._save_results(all_results, topic, "intermediate")
+
                 except Exception as e:
                     print(f"Error in iteration {iteration} for topic {topic}: {str(e)}")
                     continue
-
+
         return all_results
 
     def _save_results(self, results: List[Dict], topic: str, suffix: str = "") -> None:
-        """Save results to JSON file."""
+        """Save results to JSON file in results directory."""
         filename = (
-            f'monte_carlo_results_{self.experiment_id}_{topic}'
+            f"monte_carlo_results_{self.experiment_id}_{topic}"
             f'_{datetime.now().strftime("%Y%m%d_%H%M%S")}_{suffix}.json'
         )
-        with open(filename, "w") as f:
+        filepath = self.results_dir / filename
+        with open(filepath, "w") as f:
             json.dump(results, f, indent=2)
 
+
 async def main():
     """Run Monte Carlo CASA experiments."""
     # Example parameter configurations
     params = ExperimentParams(
         temperature=0.7,
         top_p=0.9,
-        model_name="llama3.2:latest",
-        num_iterations=10
+        model_name=MODEL_NAME,
+        num_iterations=5,
     )
-
+
     experiment = CASAExperiment(params)
-    topics = ["photosynthesis", "gravity", "climate change"]
-
+    topics = ["Unicorns"]
+
     results = await experiment.run_monte_carlo(topics)
     experiment._save_results(results, "all_topics", "final")
 
+
 if __name__ == "__main__":
     asyncio.run(main())

From 43baf26a8920f99e44cad503fc1a5c19b30b31ba Mon Sep 17 00:00:00 2001
From: Leon van Bokhorst
Date: Sat, 16 Nov 2024 11:31:16 +0100
Subject: [PATCH 2/2] refactor(casa): add response length controls and conversation metrics

Refactor the `LLMAgent` class in `casa/01_politeness.py` to add response
length controls and conversation flow metrics:

- `ExperimentParams` gains a `response_limits` dictionary defining the
  maximum and minimum token counts, the maximum response-to-prompt length
  ratio, and a recovery threshold.
- `_apply_length_controls` truncates responses that exceed the maximum
  ratio.
- `_needs_recovery` checks whether a response's length ratio calls for
  rebalancing.
- `_calculate_turn_balance` scores turn balance from the recent
  conversation history.

These changes aim to make the AI conversations more natural by keeping
response lengths within a human-like range and by monitoring the
conversation metrics.

As a CASA researcher, I want to enforce natural conversation patterns
through length controls and response balancing to achieve more
human-like interactions.
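For illustration, the core truncation rule caps a reply at `max_ratio`
times the prompt length and then trims back to the last complete
sentence. A standalone sketch of the same logic, with a made-up prompt
and reply (the `limits` dict here stands in for the `response_limits`
defaults):

    limits = {"max_ratio": 2.0}  # mirrors the response_limits default

    def apply_length_controls(message: str, response: str) -> str:
        # Ratio of response length to prompt length (0 for an empty prompt)
        ratio = len(response) / len(message) if message else 0
        if ratio > limits["max_ratio"]:
            # Cap at max_ratio * prompt length, then trim to the last sentence end
            max_length = int(len(message) * limits["max_ratio"])
            response = response[:max_length].rsplit(".", 1)[0] + "."
        return response

    # A 6-character prompt caps the reply at 12 characters:
    print(apply_length_controls("Hello?", "Hi there. This reply would run far too long."))
    # -> "Hi there."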
Fixes #58
---
 casa/01_politeness.py | 57 +++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 55 insertions(+), 2 deletions(-)

diff --git a/casa/01_politeness.py b/casa/01_politeness.py
index 73ee191..9deeaf9 100644
--- a/casa/01_politeness.py
+++ b/casa/01_politeness.py
@@ -4,7 +4,7 @@
 from datetime import datetime
 from typing import Dict, List
 import ollama
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from tqdm import tqdm
 import pandas as pd
 import os
@@ -23,6 +23,12 @@ class ExperimentParams:
     num_iterations: int = 100
     metrics_file: str = "casa_metrics_{experiment_id}_{datetime}.csv"
     results_dir: str = "casa/results"
+    response_limits: Dict = field(default_factory=lambda: {
+        "max_tokens": 150,  # based on typical human chat
+        "min_tokens": 25,  # avoid one-word responses
+        "max_ratio": 2.0,  # response:prompt length
+        "recovery_threshold": 5.0  # trigger length rebalancing
+    })
 
 
 class LLMAgent:
@@ -35,6 +41,11 @@ def __init__(
         self.params = params
         self.conversation_history: List[Dict] = []
         self.agent_id = str(uuid.uuid4())
+        self.flow_metrics = {
+            "turn_balance": True,
+            "sustained_ratio": 1.0,
+            "recovery_attempts": 0
+        }
 
     def _construct_prompt(self, message: str) -> str:
         """Construct a prompt that includes agent context and personality."""
@@ -80,10 +91,12 @@ async def send_message(self, message: str) -> Dict:
                 options={
                     "temperature": self.params.temperature,
                     "top_p": self.params.top_p,
+                    "num_predict": self.params.response_limits["max_tokens"],
                 },
             )
 
             response_text = response["response"]
+            response_text = self._apply_length_controls(message, response_text)
             success = True
             error = None
         except Exception as e:
@@ -92,6 +105,10 @@ async def send_message(self, message: str) -> Dict:
             error = str(e)
 
         end_time = datetime.now()
+
+        # Calculate ratio first
+        ratio = len(response_text) / len(message) if len(message) > 0 else 0
+
         response_data = {
             "timestamp": start_time.isoformat(),
             "input": message,
@@ -107,10 +124,13 @@
             },
             "prompt_length": len(message),
             "response_length": len(response_text),
-            "ratio": len(response_text) / len(message) if len(message) > 0 else 0,
+            "ratio": ratio,
             "contains_question": contains_question,
             "references_previous": references_previous,
             "social_markers": social_markers,
+            "recovery_triggered": self._needs_recovery(ratio),
+            "recovery_successful": False,  # Will be updated in next turn
+            "turn_balance_score": self._calculate_turn_balance(),
         }
 
         self.conversation_history.append(response_data)
@@ -124,6 +144,30 @@ def _extract_social_markers(self, text: str) -> List[str]:
             markers.extend(word.strip("*") for word in text.split("*")[1::2])
         return markers
 
+    def _apply_length_controls(self, message: str, response: str) -> str:
+        """Apply length controls to maintain natural conversation patterns."""
+        current_ratio = len(response) / len(message) if len(message) > 0 else 0
+
+        if current_ratio > self.params.response_limits["max_ratio"]:
+            # Truncate response to maintain ratio
+            max_length = int(len(message) * self.params.response_limits["max_ratio"])
+            response = response[:max_length].rsplit('.', 1)[0] + '.'
+ self.flow_metrics["recovery_attempts"] += 1 + + return response + + def _needs_recovery(self, ratio: float) -> bool: + """Check if response length needs rebalancing.""" + return ratio > self.params.response_limits["recovery_threshold"] + + def _calculate_turn_balance(self) -> float: + """Calculate turn balance score based on recent conversation history.""" + if len(self.conversation_history) < 2: + return 1.0 + + recent_ratios = [entry["ratio"] for entry in self.conversation_history[-3:]] + return sum(abs(1 - ratio) for ratio in recent_ratios) / len(recent_ratios) + class CASAExperiment: def __init__(self, params: ExperimentParams): @@ -173,6 +217,9 @@ def __init__(self, params: ExperimentParams): "response_time", "temperature", "top_p", + "recovery_triggered", + "recovery_successful", + "turn_balance_score", ] ).astype( { @@ -189,6 +236,9 @@ def __init__(self, params: ExperimentParams): "response_time": float, "temperature": float, "top_p": float, + "recovery_triggered": bool, + "recovery_successful": bool, + "turn_balance_score": float, } ) @@ -210,6 +260,9 @@ def _log_metrics(self, turn_data: Dict, turn_number: int) -> None: "response_time": turn_data["response_time"], "temperature": turn_data["parameters"]["temperature"], "top_p": turn_data["parameters"]["top_p"], + "recovery_triggered": turn_data["recovery_triggered"], + "recovery_successful": turn_data["recovery_successful"], + "turn_balance_score": turn_data["turn_balance_score"], } ] )
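Each turn's metrics are appended to a CSV under `casa/results/`, so a
finished run can be inspected directly with pandas. A minimal sketch,
assuming a completed run and a hypothetical file name (real files embed
the experiment id and a timestamp):

    import pandas as pd

    # Hypothetical file name; real runs produce casa_metrics_{experiment_id}_{datetime}.csv
    df = pd.read_csv("casa/results/casa_metrics_example.csv")

    # Mean length ratio, recovery rate, and turn balance per speaker
    print(df.groupby("speaker_id")[["ratio", "recovery_triggered", "turn_balance_score"]].mean())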