diff --git a/edsl/agents/Invigilator.py b/edsl/agents/Invigilator.py
index 743721d0..89aa449f 100644
--- a/edsl/agents/Invigilator.py
+++ b/edsl/agents/Invigilator.py
@@ -2,14 +2,13 @@
 from typing import Dict, Any, Optional
 
-from edsl.exceptions import AgentRespondedWithBadJSONError
 from edsl.prompts.Prompt import Prompt
 from edsl.utilities.decorators import sync_wrapper, jupyter_nb_handler
 from edsl.prompts.registry import get_classes as prompt_lookup
 from edsl.exceptions.questions import QuestionAnswerValidationError
-from edsl.agents.PromptConstructionMixin import PromptConstructorMixin
 from edsl.agents.InvigilatorBase import InvigilatorBase
 from edsl.data_transfer_models import AgentResponseDict, EDSLResultObjectInput
+from edsl.agents.PromptConstructor import PromptConstructor
 
 
 class NotApplicable(str):
@@ -19,9 +18,13 @@ def __new__(cls):
         return instance
 
 
-class InvigilatorAI(PromptConstructorMixin, InvigilatorBase):
+class InvigilatorAI(InvigilatorBase):
     """An invigilator that uses an AI model to answer questions."""
 
+    def get_prompts(self) -> Dict[str, Prompt]:
+        """Return the prompts used."""
+        return self.prompt_constructor.get_prompts()
+
     async def async_answer_question(self) -> AgentResponseDict:
         """Answer a question using the AI model.
diff --git a/edsl/agents/InvigilatorBase.py b/edsl/agents/InvigilatorBase.py
index e85610fc..ba056dec 100644
--- a/edsl/agents/InvigilatorBase.py
+++ b/edsl/agents/InvigilatorBase.py
@@ -14,6 +14,7 @@
 from edsl.language_models.LanguageModel import LanguageModel
 
 from edsl.data_transfer_models import EDSLResultObjectInput
+from edsl.agents.PromptConstructor import PromptConstructor
 
 
 class InvigilatorBase(ABC):
@@ -27,16 +28,7 @@ class InvigilatorBase(ABC):
     This returns an empty prompt because there is no memory the agent needs to have at q0.
 
-    >>> InvigilatorBase.example().create_memory_prompt("q0")
-    Prompt(text=\"""\""")
-
-    >>> i = InvigilatorBase.example()
-    >>> i.current_answers = {"q0": "Prior answer"}
-    >>> i.memory_plan.add_single_memory("q1", "q0")
-    >>> i.create_memory_prompt("q1")
-    Prompt(text=\"""
-    Before the question you are now answering, you already answered the following question(s):
-    ...
     """
 
     def __init__(
@@ -72,6 +64,11 @@ def __init__(
             None  # placeholder for the raw response from the model
         )
 
+    @property
+    def prompt_constructor(self) -> PromptConstructor:
+        """Return the prompt constructor."""
+        return PromptConstructor(self)
+
     def to_dict(self):
         attributes = [
             "agent",
@@ -207,22 +204,6 @@ async def main():
 
         return main()
 
-    def create_memory_prompt(self, question_name: str) -> Prompt:
-        """Create a memory for the agent.
-
-        The returns a memory prompt for the agent.
-
-        >>> i = InvigilatorBase.example()
-        >>> i.current_answers = {"q0": "Prior answer"}
-        >>> i.memory_plan.add_single_memory("q1", "q0")
-        >>> p = i.create_memory_prompt("q1")
-        >>> p.text.strip().replace("\\n", " ").replace("\\t", " ")
-        'Before the question you are now answering, you already answered the following question(s): Question: Do you like school? Answer: Prior answer'
-        """
-        return self.memory_plan.get_memory_prompt_fragment(
-            question_name, self.current_answers
-        )
-
     @classmethod
     def example(
         cls, throw_an_exception=False, question=None, scenario=None, survey=None
@@ -285,9 +266,9 @@ def example(
         memory_plan = MemoryPlan(survey=survey)
         current_answers = None
 
-        from edsl.agents.PromptConstructionMixin import PromptConstructorMixin
+        from edsl.agents.PromptConstructor import PromptConstructor
 
-        class InvigilatorExample(PromptConstructorMixin, InvigilatorBase):
+        class InvigilatorExample(InvigilatorBase):
             """An example invigilator."""
 
             async def async_answer_question(self):
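The two hunks above swap inheritance for composition: `InvigilatorAI` no longer mixes prompt logic into its MRO, it reaches it through the new `prompt_constructor` property. A minimal, self-contained sketch of that shape (class bodies here are illustrative, not the edsl source):

```python
class PromptConstructor:
    """Owns prompt logic; takes what it needs from the invigilator."""

    def __init__(self, invigilator):
        # Copy references instead of assuming a shared MRO with the host class.
        self.agent = invigilator.agent
        self.question = invigilator.question


class Invigilator:
    def __init__(self, agent, question):
        self.agent = agent
        self.question = question

    @property
    def prompt_constructor(self):
        # A fresh collaborator per access; the invigilator stays prompt-agnostic.
        return PromptConstructor(self)
```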
diff --git a/edsl/agents/PromptConstructionMixin.py b/edsl/agents/PromptConstructor.py
similarity index 91%
rename from edsl/agents/PromptConstructionMixin.py
rename to edsl/agents/PromptConstructor.py
index b57ada15..0af197b0 100644
--- a/edsl/agents/PromptConstructionMixin.py
+++ b/edsl/agents/PromptConstructor.py
@@ -137,7 +137,7 @@ def get_prompts(self, **kwargs) -> Dict[str, Prompt]:
         }
 
 
-class PromptConstructorMixin:
+class PromptConstructor:
     """Mixin for constructing prompts for the LLM call.
 
     The pieces of a prompt are:
@@ -149,14 +149,25 @@ class PromptConstructorMixin:
 
     This is mixed into the Invigilator class.
     """
 
-    prompt_plan = PromptPlan()
+    def __init__(self, invigilator):
+        self.invigilator = invigilator
+        self.agent = invigilator.agent
+        self.question = invigilator.question
+        self.scenario = invigilator.scenario
+        self.survey = invigilator.survey
+        self.model = invigilator.model
+        self.current_answers = invigilator.current_answers
+        self.memory_plan = invigilator.memory_plan
+        self.prompt_plan = PromptPlan()
 
     @property
     def agent_instructions_prompt(self) -> Prompt:
         """
         >>> from edsl.agents.InvigilatorBase import InvigilatorBase
         >>> i = InvigilatorBase.example()
-        >>> i.agent_instructions_prompt
+        >>> i.prompt_constructor.agent_instructions_prompt
         Prompt(text=\"""You are answering questions as if you were a human. Do not break character.\""")
         """
         if not hasattr(self, "_agent_instructions_prompt"):
@@ -176,7 +187,7 @@ def agent_persona_prompt(self) -> Prompt:
         """
         >>> from edsl.agents.InvigilatorBase import InvigilatorBase
         >>> i = InvigilatorBase.example()
-        >>> i.agent_persona_prompt
+        >>> i.prompt_constructor.agent_persona_prompt
         Prompt(text=\"""You are an agent with the following persona:
         {'age': 22, 'hair': 'brown', 'height': 5.5}\""")
 
@@ -231,7 +242,7 @@ def question_instructions_prompt(self) -> Prompt:
         """
         >>> from edsl.agents.InvigilatorBase import InvigilatorBase
         >>> i = InvigilatorBase.example()
-        >>> i.question_instructions_prompt
+        >>> i.prompt_constructor.question_instructions_prompt
         Prompt(text=\"""...
         ...
         """
@@ -329,6 +340,23 @@ def prior_question_memory_prompt(self) -> Prompt:
             self._prior_question_memory_prompt = memory_prompt
         return self._prior_question_memory_prompt
 
+    def create_memory_prompt(self, question_name: str) -> Prompt:
+        """Create a memory prompt for the agent.
+
+        >>> from edsl.agents.InvigilatorBase import InvigilatorBase
+        >>> i = InvigilatorBase.example()
+        >>> i.current_answers = {"q0": "Prior answer"}
+        >>> i.memory_plan.add_single_memory("q1", "q0")
+        >>> p = i.prompt_constructor.create_memory_prompt("q1")
+        >>> p.text.strip().replace("\\n", " ").replace("\\t", " ")
+        'Before the question you are now answering, you already answered the following question(s): Question: Do you like school? Answer: Prior answer'
+        """
+        return self.memory_plan.get_memory_prompt_fragment(
+            question_name, self.current_answers
+        )
+
     def construct_system_prompt(self) -> Prompt:
         """Construct the system prompt for the LLM call."""
         import warnings
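With the rename in place, prompt pieces are reached through the constructor object. A quick usage sketch (assuming the usual `"user_prompt"`/`"system_prompt"` keys, which the runner code later in this patch also relies on):

```python
from edsl.agents.InvigilatorBase import InvigilatorBase

i = InvigilatorBase.example()
print(i.prompt_constructor.agent_instructions_prompt)  # Prompt(text="...")

prompts = i.prompt_constructor.get_prompts()           # dict of Prompt objects
print(prompts["user_prompt"], prompts["system_prompt"], sep="\n")
```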
diff --git a/edsl/inference_services/TestService.py b/edsl/inference_services/TestService.py
index 7430dfe8..8fe54db3 100644
--- a/edsl/inference_services/TestService.py
+++ b/edsl/inference_services/TestService.py
@@ -7,6 +7,7 @@
 from edsl.utilities.utilities import fix_partial_correct_response
 
 from edsl.enums import InferenceServiceType
+import random
 
 
 class TestService(InferenceServiceABC):
@@ -60,22 +61,16 @@ async def async_execute_model_call(
             await asyncio.sleep(0.1)
             # return {"message": """{"answer": "Hello, world"}"""}
             if hasattr(self, "throw_exception") and self.throw_exception:
-                raise Exception("This is a test error")
+                if hasattr(self, "exception_probability"):
+                    p = self.exception_probability
+                else:
+                    p = 1
+
+                if random.random() < p:
+                    raise Exception("This is a test error")
             return {
                 "message": [{"text": f"{self._canned_response}"}],
                 "usage": {"prompt_tokens": 1, "completion_tokens": 1},
             }
 
         return TestServiceLanguageModel
-
-    # _inference_service_ = "openai"
-    # _env_key_name_ = "OPENAI_API_KEY"
-    # _base_url_ = None
-
-    # _sync_client_ = openai.OpenAI
-    # _async_client_ = openai.AsyncOpenAI
-
-    # _sync_client_instance = None
-    # _async_client_instance = None
-
-    # key_sequence = ["choices", 0, "message", "content"]
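`exception_probability` turns the test model's always-fail switch into a coin flip, which is what lets the retry machinery below be exercised end to end. A sketch of the intended usage, mirroring `tests/jobs/test_repair.py` at the bottom of this patch:

```python
from edsl import Model, QuestionFreeText

m = Model(
    "test",
    canned_response="Hi",
    throw_exception=True,       # enable the failure path...
    exception_probability=0.1,  # ...but only ~10% of calls raise
)
q = QuestionFreeText(question_name="name", question_text="What is your name?")
results = q.by(m).run(n=100, cache=False)  # retries should repair the failures
```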
diff --git a/edsl/jobs/interviews/Interview.py b/edsl/jobs/interviews/Interview.py
index a2dc89b9..5d8f7aa1 100644
--- a/edsl/jobs/interviews/Interview.py
+++ b/edsl/jobs/interviews/Interview.py
@@ -4,10 +4,18 @@
 import asyncio
 from typing import Any, Type, List, Generator, Optional, Union
 
+from tenacity import (
+    retry,
+    stop_after_attempt,
+    wait_exponential,
+    retry_if_exception_type,
+    RetryError,
+)
+
 from edsl import CONFIG
 from edsl.surveys.base import EndOfSurvey
 from edsl.exceptions import QuestionAnswerValidationError
-from edsl.exceptions import InterviewTimeoutError
+from edsl.exceptions import QuestionAnswerValidationError
 
 from edsl.data_transfer_models import AgentResponseDict, EDSLResultObjectInput
 from edsl.jobs.buckets.ModelBuckets import ModelBuckets
@@ -15,21 +23,18 @@
 from edsl.jobs.tasks.QuestionTaskCreator import QuestionTaskCreator
 from edsl.jobs.tasks.TaskCreators import TaskCreators
 from edsl.jobs.interviews.InterviewStatusLog import InterviewStatusLog
-from edsl.jobs.interviews.interview_exception_tracking import (
+from edsl.jobs.interviews.InterviewExceptionCollection import (
     InterviewExceptionCollection,
 )
-from edsl.jobs.interviews.InterviewExceptionEntry import InterviewExceptionEntry
-from edsl.jobs.interviews.retry_management import retry_strategy
+
 from edsl.jobs.interviews.InterviewStatusMixin import InterviewStatusMixin
 
 from edsl.surveys.base import EndOfSurvey
 from edsl.jobs.buckets.ModelBuckets import ModelBuckets
 from edsl.jobs.interviews.InterviewExceptionEntry import InterviewExceptionEntry
-from edsl.jobs.interviews.retry_management import retry_strategy
 from edsl.jobs.tasks.task_status_enum import TaskStatus
 from edsl.jobs.tasks.QuestionTaskCreator import QuestionTaskCreator
 
-from edsl.exceptions import QuestionAnswerValidationError
 from edsl import Agent, Survey, Scenario, Cache
 from edsl.language_models import LanguageModel
@@ -39,8 +44,11 @@
 from edsl.exceptions.language_models import LanguageModelNoResponseError
 
-class RetryableLanguageModelNoResponseError(LanguageModelNoResponseError):
-    pass
+from edsl import CONFIG
+
+EDSL_BACKOFF_START_SEC = float(CONFIG.get("EDSL_BACKOFF_START_SEC"))
+EDSL_BACKOFF_MAX_SEC = float(CONFIG.get("EDSL_BACKOFF_MAX_SEC"))
+EDSL_MAX_ATTEMPTS = int(CONFIG.get("EDSL_MAX_ATTEMPTS"))
 
 
 class Interview(InterviewStatusMixin):
@@ -102,9 +110,12 @@ def __init__(
         )  # will get filled in as interview progresses
         self.sidecar_model = sidecar_model
 
+        # self.stop_on_exception = False
+
         # Trackers
         self.task_creators = TaskCreators()  # tracks the task creators
         self.exceptions = InterviewExceptionCollection()
+        self._task_status_log_dict = InterviewStatusLog()
         self.skip_retry = skip_retry
         self.raise_validation_errors = raise_validation_errors
@@ -257,44 +268,72 @@ async def _answer_question_and_record_task(
     ) -> "AgentResponseDict":
         """Answer a question and records the task."""
 
-        invigilator = self._get_invigilator(question)
+        @retry(
+            stop=stop_after_attempt(EDSL_MAX_ATTEMPTS),
+            wait=wait_exponential(
+                multiplier=EDSL_BACKOFF_START_SEC, max=EDSL_BACKOFF_MAX_SEC
+            ),
+            retry=retry_if_exception_type(LanguageModelNoResponseError),
+            reraise=True,
+        )
+        async def attempt_answer():
+            invigilator = self._get_invigilator(question)
 
-        if self._skip_this_question(question):
-            response = invigilator.get_failed_task_result(
-                failure_reason="Question skipped."
-            )
+            if self._skip_this_question(question):
+                return invigilator.get_failed_task_result(
+                    failure_reason="Question skipped."
+                )
 
-        try:
-            response: EDSLResultObjectInput = await invigilator.async_answer_question()
-            if response.validated:
-                self.answers.add_answer(response=response, question=question)
-                self._cancel_skipped_questions(question)
-            else:
-                if (
-                    hasattr(response, "exception_occurred")
-                    and response.exception_occurred
-                ):
-                    raise response.exception_occurred
+            try:
+                response: EDSLResultObjectInput = (
+                    await invigilator.async_answer_question()
+                )
+                if response.validated:
+                    self.answers.add_answer(response=response, question=question)
+                    self._cancel_skipped_questions(question)
+                else:
+                    if (
+                        hasattr(response, "exception_occurred")
+                        and response.exception_occurred
+                    ):
+                        raise response.exception_occurred
+
+            except QuestionAnswerValidationError as e:
+                self._handle_exception(e, invigilator, task)
+                return invigilator.get_failed_task_result(
+                    failure_reason="Question answer validation failed."
+                )
 
-        except QuestionAnswerValidationError as e:
-            # there's a response, but it couldn't be validated
-            self._handle_exception(e, invigilator, task)
+            except asyncio.TimeoutError as e:
+                self._handle_exception(e, invigilator, task)
+                raise LanguageModelNoResponseError(
+                    f"Language model timed out for question '{question.question_name}.'"
+                )
 
-        except asyncio.TimeoutError as e:
-            # the API timed-out - this is recorded but as a response isn't generated, the LanguageModelNoResponseError will also be raised
-            self._handle_exception(e, invigilator, task)
+            except Exception as e:
+                self._handle_exception(e, invigilator, task)
 
-        except Exception as e:
-            # there was some other exception
-            self._handle_exception(e, invigilator, task)
+            if "response" not in locals():
+                raise LanguageModelNoResponseError(
+                    f"Language model did not return a response for question '{question.question_name}.'"
+                )
 
-        if "response" not in locals():
+            # it got fixed!
+            if question.question_name in self.exceptions:
+                self.exceptions.record_fixed_question(question.question_name)
 
-            raise LanguageModelNoResponseError(
-                f"Language model did not return a response for question '{question.question_name}.'"
-            )
+            return response
 
-        return response
+        try:
+            return await attempt_answer()
+        except RetryError as retry_error:
+            # All retries have failed for LanguageModelNoResponseError
+            original_error = retry_error.last_attempt.exception()
+            self._handle_exception(
+                original_error, self._get_invigilator(question), task
+            )
+            raise original_error  # Re-raise the original error after handling
 
     def _get_invigilator(self, question: QuestionBase) -> InvigilatorBase:
         """Return an invigilator for the given question.
@@ -334,14 +373,26 @@ def _skip_this_question(self, current_question: "QuestionBase") -> bool:
 
     def _handle_exception(
         self, e: Exception, invigilator: "InvigilatorBase", task=None
     ):
+        import copy
+
+        answers = copy.copy(self.answers)
         exception_entry = InterviewExceptionEntry(
             exception=e,
             invigilator=invigilator,
+            answers=answers,
         )
         if task:
             task.task_status = TaskStatus.FAILED
         self.exceptions.add(invigilator.question.question_name, exception_entry)
 
+        if hasattr(self, "stop_on_exception"):
+            stop_on_exception = self.stop_on_exception
+        else:
+            stop_on_exception = False
+
+        if stop_on_exception:
+            raise e
+
     def _cancel_skipped_questions(self, current_question: QuestionBase) -> None:
         """Cancel the tasks for questions that are skipped.
@@ -411,6 +462,7 @@ async def async_conduct_interview(
             asyncio.exceptions.CancelledError
         """
         self.sidecar_model = sidecar_model
+        self.stop_on_exception = stop_on_exception
 
         # if no model bucket is passed, create an 'infinity' bucket with no rate limits
         if model_buckets is None or hasattr(self.agent, "answer_question_directly"):
@@ -424,7 +476,9 @@ async def async_conduct_interview(
         self.invigilators = [
             self._get_invigilator(question) for question in self.survey.questions
         ]
-        await asyncio.gather(*self.tasks, return_exceptions=not stop_on_exception)
+        await asyncio.gather(
+            *self.tasks, return_exceptions=not stop_on_exception
+        )
         self.answers.replace_missing_answers_with_none(self.survey)
         valid_results = list(self._extract_valid_results())
         return self.answers, valid_results
diff --git a/edsl/jobs/interviews/interview_exception_tracking.py b/edsl/jobs/interviews/InterviewExceptionCollection.py
similarity index 82%
rename from edsl/jobs/interviews/interview_exception_tracking.py
rename to edsl/jobs/interviews/InterviewExceptionCollection.py
index 85b187f6..22645198 100644
--- a/edsl/jobs/interviews/interview_exception_tracking.py
+++ b/edsl/jobs/interviews/InterviewExceptionCollection.py
@@ -6,6 +6,22 @@
 class InterviewExceptionCollection(UserDict):
     """A collection of exceptions that occurred during the interview."""
 
+    def __init__(self):
+        super().__init__()
+        self.fixed = set()
+
+    def unfixed_exceptions(self) -> dict:
+        """Return the exceptions for questions that have not been fixed."""
+        return {k: v for k, v in self.data.items() if k not in self.fixed}
+
+    def num_unfixed(self) -> int:
+        """Return the number of questions with unfixed exceptions."""
+        return len([k for k in self.data.keys() if k not in self.fixed])
+
+    def record_fixed_question(self, question_name: str) -> None:
+        """Record that a question has been fixed."""
+        self.fixed.add(question_name)
+
     def add(self, question_name: str, entry: InterviewExceptionEntry) -> None:
         """Add an exception entry to the collection."""
         question_name = question_name
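The collection now distinguishes "happened" from "still broken": entries are never removed, but a later success marks the question fixed. A minimal re-creation of those semantics:

```python
from collections import UserDict

class ExceptionCollection(UserDict):
    def __init__(self):
        super().__init__()
        self.fixed = set()

    def record_fixed_question(self, name):
        self.fixed.add(name)

    def num_unfixed(self):
        return len([k for k in self.data if k not in self.fixed])

c = ExceptionCollection()
c["q0"] = ["TimeoutError(...)"]   # q0 failed on the first attempt
c.record_fixed_question("q0")     # ...then a retry succeeded
assert len(c) == 1 and c.num_unfixed() == 0  # history kept, nothing unfixed
```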
diff --git a/edsl/jobs/interviews/InterviewExceptionEntry.py b/edsl/jobs/interviews/InterviewExceptionEntry.py
index f8449841..b9a39e9a 100644
--- a/edsl/jobs/interviews/InterviewExceptionEntry.py
+++ b/edsl/jobs/interviews/InterviewExceptionEntry.py
@@ -15,12 +15,14 @@ def __init__(
         # failed_question: FailedQuestion,
         invigilator: "Invigilator",
         traceback_format="text",
+        answers=None,
     ):
         self.time = datetime.datetime.now().isoformat()
         self.exception = exception
         # self.failed_question = failed_question
         self.invigilator = invigilator
         self.traceback_format = traceback_format
+        self.answers = answers
 
     @property
     def question_type(self):
diff --git a/edsl/jobs/interviews/retry_management.py b/edsl/jobs/interviews/retry_management.py
deleted file mode 100644
index 931fd9cf..00000000
--- a/edsl/jobs/interviews/retry_management.py
+++ /dev/null
@@ -1,39 +0,0 @@
-from edsl import CONFIG
-
-from tenacity import (
-    retry,
-    wait_exponential,
-    stop_after_attempt,
-    retry_if_exception_type,
-    before_sleep,
-)
-
-EDSL_BACKOFF_START_SEC = float(CONFIG.get("EDSL_BACKOFF_START_SEC"))
-EDSL_BACKOFF_MAX_SEC = float(CONFIG.get("EDSL_BACKOFF_MAX_SEC"))
-EDSL_MAX_ATTEMPTS = int(CONFIG.get("EDSL_MAX_ATTEMPTS"))
-
-
-def print_retry(retry_state, print_to_terminal=True):
-    "Prints details on tenacity retries."
-    attempt_number = retry_state.attempt_number
-    exception = retry_state.outcome.exception()
-    wait_time = retry_state.next_action.sleep
-    exception_name = type(exception).__name__
-    if print_to_terminal:
-        print(
-            f"Attempt {attempt_number} failed with exception '{exception_name}':"
-            f"{exception}",
-            f"now waiting {wait_time:.2f} seconds before retrying."
-            f"Parameters: start={EDSL_BACKOFF_START_SEC}, max={EDSL_BACKOFF_MAX_SEC}, max_attempts={EDSL_MAX_ATTEMPTS}."
-            "\n\n",
-        )
-
-
-retry_strategy = retry(
-    wait=wait_exponential(
-        multiplier=EDSL_BACKOFF_START_SEC, max=EDSL_BACKOFF_MAX_SEC
-    ),  # Exponential back-off starting at 1s, doubling, maxing out at 60s
-    stop=stop_after_attempt(EDSL_MAX_ATTEMPTS),  # Stop after 5 attempts
-    # retry=retry_if_exception_type(Exception),  # Customize this as per your specific retry-able exception
-    before_sleep=print_retry,  # Use custom print function for retries
-)
diff --git a/edsl/jobs/runners/JobsRunnerAsyncio.py b/edsl/jobs/runners/JobsRunnerAsyncio.py
index df85efa7..459bc88f 100644
--- a/edsl/jobs/runners/JobsRunnerAsyncio.py
+++ b/edsl/jobs/runners/JobsRunnerAsyncio.py
@@ -9,34 +9,14 @@
 from edsl import shared_globals
 from edsl.jobs.interviews.Interview import Interview
-from edsl.jobs.runners.JobsRunnerStatusMixin import JobsRunnerStatusMixin
+from edsl.jobs.runners.JobsRunnerStatus import JobsRunnerStatus
+
 from edsl.jobs.tasks.TaskHistory import TaskHistory
 from edsl.jobs.buckets.BucketCollection import BucketCollection
 from edsl.utilities.decorators import jupyter_nb_handler
 from edsl.data.Cache import Cache
 from edsl.results.Result import Result
 from edsl.results.Results import Results
-from edsl.jobs.FailedQuestion import FailedQuestion
-
-
-def cache_with_timeout(timeout):
-    """ "Used to keep the generate table from being run too frequetly."""
-
-    def decorator(func):
-        cached_result = {}
-        last_computation_time = [0]  # Using list to store mutable value
-
-        @functools.wraps(func)
-        def wrapper(*args, **kwargs):
-            current_time = time.time()
-            if (current_time - last_computation_time[0]) >= timeout:
-                cached_result["value"] = func(*args, **kwargs)
-                last_computation_time[0] = current_time
-            return cached_result["value"]
-
-        return wrapper
-
-    return decorator
 
 
 class StatusTracker(UserList):
@@ -48,7 +28,7 @@ def current_status(self):
         return print(f"Completed: {len(self.data)} of {self.total_tasks}", end="\r")
 
 
-class JobsRunnerAsyncio(JobsRunnerStatusMixin):
+class JobsRunnerAsyncio:
     """A class for running a collection of interviews asynchronously.
 
     It gets instaniated from a Jobs object.
@@ -62,6 +42,10 @@ def __init__(self, jobs: "Jobs"):
         self.bucket_collection: "BucketCollection" = jobs.bucket_collection
         self.total_interviews: List["Interview"] = []
 
+        self.jobs_runner_status = JobsRunnerStatus(self)
+
     async def run_async_generator(
         self,
         cache: "Cache",
@@ -79,6 +63,7 @@ async def run_async_generator(
         :param stop_on_exception: Whether to stop the interview if an exception is raised
         :param sidecar_model: a language model to use in addition to the interview's model
         :param total_interviews: A list of interviews to run can be provided instead.
+        :param raise_validation_errors: Whether to raise validation errors
         """
         tasks = []
         if total_interviews:  # was already passed in total interviews
@@ -88,8 +73,6 @@ async def run_async_generator(
                 self._populate_total_interviews(n=n)
             )  # Populate self.total_interviews before creating tasks
 
-        # print("Interviews created")
-
         for interview in self.total_interviews:
             interviewing_task = self._build_interview_task(
                 interview=interview,
@@ -99,11 +82,9 @@ async def run_async_generator(
             )
             tasks.append(asyncio.create_task(interviewing_task))
 
-        # print("Tasks created")
-
         for task in asyncio.as_completed(tasks):
-            # print(f"Task {task} completed")
             result = await task
+            self.jobs_runner_status.add_completed_interview(result)
             yield result
 
     def _populate_total_interviews(
@@ -157,12 +138,6 @@ async def _build_interview_task(
             raise_validation_errors=raise_validation_errors,
         )
 
-        # answer_key_names = {
-        #     k
-        #     for k in set(answer.keys())
-        #     if not k.endswith("_comment") and not k.endswith("_generated_tokens")
-        # }
-
         question_results = {}
         for result in valid_results:
             question_results[result.question_name] = result
@@ -181,17 +156,6 @@ async def _build_interview_task(
         answer_dict = {k: answer[k] for k in answer_key_names}
         assert len(valid_results) == len(answer_key_names)
 
-        # breakpoint()
-        # generated_tokens_dict = {
-        #     k + "_generated_tokens": v.generated_tokens
-        #     for k, v in zip(answer_key_names, valid_results)
-        # }
-
-        # comments_dict = {
-        #     k + "_comment": v.comment for k, v in zip(answer_key_names, valid_results)
-        # }
-        # breakpoint()
-
         # TODO: move this down into Interview
         question_name_to_prompts = dict({})
         for result in valid_results:
@@ -203,19 +167,19 @@ async def _build_interview_task(
         prompt_dictionary = {}
         for answer_key_name in answer_key_names:
-            prompt_dictionary[
-                answer_key_name + "_user_prompt"
-            ] = question_name_to_prompts[answer_key_name]["user_prompt"]
-            prompt_dictionary[
-                answer_key_name + "_system_prompt"
-            ] = question_name_to_prompts[answer_key_name]["system_prompt"]
+            prompt_dictionary[answer_key_name + "_user_prompt"] = (
+                question_name_to_prompts[answer_key_name]["user_prompt"]
+            )
+            prompt_dictionary[answer_key_name + "_system_prompt"] = (
+                question_name_to_prompts[answer_key_name]["system_prompt"]
+            )
 
         raw_model_results_dictionary = {}
         for result in valid_results:
             question_name = result.question_name
-            raw_model_results_dictionary[
-                question_name + "_raw_model_response"
-            ] = result.raw_model_response
+            raw_model_results_dictionary[question_name + "_raw_model_response"] = (
+                result.raw_model_response
+            )
             raw_model_results_dictionary[question_name + "_cost"] = result.cost
             one_use_buys = (
                 "NA"
@@ -226,7 +190,6 @@ async def _build_interview_task(
             )
             raw_model_results_dictionary[question_name + "_one_usd_buys"] = one_use_buys
 
-        # breakpoint()
         result = Result(
             agent=interview.agent,
             scenario=interview.scenario,
@@ -272,9 +235,9 @@ async def run(
         from rich.live import Live
         from rich.console import Console
 
-        @cache_with_timeout(1)
         def generate_table():
-            return self.status_table(self.results, self.elapsed_time)
+            return self.jobs_runner_status.status_table()
 
         async def process_results(cache, progress_bar_context=None):
             """Processes results from interviews."""
@@ -310,7 +273,7 @@ def conditional_context(condition, context_manager):
                 yield
 
         with conditional_context(
-            progress_bar, Live(generate_table(), console=console, refresh_per_second=1)
+            progress_bar, Live(generate_table(), console=console, refresh_per_second=5)
         ) as progress_bar_context:
             with cache as c:
                 progress_task = asyncio.create_task(
@@ -350,13 +313,11 @@ def conditional_context(condition, context_manager):
 
         results.task_history = task_history
         results.failed_questions = {}
-        results.has_exceptions = task_history.has_exceptions
+        results.has_unfixed_exceptions = task_history.has_unfixed_exceptions
 
-        # breakpoint()
         results.bucket_collection = self.bucket_collection
 
-        if results.has_exceptions:
-            # put the failed interviews in the results object as a list
+        if results.has_unfixed_exceptions:
             failed_interviews = [
                 interview.duplicate(
                     iteration=interview.iteration, cache=interview.cache
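The runner now reports progress by pushing each finished interview into `jobs_runner_status` as it completes, rather than having a mixin poll task state. The asyncio flow, stripped to its bones:

```python
import asyncio

class Status:
    def __init__(self):
        self.completed = []

    def add_completed_interview(self, result):
        self.completed.append(result)

async def main():
    status = Status()
    tasks = [asyncio.create_task(asyncio.sleep(0.01, result=i)) for i in range(3)]
    for task in asyncio.as_completed(tasks):   # yields tasks in completion order
        status.add_completed_interview(await task)
    print(len(status.completed))  # 3

asyncio.run(main())
```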
diff --git a/edsl/jobs/runners/JobsRunnerStatusMixin.py b/edsl/jobs/runners/JobsRunnerStatus.py
similarity index 62%
rename from edsl/jobs/runners/JobsRunnerStatusMixin.py
rename to edsl/jobs/runners/JobsRunnerStatus.py
index e2798559..aacecf41 100644
--- a/edsl/jobs/runners/JobsRunnerStatusMixin.py
+++ b/edsl/jobs/runners/JobsRunnerStatus.py
@@ -1,6 +1,11 @@
 from __future__ import annotations
-from typing import List, DefaultDict
-import asyncio
+
+from dataclasses import dataclass, asdict
+from rich.text import Text
+from rich.box import SIMPLE
+from rich.table import Table
+
+from typing import List, DefaultDict, Optional
 from typing import Type
 from collections import defaultdict
@@ -22,15 +27,6 @@
 from edsl.jobs.tokens.InterviewTokenUsage import InterviewTokenUsage
 
-# return {"cache_status": token_usage_type, "details": details, "cost": f"${token_usage.cost(prices):.5f}"}
-
-from dataclasses import dataclass, asdict
-
-from rich.text import Text
-from rich.box import SIMPLE
-from rich.table import Table
-
-
 @dataclass
 class ModelInfo:
     model_name: str
@@ -52,18 +48,49 @@ def elapsed_time(self):
     InterviewStatistic("elapsed_time", value=elapsed_time, digits=1, units="sec.")
 
 
-class JobsRunnerStatusMixin:
-    # @staticmethod
-    # def status_dict(interviews: List[Type["Interview"]]) -> List[Type[InterviewStatusDictionary]]:
-    #     """
-    #     >>> from edsl.jobs.interviews.Interview import Interview
-    #     >>> interviews = [Interview.example()]
-    #     >>> JobsRunnerStatusMixin().status_dict(interviews)
-    #     [InterviewStatusDictionary({..., 'number_from_cache': 0})]
-    #     """
-    #     return [interview.interview_status for interview in interviews]
+import time
+
+
+class JobsRunnerStatus:
+
+    def __init__(
+        self,
+        jobs_runner: "JobsRunnerAsyncio",
+        progress_bar_stats: Optional[List[str]] = None,
+    ):
+        self.jobs_runner = jobs_runner
+        self.start_time = time.time()
+        self.completed_interviews = []
+        self.refresh = False  # only refresh if a new interview is added
+
+        if progress_bar_stats is None:
+            self.statistics = [
+                "elapsed_time",
+                "total_interviews_requested",
+                "completed_interviews",
+                "percent_complete",
+                "average_time_per_interview",
+                "task_remaining",
+                "estimated_time_remaining",
+                "exceptions",
+                "unfixed_exceptions",
+            ]
+        else:
+            self.statistics = progress_bar_stats
+
+    @property
+    def total_interviews(self):
+        return self.jobs_runner.total_interviews
+
+    def add_completed_interview(self, result):
+        self.refresh = True
+        self.completed_interviews.append(result.interview_hash)
+
+    def _compute_statistic(self, stat_name: str):
+        completed_tasks = self.completed_interviews
+        elapsed_time = time.time() - self.start_time
+        interviews = self.total_interviews
 
-    def _compute_statistic(stat_name: str, completed_tasks, elapsed_time, interviews):
         stat_definitions = {
             "elapsed_time": lambda: InterviewStatistic(
                 "elapsed_time", value=elapsed_time, digits=1, units="sec."
@@ -104,6 +131,16 @@ def _compute_statistic(stat_name: str, completed_tasks, elapsed_time, interviews):
                 digits=1,
                 units="sec.",
             ),
+            "exceptions": lambda: InterviewStatistic(
+                "exceptions",
+                value=sum(len(i.exceptions) for i in self.total_interviews),
+                units="",
+            ),
+            "unfixed_exceptions": lambda: InterviewStatistic(
+                "unfixed_exceptions",
+                value=sum(i.exceptions.num_unfixed() for i in self.total_interviews),
+                units="",
+            ),
         }
         if stat_name not in stat_definitions:
             raise ValueError(
@@ -111,29 +148,11 @@ def _compute_statistic(stat_name: str, completed_tasks, elapsed_time, interviews):
             )
         return stat_definitions[stat_name]()
 
-    @staticmethod
-    def _job_level_info(
-        completed_tasks: List[Type[asyncio.Task]],
-        elapsed_time: float,
-        interviews: List[Type["Interview"]],
-    ) -> InterviewStatisticsCollection:
+    def _job_level_info(self) -> InterviewStatisticsCollection:
         interview_statistics = InterviewStatisticsCollection()
 
-        default_statistics = [
-            "elapsed_time",
-            "total_interviews_requested",
-            "completed_interviews",
-            "percent_complete",
-            "average_time_per_interview",
-            "task_remaining",
-            "estimated_time_remaining",
-        ]
-        for stat_name in default_statistics:
-            interview_statistics.add_stat(
-                JobsRunnerStatusMixin._compute_statistic(
-                    stat_name, completed_tasks, elapsed_time, interviews
-                )
-            )
+        for stat_name in self.statistics:
+            interview_statistics.add_stat(self._compute_statistic(stat_name))
 
         return interview_statistics
 
@@ -149,41 +168,18 @@ def _get_model_queues_info(interviews):
             waiting_dict[interview.model] += interview.interview_status.waiting
 
         for model, num_waiting in waiting_dict.items():
-            yield JobsRunnerStatusMixin._get_model_info(
-                model, num_waiting, models_to_tokens
-            )
+            yield JobsRunnerStatus._get_model_info(model, num_waiting, models_to_tokens)
 
-    @staticmethod
     def generate_status_summary(
-        completed_tasks: List[Type[asyncio.Task]],
-        elapsed_time: float,
-        interviews: List[Type["Interview"]],
+        self,
         include_model_queues=False,
     ) -> InterviewStatisticsCollection:
-        """Generate a summary of the status of the job runner.
-
-        :param completed_tasks: list of completed tasks
-        :param elapsed_time: time elapsed since the start of the job
-        :param interviews: list of interviews to be conducted
-
-        >>> from edsl.jobs.interviews.Interview import Interview
-        >>> interviews = [Interview.example()]
-        >>> completed_tasks = []
-        >>> elapsed_time = 0
-        >>> JobsRunnerStatusMixin().generate_status_summary(completed_tasks, elapsed_time, interviews)
-        {'Elapsed time': '0.0 sec.', 'Total interviews requested': '1 ', 'Completed interviews': '0 ', 'Percent complete': '0 %', 'Average time per interview': 'NA', 'Task remaining': '1 ', 'Estimated time remaining': 'NA'}
-        """
+        """Generate a summary of the status of the job runner."""
 
-        interview_status_summary: InterviewStatisticsCollection = (
-            JobsRunnerStatusMixin._job_level_info(
-                completed_tasks=completed_tasks,
-                elapsed_time=elapsed_time,
-                interviews=interviews,
-            )
-        )
+        interview_status_summary: InterviewStatisticsCollection = self._job_level_info()
 
         if include_model_queues:
             interview_status_summary.model_queues = list(
-                JobsRunnerStatusMixin._get_model_queues_info(interviews)
+                self._get_model_queues_info(self.total_interviews)
             )
         else:
             interview_status_summary.model_queues = None
@@ -201,14 +197,6 @@ def _get_model_info(
         :param model: the model name
         :param num_waiting: the number of tasks waiting for capacity
         :param models_to_tokens: a mapping of models to token usage
-
-        >>> from edsl.jobs.interviews.Interview import Interview
-        >>> interviews = [Interview.example()]
-        >>> models_to_tokens = defaultdict(InterviewTokenUsage)
-        >>> model = interviews[0].model
-        >>> num_waiting = 0
-        >>> JobsRunnerStatusMixin()._get_model_info(model, num_waiting, models_to_tokens)
-        ModelInfo(model_name='...', TPM_limit_k=..., RPM_limit_k=..., num_tasks_waiting=0, token_usage_info=[ModelTokenUsageStats(token_usage_type='new_token_usage', details=[{'type': 'prompt_tokens', 'tokens': 0}, {'type': 'completion_tokens', 'tokens': 0}], cost='$0.00000'), ModelTokenUsageStats(token_usage_type='cached_token_usage', details=[{'type': 'prompt_tokens', 'tokens': 0}, {'type': 'completion_tokens', 'tokens': 0}], cost='$0.00000')])
         """
 
         ## TODO: This should probably be a coop method
@@ -217,7 +205,7 @@ def _get_model_info(
         token_usage_info = []
         for token_usage_type in ["new_token_usage", "cached_token_usage"]:
             token_usage_info.append(
-                JobsRunnerStatusMixin._get_token_usage_info(
+                JobsRunnerStatus._get_token_usage_info(
                     token_usage_type, models_to_tokens, model, prices
                 )
             )
@@ -239,18 +227,7 @@ def _get_token_usage_info(
         model: str,
         prices: "TokenPricing",
     ) -> ModelTokenUsageStats:
-        """Get the token usage info for a model.
-
-        >>> from edsl.jobs.interviews.Interview import Interview
-        >>> interviews = [Interview.example()]
-        >>> models_to_tokens = defaultdict(InterviewTokenUsage)
-        >>> model = interviews[0].model
-        >>> prices = get_token_pricing(model.model)
-        >>> cache_status = "new_token_usage"
-        >>> JobsRunnerStatusMixin()._get_token_usage_info(cache_status, models_to_tokens, model, prices)
-        ModelTokenUsageStats(token_usage_type='new_token_usage', details=[{'type': 'prompt_tokens', 'tokens': 0}, {'type': 'completion_tokens', 'tokens': 0}], cost='$0.00000')
-
-        """
+        """Get the token usage info for a model."""
 
         all_token_usage: InterviewTokenUsage = models_to_tokens[model]
         token_usage: TokenUsage = getattr(all_token_usage, token_usage_type)
@@ -284,7 +261,7 @@ def display_status_table(status_summary: InterviewStatisticsCollection) -> "Table":
         )
 
         ### Job-level statistics
-        JobsRunnerStatusMixin._add_statistics_to_table(table, status_summary)
+        JobsRunnerStatus._add_statistics_to_table(table, status_summary)
 
         ## Model-level statistics
         spacing = " "
@@ -318,12 +295,16 @@ def display_status_table(status_summary: InterviewStatisticsCollection) -> "Table":
 
         return table
 
-    def status_table(self, completed_tasks: List[asyncio.Task], elapsed_time: float):
-        summary_data = JobsRunnerStatusMixin.generate_status_summary(
-            completed_tasks=completed_tasks,
-            elapsed_time=elapsed_time,
-            interviews=self.total_interviews,
-        )
+    @property
+    def summary_data(self):
+        """Return the summary data, refreshing it if necessary."""
+        if self.refresh is True:
+            self._summary_data = self.generate_status_summary()
+            self.refresh = False
+        return self._summary_data
+
+    def status_table(self):
+        summary_data = self.generate_status_summary()
         return self.display_status_table(summary_data)
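The statistics table is driven by a name-to-thunk registry, so a progress bar is configured as a plain list of stat names (see `progress_bar_stats` above). A miniature version of the pattern:

```python
import time

class StatusSketch:
    def __init__(self, total, completed, start_time):
        self.total, self.completed, self.start_time = total, completed, start_time

    def compute(self, stat_name):
        elapsed = time.time() - self.start_time
        stats = {
            "elapsed_time": lambda: round(elapsed, 1),
            "percent_complete": lambda: round(100 * self.completed / self.total, 1),
        }
        if stat_name not in stats:
            raise ValueError(f"Invalid stat: {stat_name}")
        return stats[stat_name]()  # lambdas keep unrequested stats unevaluated

s = StatusSketch(total=10, completed=4, start_time=time.time() - 2)
print(s.compute("percent_complete"))  # 40.0
```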
diff --git a/edsl/jobs/tasks/TaskHistory.py b/edsl/jobs/tasks/TaskHistory.py
index 1e1c0efd..3b05e74a 100644
--- a/edsl/jobs/tasks/TaskHistory.py
+++ b/edsl/jobs/tasks/TaskHistory.py
@@ -50,6 +50,18 @@ def exceptions(self):
         """
         return [i.exceptions for k, i in self._interviews.items() if i.exceptions != {}]
 
+    @property
+    def unfixed_exceptions(self):
+        """
+        >>> len(TaskHistory.example().unfixed_exceptions)
+        0
+        """
+        return [
+            i.exceptions
+            for k, i in self._interviews.items()
+            if i.exceptions.num_unfixed() > 0
+        ]
+
     @property
     def indices(self):
         return [k for k, i in self._interviews.items() if i.exceptions != {}]
@@ -78,6 +90,11 @@ def has_exceptions(self) -> bool:
         """
         return len(self.exceptions) > 0
 
+    @property
+    def has_unfixed_exceptions(self) -> bool:
+        """Return True if there are any unfixed exceptions."""
+        return len(self.unfixed_exceptions) > 0
+
     def _repr_html_(self):
         """Return an HTML representation of the TaskHistory."""
         from edsl.utilities.utilities import data_to_html
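The point of the parallel properties: an interview can have a non-empty exception log and still count as healthy, because every logged failure was later repaired. In miniature:

```python
exceptions_log = {"q0": ["TimeoutError"]}  # recorded on the first attempt
fixed = {"q0"}                             # a retry later succeeded

has_exceptions = len(exceptions_log) > 0                              # True
has_unfixed_exceptions = any(k not in fixed for k in exceptions_log)  # False
print(has_exceptions, has_unfixed_exceptions)
```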
diff --git a/edsl/questions/QuestionBase.py b/edsl/questions/QuestionBase.py
index d2e645e7..b0ed4c9b 100644
--- a/edsl/questions/QuestionBase.py
+++ b/edsl/questions/QuestionBase.py
@@ -75,8 +75,7 @@ def fake_data_factory(self):
         if not hasattr(self, "_fake_data_factory"):
             from polyfactory.factories.pydantic_factory import ModelFactory
 
-            class FakeData(ModelFactory[self.response_model]):
-                ...
+            class FakeData(ModelFactory[self.response_model]): ...
 
             self._fake_data_factory = FakeData
         return self._fake_data_factory
@@ -471,6 +470,7 @@ def html(
         self,
         scenario: Optional[dict] = None,
         agent: Optional[dict] = {},
+        answers: Optional[dict] = None,
         include_question_name: bool = False,
         height: Optional[int] = None,
         width: Optional[int] = None,
@@ -482,6 +482,13 @@ def html(
         if scenario is None:
             scenario = {}
 
+        prior_answers_dict = {}
+        if answers is not None:
+            for key, value in answers.items():
+                if not key.endswith("_comment") and not key.endswith(
+                    "_generated_tokens"
+                ):
+                    prior_answers_dict[key] = {"answer": value}
+
         base_template = """
         <div id="{{ question_name }}" class="question">
             {% if include_question_name %}
             <p>question_name: {{ question_name }}</p>
             {% endif %}
             <p class="question_text">{{ question_text }}</p>
             {{ question_content }}
         </div>
         """
@@ -501,13 +508,25 @@ def html(
 
         base_template = Template(base_template)
 
-        params = {
-            "question_name": self.question_name,
-            "question_text": Template(self.question_text).render(scenario, agent=agent),
-            "question_type": self.question_type,
-            "question_content": Template(question_content).render(scenario),
-            "include_question_name": include_question_name,
-        }
+        context = {
+            "scenario": scenario,
+            "agent": agent,
+        } | prior_answers_dict
+        question_text = Template(self.question_text).render(context)
+
+        try:
+            params = {
+                "question_name": self.question_name,
+                "question_text": question_text,
+                "question_type": self.question_type,
+                "question_content": Template(question_content).render(scenario),
+                "include_question_name": include_question_name,
+            }
+        except Exception as e:
+            raise ValueError(f"Error rendering question: {e}")
 
         rendered_html = base_template.render(**params)
 
         if iframe:
@@ -526,6 +545,21 @@ def html(
 
         return rendered_html
 
+    @classmethod
+    def example_model(cls):
+        from edsl import Model
+
+        q = cls.example()
+        m = Model("test", canned_response=cls._simulate_answer(q)["answer"])
+
+        return m
+
+    @classmethod
+    def example_results(cls):
+        m = cls.example_model()
+        q = cls.example()
+        return q.by(m).run(cache=False)
+
     def rich_print(self):
         """Print the question in a rich format."""
         from rich.table import Table
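Wrapping each prior answer as `{"answer": value}` is what lets question text reference earlier questions with attribute-style lookups alongside `scenario` and `agent`. A standalone Jinja illustration (not edsl code; requires jinja2):

```python
from jinja2 import Template

context = {"scenario": {}, "agent": {}} | {"name": {"answer": "Ada"}}
text = "Earlier you said your name was {{ name.answer }}. Is that still true?"
print(Template(text).render(context))
# -> Earlier you said your name was Ada. Is that still true?
```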
diff --git a/edsl/questions/templates/extract/__init__.py b/edsl/questions/templates/extract/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/edsl/questions/templates/rank/__init__.py b/edsl/questions/templates/rank/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/edsl/results/DatasetExportMixin.py b/edsl/results/DatasetExportMixin.py
index abde54c2..0320c8c3 100644
--- a/edsl/results/DatasetExportMixin.py
+++ b/edsl/results/DatasetExportMixin.py
@@ -472,7 +472,11 @@ def to_scenario_list(self, remove_prefix: bool = True) -> list[dict]:
         from edsl import ScenarioList, Scenario
 
         list_of_dicts = self.to_dicts(remove_prefix=remove_prefix)
-        return ScenarioList([Scenario(d) for d in list_of_dicts])
+        scenarios = []
+        for d in list_of_dicts:
+            scenarios.append(Scenario(d))
+        return ScenarioList(scenarios)
 
     def to_agent_list(self, remove_prefix: bool = True):
         """Convert the results to a list of dictionaries, one per agent.
diff --git a/edsl/results/Results.py b/edsl/results/Results.py
index e8bf990d..d33f6ba3 100644
--- a/edsl/results/Results.py
+++ b/edsl/results/Results.py
@@ -245,7 +245,9 @@ def __add__(self, other: Results) -> Results:
         )
 
     def __repr__(self) -> str:
-        return f"Results(data = {self.data}, survey = {repr(self.survey)}, created_columns = {self.created_columns})"
+        import reprlib
+
+        return f"Results(data = {reprlib.repr(self.data)}, survey = {repr(self.survey)}, created_columns = {self.created_columns})"
 
     def _repr_html_(self) -> str:
         from IPython.display import HTML
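`reprlib.repr` truncates large containers, so a `Results` object with thousands of rows no longer floods the console on repr; for example:

```python
import reprlib

data = list(range(1000))
print(len(repr(data)))     # 4890 characters with the built-in repr
print(reprlib.repr(data))  # '[0, 1, 2, 3, 4, 5, ...]'
```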
diff --git a/edsl/scenarios/FileStore.py b/edsl/scenarios/FileStore.py
index 9337f867..46486fd6 100644
--- a/edsl/scenarios/FileStore.py
+++ b/edsl/scenarios/FileStore.py
@@ -155,8 +155,16 @@ def view(self):
 
 
 class PDFFileStore(FileStore):
-    def __init__(self, filename):
-        super().__init__(filename, suffix=".pdf")
+    def __init__(
+        self,
+        filename,
+        binary: Optional[bool] = None,
+        suffix: Optional[str] = None,
+        base64_string: Optional[str] = None,
+    ):
+        super().__init__(
+            filename, binary=binary, base64_string=base64_string, suffix=".pdf"
+        )
 
     def view(self):
         pdf_path = self.to_tempfile()
diff --git a/edsl/scenarios/Scenario.py b/edsl/scenarios/Scenario.py
index 7462f8f5..5fcc3da2 100644
--- a/edsl/scenarios/Scenario.py
+++ b/edsl/scenarios/Scenario.py
@@ -5,6 +5,8 @@
 import base64
 import hashlib
 import os
+import reprlib
+
 from collections import UserDict
 from typing import Union, List, Optional, Generator
 from uuid import uuid4
@@ -48,11 +50,11 @@ def has_image(self) -> bool:
         if not hasattr(self, "_has_image"):
             self._has_image = False
         return self._has_image
-
-    @property
+
+    @property
     def has_jinja_braces(self) -> bool:
         """Return whether the scenario has jinja braces. This matters for rendering.
-
+
         >>> s = Scenario({"food": "I love {{wood chips}}"})
         >>> s.has_jinja_braces
         True
@@ -61,19 +63,23 @@ def has_jinja_braces(self) -> bool:
             if "{{" in str(value) and "}}" in value:
                 return True
         return False
-
-    def convert_jinja_braces(self, replacement_left = "<<", replacement_right = ">>") -> Scenario:
+
+    def convert_jinja_braces(
+        self, replacement_left="<<", replacement_right=">>"
+    ) -> Scenario:
         """Convert Jinja braces to some other character.
-
+
         >>> s = Scenario({"food": "I love {{wood chips}}"})
         >>> s.convert_jinja_braces()
         Scenario({'food': 'I love <<wood chips>>'})
-
+
         """
         new_scenario = Scenario()
         for key, value in self.items():
             if isinstance(value, str):
-                new_scenario[key] = value.replace("{{", replacement_left).replace("}}", replacement_right)
+                new_scenario[key] = value.replace("{{", replacement_left).replace(
+                    "}}", replacement_right
+                )
             else:
                 new_scenario[key] = value
         return new_scenario
@@ -171,6 +177,7 @@ def print(self):
         print_json(json.dumps(self.to_dict()))
 
     def __repr__(self):
+        # return "Scenario(" + reprlib.repr(self.data) + ")"
         return "Scenario(" + repr(self.data) + ")"
 
     def _repr_html_(self):
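Why the brace helpers matter: a scenario value that happens to contain `{{ }}` would otherwise be parsed as a Jinja expression when question text is rendered. A standalone illustration:

```python
from jinja2 import Template, TemplateSyntaxError

raw = "I love {{wood chips}}"
try:
    Template(raw)                   # Jinja chokes on the stray braces
except TemplateSyntaxError as e:
    print("raises:", e)

safe = raw.replace("{{", "<<").replace("}}", ">>")
print(Template(safe).render())      # -> I love <<wood chips>>
```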
diff --git a/edsl/scenarios/ScenarioList.py b/edsl/scenarios/ScenarioList.py
index 0b214bea..96ae0fae 100644
--- a/edsl/scenarios/ScenarioList.py
+++ b/edsl/scenarios/ScenarioList.py
@@ -43,7 +43,7 @@ def __init__(self, data: Optional[list] = None, codebook: Optional[dict] = None):
     def has_jinja_braces(self) -> bool:
         """Check if the ScenarioList has Jinja braces."""
         return any([scenario.has_jinja_braces for scenario in self])
-
+
     def convert_jinja_braces(self) -> ScenarioList:
         """Convert Jinja braces to Python braces."""
         return ScenarioList([scenario.convert_jinja_braces() for scenario in self])
@@ -282,6 +282,10 @@ def _repr_html_(self) -> str:
         for s in data["scenarios"]:
             _ = s.pop("edsl_version")
             _ = s.pop("edsl_class_name")
+        for scenario in data["scenarios"]:
+            for key, value in scenario.items():
+                if hasattr(value, "to_dict"):
+                    scenario[key] = value.to_dict()
         return data_to_html(data)
 
     def tally(self, field) -> dict:

diff --git a/edsl/templates/error_reporting/interview_details.html b/edsl/templates/error_reporting/interview_details.html
index 1969e9d6..5be9083b 100644
--- a/edsl/templates/error_reporting/interview_details.html
+++ b/edsl/templates/error_reporting/interview_details.html
@@ -31,7 +31,12 @@
         <td>Exception details</td>
 
         <td>Human-readable question</td>
-        <td>{{ interview.survey.get_question(question).html(scenario = interview.scenario, agent = interview.agent) }}</td>
+        <td>{{ interview.survey.get_question(question).html(
+            scenario = interview.scenario,
+            agent = interview.agent,
+            answers = exception_message.answers)
+
+        }}</td>
 
         <td>Scenario</td>
diff --git a/edsl/utilities/utilities.py b/edsl/utilities/utilities.py
index 088aa362..038a8e9a 100644
--- a/edsl/utilities/utilities.py
+++ b/edsl/utilities/utilities.py
@@ -20,6 +20,14 @@
 from typing import Callable, Union
 
 
+class CustomEncoder(json.JSONEncoder):
+    def default(self, obj):
+        try:
+            return json.JSONEncoder.default(self, obj)
+        except TypeError:
+            return str(obj)
+
+
 def time_it(func):
     @wraps(func)
     def wrapper(*args, **kwargs):
@@ -124,7 +132,7 @@ def data_to_html(data, replace_new_lines=False):
     from pygments.formatters import HtmlFormatter
     from IPython.display import HTML
 
-    json_str = json.dumps(data, indent=4)
+    json_str = json.dumps(data, indent=4, cls=CustomEncoder)
     formatted_json = highlight(
         json_str,
         JsonLexer(),
diff --git a/pyproject.toml b/pyproject.toml
index 010198a5..ad4f5735 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,6 +13,9 @@ license = "MIT"
 name = "edsl"
 readme = "README.md"
 version = "0.1.33.dev1"
+include = [
+    "edsl/questions/templates/**/*",
+]
 
 [tool.poetry.dependencies]
 python = ">=3.9.1,<3.13"
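`CustomEncoder` makes `data_to_html` tolerant of values json can't serialize (dates, EDSL objects) by falling back to their string form; a quick demonstration:

```python
import datetime
import json

class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        try:
            return json.JSONEncoder.default(self, obj)
        except TypeError:
            return str(obj)  # fall back to the object's string form

print(json.dumps({"when": datetime.date(2024, 7, 1)}, cls=CustomEncoder))
# -> {"when": "2024-07-01"}
```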
diff --git a/tests/agents/test_prompt_construction.py b/tests/agents/test_prompt_construction.py
index 95be3f67..188da156 100644
--- a/tests/agents/test_prompt_construction.py
+++ b/tests/agents/test_prompt_construction.py
@@ -7,7 +7,7 @@ def test_system_prompt_traits_passed():
     agent = Agent(traits={"age": 10, "hair": "brown", "height": 5.5})
 
     i = agent._create_invigilator(question=q.example(), survey=q.example().to_survey())
-    system_prompt = i.construct_system_prompt()
+    system_prompt = i.prompt_constructor.construct_system_prompt()
     assert True == all([key in system_prompt for key in agent.traits.keys()])
 
@@ -17,7 +17,7 @@ def test_user_prompt_question_text_passed():
     from edsl import Survey
 
     i = agent._create_invigilator(question=q.example(), survey=Survey([q.example()]))
-    user_prompt = i.construct_user_prompt()
+    user_prompt = i.prompt_constructor.construct_user_prompt()
     assert q.example().question_text in user_prompt
 
@@ -37,5 +37,5 @@ def test_scenario_render_in_user_prompt():
     i = agent._create_invigilator(
         question=q, scenario=s, survey=q_no_nesting.to_survey()
     )
-    user_prompt = i.construct_user_prompt()
+    user_prompt = i.prompt_constructor.construct_user_prompt()
     assert "Peter" in user_prompt
diff --git a/tests/jobs/test_repair.py b/tests/jobs/test_repair.py
new file mode 100644
index 00000000..44d65281
--- /dev/null
+++ b/tests/jobs/test_repair.py
@@ -0,0 +1,18 @@
+from edsl import Model, QuestionFreeText
+import time
+import pytest
+
+m = Model("test", canned_response="Hi", exception_probability=0.1, throw_exception=True)
+q = QuestionFreeText(question_text="What is your name?", question_name="name")
+
+
+def test_repair_enabled():
+    results = q.by(m).run(n=100, progress_bar=False, cache=False)
+    assert len([x for x in results.select("answer.name").to_list() if x is None]) == 0
+
+
+def test_repair_off():
+    with pytest.raises(Exception):
+        results = q.by(m).run(
+            n=100, progress_bar=False, cache=False, stop_on_exception=True
+        )
diff --git a/tests/questions/test_examples.py b/tests/questions/test_examples.py
new file mode 100644
index 00000000..f2292dce
--- /dev/null
+++ b/tests/questions/test_examples.py
@@ -0,0 +1,13 @@
+import pytest
+from edsl import Question
+
+
+@pytest.mark.parametrize("question_type", Question.available())
+def test_individual_questions(question_type):
+    if question_type != "functional":
+        q = Question.example(question_type)
+        r = q.example_results()
+        _ = hash(r)
+        _ = r._repr_html_()
+    else:
+        pytest.skip("Skipping functional question type")