From 4c1ee630f648f1877efea708e429771ee0b7e1a6 Mon Sep 17 00:00:00 2001 From: yzlin Date: Thu, 29 Feb 2024 18:23:26 +0800 Subject: [PATCH] merge mle & interpreter --- metagpt/actions/mi/debug_code.py | 93 -------------- metagpt/actions/mi/execute_nb_code.py | 2 +- metagpt/actions/mi/ml_action.py | 59 --------- metagpt/actions/mi/write_analysis_code.py | 93 +++++++++++--- metagpt/prompts/mi/ml_action.py | 60 +-------- metagpt/prompts/mi/write_analysis_code.py | 146 +++++++++++++++++++--- metagpt/provider/base_llm.py | 9 +- metagpt/provider/constant.py | 16 --- metagpt/provider/openai_api.py | 27 +--- metagpt/roles/mi/interpreter.py | 44 +++++-- metagpt/roles/mi/ml_engineer.py | 63 ---------- metagpt/tools/tool_type.py | 3 +- metagpt/utils/common.py | 23 ++++ 13 files changed, 273 insertions(+), 365 deletions(-) delete mode 100644 metagpt/actions/mi/debug_code.py delete mode 100644 metagpt/actions/mi/ml_action.py delete mode 100644 metagpt/roles/mi/ml_engineer.py diff --git a/metagpt/actions/mi/debug_code.py b/metagpt/actions/mi/debug_code.py deleted file mode 100644 index 8259d3de10..0000000000 --- a/metagpt/actions/mi/debug_code.py +++ /dev/null @@ -1,93 +0,0 @@ -from __future__ import annotations - -import json - -from metagpt.actions import Action -from metagpt.schema import Message -from metagpt.utils.common import CodeParser - -DEBUG_REFLECTION_EXAMPLE = ''' -Example 1: -[previous impl]: -```python -def add(a: int, b: int) -> int: - """ - Given integers a and b, return the total value of a and b. - """ - return a - b -``` - -[runtime Error]: -Tested passed: - -Tests failed: -assert add(1, 2) == 3 # output: -1 -assert add(1, 2) == 4 # output: -1 - -[reflection on previous impl]: -The implementation failed the test cases where the input integers are 1 and 2. The issue arises because the code does not add the two integers together, but instead subtracts the second integer from the first. To fix this issue, we should change the operator from `-` to `+` in the return statement. This will ensure that the function returns the correct output for the given input. - -[improved impl]: -```python -def add(a: int, b: int) -> int: - """ - Given integers a and b, return the total value of a and b. - """ - return a + b -``` -''' - -REFLECTION_PROMPT = """ -Here is an example for you. -{debug_example} -[context] -{context} - -[previous impl] -{code} -[runtime Error] -{runtime_result} - -Analysis the error step by step, provide me improve method and code. Remember to follow [context] requirement. Don't forget write code for steps behind the error step. - -Output a json following the format: -```json -{{ - "reflection": str = "Reflection on previous implementation", - "improved_impl": str = "Refined code after reflection.", -}} -``` -""" - - -class DebugCode(Action): - async def run( - self, - context: list[Message] = None, - code: str = "", - runtime_result: str = "", - ) -> str: - """ - Execute the debugging process based on the provided context, code, and runtime_result. - - Args: - context (list[Message]): A list of Message objects representing the context. - code (str): The code to be debugged. - runtime_result (str): The result of the code execution. - - Returns: - str: The improved implementation based on the debugging process. - """ - - reflection_prompt = REFLECTION_PROMPT.format( - debug_example=DEBUG_REFLECTION_EXAMPLE, - context=context, - code=code, - runtime_result=runtime_result, - ) - system_prompt = "You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation " - - rsp = await self._aask(reflection_prompt, system_msgs=[system_prompt]) - reflection = json.loads(CodeParser.parse_code(block=None, text=rsp)) - - return {"code": reflection["improved_impl"]} diff --git a/metagpt/actions/mi/execute_nb_code.py b/metagpt/actions/mi/execute_nb_code.py index a8c9c30859..02b1fb1685 100644 --- a/metagpt/actions/mi/execute_nb_code.py +++ b/metagpt/actions/mi/execute_nb_code.py @@ -201,7 +201,7 @@ def truncate(result: str, keep_len: int = 2000, is_success: bool = True): if is_success: desc = f"Executed code successfully. Truncated to show only first {keep_len} characters\n" else: - desc = f"Executed code failed, please reflect the cause of bug and then debug. Truncated to show only last {keep_len} characters\n" + desc = f"Executed code failed, please reflect on the cause of bug and then debug. Truncated to show only last {keep_len} characters\n" if result.strip().startswith(" Tuple[list[Message], str]: - # prepare tool schemas and tool-type-specific instruction - tool_schemas, tool_type_usage_prompt = await self._prepare_tools(plan=plan) - - # ML-specific variables to be used in prompt - finished_tasks = plan.get_finished_tasks() - code_context = [remove_comments(task.code) for task in finished_tasks] - code_context = "\n\n".join(code_context) - - # prepare prompt depending on tool availability & LLM call - prompt = ML_PROMPT.format( - user_requirement=plan.goal, - history_code=code_context, - current_task=plan.current_task.instruction, - column_info=column_info, - tool_type_usage_prompt=tool_type_usage_prompt, - tool_schemas=tool_schemas, - examples=USE_TOOLS_EXAMPLE if tool_schemas else USE_NO_TOOLS_EXAMPLE, - ) - - rsp = await self.llm.aask_code(prompt) - - # Extra output to be used for potential debugging - context = [Message(content=prompt, role="user")] - - return context, rsp - - -class UpdateDataColumns(Action): - async def run(self, plan: Plan = None) -> dict: - finished_tasks = plan.get_finished_tasks() - code_context = [remove_comments(task.code) for task in finished_tasks] - code_context = "\n\n".join(code_context) - prompt = UPDATE_DATA_COLUMNS.format(history_code=code_context) - rsp = await self.llm.aask_code(prompt) - return rsp diff --git a/metagpt/actions/mi/write_analysis_code.py b/metagpt/actions/mi/write_analysis_code.py index 9337487c26..aa2ced8921 100644 --- a/metagpt/actions/mi/write_analysis_code.py +++ b/metagpt/actions/mi/write_analysis_code.py @@ -11,14 +11,19 @@ from metagpt.actions import Action from metagpt.logs import logger +from metagpt.prompts.mi.ml_action import MODEL_TRAIN_EXAMPLE, USE_ML_TOOLS_EXAMPLE from metagpt.prompts.mi.write_analysis_code import ( + CHECK_DATA_PROMPT, + DEBUG_REFLECTION_EXAMPLE, + REFLECTION_PROMPT, + STRUCTUAL_PROMPT, TOOL_RECOMMENDATION_PROMPT, - TOOL_USAGE_PROMPT, ) from metagpt.schema import Message, Plan, SystemMessage from metagpt.tools import TOOL_REGISTRY from metagpt.tools.tool_registry import validate_tool_names -from metagpt.utils.common import CodeParser +from metagpt.tools.tool_type import ToolType +from metagpt.utils.common import CodeParser, process_message, remove_comments class WriteCodeWithTools(Action): @@ -27,7 +32,6 @@ class WriteCodeWithTools(Action): use_tools: bool = True # selected tools to choose from, listed by their names. An empty list means selection from all tools. selected_tools: list[str] = [] - DEFAULT_SYSTEM_MSG: str = """You are Code Interpreter, a world-class programmer that can complete any goal by executing code. Strictly follow the plan and generate code step by step. Each step of the code will be executed on the user's machine, and the user will provide the code execution results to you.**Notice: The code for the next step depends on the code for the previous step. Must reuse variables in the lastest other code directly, dont creat it again, it is very import for you. Use !pip install in a standalone block to install missing packages.Usually the libraries you need are already installed.Dont check if packages already imported.**""" # prompt reference: https://github.com/KillianLucas/open-interpreter/blob/v0.1.4/interpreter/system_message.txt def _insert_system_message(self, context: list[Message], system_msg: str = None): system_msg = system_msg or self.DEFAULT_SYSTEM_MSG @@ -98,6 +102,16 @@ async def _prepare_tools(self, plan: Plan) -> Tuple[dict, str]: TOOL_REGISTRY.get_tool_type(tool_type).usage_prompt if TOOL_REGISTRY.has_tool_type(tool_type) else "" ) + # ML-specific tool usage examples + examples = "" + if plan.current_task.task_type in [ + ToolType.DATA_PREPROCESS.type_name, + ToolType.FEATURE_ENGINEERING.type_name, + ]: + examples = USE_ML_TOOLS_EXAMPLE + elif plan.current_task.task_type in [ToolType.MODEL_TRAIN.type_name]: + examples = MODEL_TRAIN_EXAMPLE + # prepare schemas of available tools tool_schemas = {} available_tools = self._get_tools_by_type(tool_type) @@ -105,27 +119,72 @@ async def _prepare_tools(self, plan: Plan) -> Tuple[dict, str]: available_tools = {tool_name: tool.schemas["description"] for tool_name, tool in available_tools.items()} tool_schemas = await self._recommend_tool(plan.current_task.instruction, available_tools) - return tool_schemas, tool_type_usage_prompt + return tool_schemas, tool_type_usage_prompt, examples + + async def _debug_with_reflection(self, context: list[Message], working_memory: list[Message]): + reflection_prompt = REFLECTION_PROMPT.format( + debug_example=DEBUG_REFLECTION_EXAMPLE, + context=context, + previous_impl=working_memory, + ) + # print(reflection_prompt) + system_prompt = "You are an AI Python assistant. You will be given your previous implementation code of a task, runtime error results, and a hint to change the implementation appropriately. Write your full implementation " + + rsp = await self._aask(reflection_prompt, system_msgs=[system_prompt]) + reflection = json.loads(CodeParser.parse_code(block=None, text=rsp)) + + return reflection["improved_impl"] async def run( self, - context: list[Message], plan: Plan, + working_memory: list[Message] = [], + use_reflection: bool = False, **kwargs, ) -> str: - if self.use_tools: - # prepare tool schemas and tool-type-specific instruction - tool_schemas, tool_type_usage_prompt = await self._prepare_tools(plan=plan) + # prepare tool schemas and tool-type-specific instruction + tool_schemas, tool_type_usage_prompt, examples = await self._prepare_tools(plan=plan) + + # necessary components to be used in prompt + finished_tasks = plan.get_finished_tasks() + code_written = [remove_comments(task.code) for task in finished_tasks] + code_written = "\n\n".join(code_written) + task_results = [task.result for task in finished_tasks] + task_results = "\n\n".join(task_results) + + # structure prompt + structual_prompt = STRUCTUAL_PROMPT.format( + user_requirement=plan.goal, + code_written=code_written, + task_results=task_results, + current_task=plan.current_task.instruction, + tool_type_usage_prompt=tool_type_usage_prompt, + tool_schemas=tool_schemas, + examples=examples, + ) + context = [Message(content=structual_prompt, role="user")] + working_memory + context = process_message(context) + + # temp = context + working_memory + # print(*temp, sep="***\n\n***") - # form a complete tool usage instruction and include it as a message in context - tools_instruction = TOOL_USAGE_PROMPT.format( - tool_schemas=tool_schemas, tool_type_usage_prompt=tool_type_usage_prompt - ) - context.append(Message(content=tools_instruction, role="user")) + # LLM call + if not use_reflection: + rsp = await self.llm.aask(context, **kwargs) + code = CodeParser.parse_code(block=None, text=rsp) - # prepare prompt & LLM call - prompt = self._insert_system_message(context) + else: + code = await self._debug_with_reflection(context=context, working_memory=working_memory) - rsp = await self.llm.aask_code(prompt) + return code - return rsp + +class CheckData(Action): + async def run(self, plan: Plan = None) -> dict: + finished_tasks = plan.get_finished_tasks() + code_written = [remove_comments(task.code) for task in finished_tasks] + code_written = "\n\n".join(code_written) + prompt = CHECK_DATA_PROMPT.format(code_written=code_written) + rsp = await self._aask(prompt) + code = CodeParser.parse_code(block=None, text=rsp) + return code diff --git a/metagpt/prompts/mi/ml_action.py b/metagpt/prompts/mi/ml_action.py index ed7ea59e4e..4d769379a7 100644 --- a/metagpt/prompts/mi/ml_action.py +++ b/metagpt/prompts/mi/ml_action.py @@ -4,62 +4,7 @@ # @Author : lidanyang # @File : ml_action # @Desc : -UPDATE_DATA_COLUMNS = """ -# Background -Keep dataset column information updated before model train. -## Tasks Done -```python -{history_code} -```end - -# Task -Print the the latest column information after 'Tasks Done' code. Use the following code: -```python -from metagpt.tools.libs.data_preprocess import get_column_info - -column_info = get_column_info(df) -print("column_info") -print(column_info) -```end - -# Constraints: -- Use the DataFrame variable from 'Tasks Done' in place of df. -- Your code is to be added to a new cell in jupyter. -""" - -ML_PROMPT = """ -# Background -As a data scientist, you need to help user to achieve their goal [{user_requirement}] step-by-step in an continuous Jupyter notebook. - -## Done Tasks -```python -{history_code} -```end - -## Current Task -{current_task} - -# Latest Data Info -Latest data info after previous tasks: -{column_info} - -# Task -Write complete code for 'Current Task'. And avoid duplicating code from 'Done Tasks', such as repeated import of packages, reading data, etc. -Specifically, {tool_type_usage_prompt} - -# Capabilities -- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python Class. -- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc.. - -# Available Tools: -Each Class tool is described in JSON format. When you call a tool, import the tool from its path first. -{tool_schemas} - -{examples} -""" - -USE_NO_TOOLS_EXAMPLE = """ -# Output Example: +MODEL_TRAIN_EXAMPLE = """ when current task is "train a lightgbm model on training data", the code can be like: ```python # Step 1: check data type and convert to numeric @@ -80,8 +25,7 @@ - Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed. """ -USE_TOOLS_EXAMPLE = """ -# Output Example: +USE_ML_TOOLS_EXAMPLE = """ when current task is "do data preprocess, like fill missing value, handle outliers, etc.", the code can be like: ```python # Step 1: fill missing value diff --git a/metagpt/prompts/mi/write_analysis_code.py b/metagpt/prompts/mi/write_analysis_code.py index 6cae15e24b..0974f368f0 100644 --- a/metagpt/prompts/mi/write_analysis_code.py +++ b/metagpt/prompts/mi/write_analysis_code.py @@ -1,3 +1,130 @@ +STRUCTUAL_PROMPT = """ +# Background +As a data scientist, you need to help user to achieve their goal [{user_requirement}] step-by-step in an continuous Jupyter notebook. Since it is a notebook environment, don't use asyncio.run. Instead, use await if you need to call an async function. + +# Finished Tasks +## code +```python +{code_written} +``` + +## execution result +{task_results} + +# Current Task +{current_task} + +# Instruction +Write complete code for 'Current Task'. And avoid duplicating code from 'Finished Tasks', such as repeated import of packages, reading data, etc. +Specifically, {tool_type_usage_prompt} + +# Capabilities +- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python class or function. +- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc.. + +# Available Tools: +Each tool is described in JSON format. When you call a tool, import the tool from its path first. +{tool_schemas} + +# Examples +{examples} + +# Output +Output code in the following format: +```python +your code +``` +""" + +DEBUG_REFLECTION_EXAMPLE = ''' +[previous impl]: +assistant: +```python +def add(a: int, b: int) -> int: + """ + Given integers a and b, return the total value of a and b. + """ + return a - b +``` + +user: +Tests failed: +assert add(1, 2) == 3 # output: -1 +assert add(1, 2) == 4 # output: -1 + +[reflection on previous impl]: +The implementation failed the test cases where the input integers are 1 and 2. The issue arises because the code does not add the two integers together, but instead subtracts the second integer from the first. To fix this issue, we should change the operator from `-` to `+` in the return statement. This will ensure that the function returns the correct output for the given input. + +[improved impl]: +```python +def add(a: int, b: int) -> int: + """ + Given integers a and b, return the total value of a and b. + """ + return a + b +``` +''' + +REFLECTION_PROMPT = """ +[example] +Here is an example of debugging with reflection. +{debug_example} +[/example] + +[context] +{context} + +[previous impl]: +{previous_impl} + +[instruction] +Analyze your previous code and error in [context] step by step, provide me with improved method and code. Remember to follow [context] requirement. Don't forget to write code for steps behind the error step. +Output a json following the format: +```json +{{ + "reflection": str = "Reflection on previous implementation", + "improved_impl": str = "Refined code after reflection.", +}} +``` +""" + +CHECK_DATA_PROMPT = """ +# Background +Check latest data info to guide subsequent tasks. + +## Finished Tasks +```python +{code_written} +```end + +# Task +Check code in finished tasks, print key variables to guide your following actions. +Specifically, if it is a data analysis or machine learning task, print the the latest column information using the following code, with DataFrame variable from 'Finished Tasks' in place of df: +```python +from metagpt.tools.libs.data_preprocess import get_column_info + +column_info = get_column_info(df) +print("column_info") +print(column_info) +```end +Otherwise, you may write any codes you see fit. Return an empty string if you think there is no important data to check. + +# Constraints: +- Your code is to be added to a new cell in jupyter. + +# Instruction +Output code following the format: +```python +your code +``` +""" + +DATA_INFO = """ +# Latest Data Info +Latest data info after previous tasks: +{info} +""" + TOOL_RECOMMENDATION_PROMPT = """ ## User Requirement: {current_task} @@ -18,22 +145,3 @@ ["tool_name1", "tool_name2", ...] ``` """ - - -TOOL_USAGE_PROMPT = """ -# Instruction -Write complete code for 'Current Task'. And avoid duplicating code from finished tasks, such as repeated import of packages, reading data, etc. -Specifically, {tool_type_usage_prompt} - -# Capabilities -- You can utilize pre-defined tools in any code lines from 'Available Tools' in the form of Python Class. -- You can freely combine the use of any other public packages, like sklearn, numpy, pandas, etc.. - -# Available Tools (can be empty): -Each Class tool is described in JSON format. When you call a tool, import the tool first. -{tool_schemas} - -# Constraints: -- Ensure the output new code is executable in the same Jupyter notebook with previous tasks code have been executed. -- Always prioritize using pre-defined tools for the same functionality. -""" diff --git a/metagpt/provider/base_llm.py b/metagpt/provider/base_llm.py index 2137bb2dc3..966a9fcd61 100644 --- a/metagpt/provider/base_llm.py +++ b/metagpt/provider/base_llm.py @@ -69,7 +69,7 @@ def _default_system_msg(self): async def aask( self, - msg: str, + msg: Union[str, list[dict[str, str]]], system_msgs: Optional[list[str]] = None, format_msgs: Optional[list[dict[str, str]]] = None, images: Optional[Union[str, list[str]]] = None, @@ -84,7 +84,10 @@ async def aask( message = [] if format_msgs: message.extend(format_msgs) - message.append(self._user_msg(msg, images=images)) + if isinstance(msg, str): + message.append(self._user_msg(msg, images=images)) + else: + message.extend(msg) logger.debug(message) rsp = await self.acompletion_text(message, stream=stream, timeout=timeout) return rsp @@ -103,7 +106,7 @@ async def aask_batch(self, msgs: list, timeout=3) -> str: return self._extract_assistant_rsp(context) async def aask_code( - self, messages: Union[str, Message, list[dict]], timeout=3, language: str = "", **kwargs + self, messages: Union[str, Message, list[dict]], timeout=3, include_language: bool = False, **kwargs ) -> dict: raise NotImplementedError diff --git a/metagpt/provider/constant.py b/metagpt/provider/constant.py index cfece6a79a..dee78dc3bc 100644 --- a/metagpt/provider/constant.py +++ b/metagpt/provider/constant.py @@ -26,22 +26,6 @@ } -CODE_ONLY_FUNCTION_SCHEMA = { - "name": "add_new_code", - "description": "Add new code cell of current task to the end of an active Jupyter notebook.", - "parameters": { - "type": "object", - "properties": { - "code": { - "type": "string", - "description": "The code to be added to a new cell in jupyter.", - }, - }, - "required": ["code"], - }, -} - - # tool_choice value for general_function_schema # https://platform.openai.com/docs/api-reference/chat/create#chat-create-tool_choice GENERAL_TOOL_CHOICE = {"type": "function", "function": {"name": "execute"}} diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py index f1d53c75a8..388149256b 100644 --- a/metagpt/provider/openai_api.py +++ b/metagpt/provider/openai_api.py @@ -28,8 +28,7 @@ from metagpt.provider.base_llm import BaseLLM from metagpt.provider.constant import CODE_ONLY_FUNCTION_SCHEMA, GENERAL_FUNCTION_SCHEMA from metagpt.provider.llm_provider_registry import register_provider -from metagpt.schema import Message -from metagpt.utils.common import CodeParser, decode_image +from metagpt.utils.common import CodeParser, decode_image, process_message from metagpt.utils.cost_manager import CostManager, Costs from metagpt.utils.exceptions import handle_exception from metagpt.utils.token_counter import ( @@ -145,32 +144,10 @@ async def acompletion_text(self, messages: list[dict], stream=False, timeout=3) rsp = await self._achat_completion(messages, timeout=timeout) return self.get_choice_text(rsp) - def _process_message(self, messages: Union[str, Message, list[dict], list[Message], list[str]]) -> list[dict]: - """convert messages to list[dict].""" - # 全部转成list - if not isinstance(messages, list): - messages = [messages] - - # 转成list[dict] - processed_messages = [] - for msg in messages: - if isinstance(msg, str): - processed_messages.append({"role": "user", "content": msg}) - elif isinstance(msg, dict): - assert set(msg.keys()) == set(["role", "content"]) - processed_messages.append(msg) - elif isinstance(msg, Message): - processed_messages.append(msg.to_dict()) - else: - raise ValueError( - f"Only support message type are: str, Message, dict, but got {type(messages).__name__}!" - ) - return processed_messages - async def _achat_completion_function( self, messages: list[dict], timeout: int = 3, **chat_configs ) -> ChatCompletion: - messages = self._process_message(messages) + messages = process_message(messages) kwargs = self._cons_kwargs(messages=messages, timeout=timeout, **chat_configs) rsp: ChatCompletion = await self.aclient.chat.completions.create(**kwargs) self._update_costs(rsp.usage) diff --git a/metagpt/roles/mi/interpreter.py b/metagpt/roles/mi/interpreter.py index d5607bfe3e..0e26dd71a4 100644 --- a/metagpt/roles/mi/interpreter.py +++ b/metagpt/roles/mi/interpreter.py @@ -4,10 +4,12 @@ from metagpt.actions.mi.ask_review import ReviewConst from metagpt.actions.mi.execute_nb_code import ExecuteNbCode -from metagpt.actions.mi.write_analysis_code import WriteCodeWithTools +from metagpt.actions.mi.write_analysis_code import CheckData, WriteCodeWithTools from metagpt.logs import logger +from metagpt.prompts.mi.write_analysis_code import DATA_INFO from metagpt.roles import Role from metagpt.schema import Message, Task, TaskResult +from metagpt.tools.tool_type import ToolType class Interpreter(Role): @@ -15,6 +17,7 @@ class Interpreter(Role): profile: str = "Interpreter" auto_run: bool = True use_tools: bool = False + use_reflection: bool = False execute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True) tools: list[str] = [] @@ -48,14 +51,16 @@ async def _write_and_exec_code(self, max_retry: int = 3): counter = 0 success = False + await self._check_data() + while not success and counter < max_retry: ### write code ### - code, cause_by = await self._write_code() + code, cause_by = await self._write_code(counter) - self.working_memory.add(Message(content=code["code"], role="assistant", cause_by=cause_by)) + self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by)) ### execute code ### - result, success = await self.execute_code.run(**code) + result, success = await self.execute_code.run(code) print(result) self.working_memory.add(Message(content=result, role="user", cause_by=ExecuteNbCode)) @@ -69,14 +74,33 @@ async def _write_and_exec_code(self, max_retry: int = 3): if ReviewConst.CHANGE_WORDS[0] in review: counter = 0 # redo the task again with help of human suggestions - return code["code"], result, success + return code, result, success - async def _write_code(self): + async def _write_code(self, counter): todo = WriteCodeWithTools(use_tools=self.use_tools, selected_tools=self.tools) logger.info(f"ready to {todo.name}") - - context = self.planner.get_useful_memories() - # print(*context, sep="\n***\n") - code = await todo.run(context=context, plan=self.planner.plan, temperature=0.0) + use_reflection = counter > 0 and self.use_reflection + code = await todo.run( + plan=self.planner.plan, working_memory=self.working_memory.get(), use_reflection=use_reflection + ) return code, todo + + async def _check_data(self): + current_task = self.planner.plan.current_task + if current_task.task_type not in [ + ToolType.DATA_PREPROCESS.type_name, + ToolType.FEATURE_ENGINEERING.type_name, + ToolType.MODEL_TRAIN.type_name, + ]: + return + logger.info("Check updated data") + code = await CheckData().run(self.planner.plan) + if not code: + return + success = False + result, success = await self.execute_code.run(code) + if success: + print(result) + data_info = DATA_INFO.format(info=result) + self.working_memory.add(Message(content=data_info, role="user", cause_by=CheckData)) diff --git a/metagpt/roles/mi/ml_engineer.py b/metagpt/roles/mi/ml_engineer.py deleted file mode 100644 index 7d7676eef9..0000000000 --- a/metagpt/roles/mi/ml_engineer.py +++ /dev/null @@ -1,63 +0,0 @@ -from metagpt.actions.mi.debug_code import DebugCode -from metagpt.actions.mi.execute_nb_code import ExecuteNbCode -from metagpt.actions.mi.ml_action import UpdateDataColumns, WriteCodeWithToolsML -from metagpt.logs import logger -from metagpt.roles.mi.interpreter import Interpreter -from metagpt.tools.tool_type import ToolType -from metagpt.utils.common import any_to_str - - -class MLEngineer(Interpreter): - name: str = "Mark" - profile: str = "MLEngineer" - debug_context: list = [] - latest_code: str = "" - - async def _write_code(self): - if not self.use_tools: - return await super()._write_code() - - # In a trial and errors settings, check whether this is our first attempt to tackle the task. If there is no code execution before, then it is. - is_first_trial = any_to_str(ExecuteNbCode) not in [msg.cause_by for msg in self.working_memory.get()] - - if is_first_trial: - # For the first trial, write task code from scratch - column_info = await self._update_data_columns() - - logger.info("Write code with tools") - tool_context, code = await WriteCodeWithToolsML(selected_tools=self.tools).run( - context=[], # context assembled inside the Action - plan=self.planner.plan, - column_info=column_info, - ) - self.debug_context = tool_context - cause_by = WriteCodeWithToolsML - - else: - # Previous trials resulted in error, debug and rewrite the code - logger.warning("We got a bug, now start to debug...") - code = await DebugCode().run( - code=self.latest_code, - runtime_result=self.working_memory.get(), - context=self.debug_context, - ) - cause_by = DebugCode - - self.latest_code = code["code"] - - return code, cause_by - - async def _update_data_columns(self): - current_task = self.planner.plan.current_task - if current_task.task_type not in [ - ToolType.DATA_PREPROCESS.type_name, - ToolType.FEATURE_ENGINEERING.type_name, - ToolType.MODEL_TRAIN.type_name, - ]: - return "" - logger.info("Check columns in updated data") - code = await UpdateDataColumns().run(self.planner.plan) - success = False - result, success = await self.execute_code.run(**code) - print(result) - return result if success else "" diff --git a/metagpt/tools/tool_type.py b/metagpt/tools/tool_type.py index e9c9a3482b..093a69bf7a 100644 --- a/metagpt/tools/tool_type.py +++ b/metagpt/tools/tool_type.py @@ -19,7 +19,8 @@ class ToolType(Enum): ) DATA_PREPROCESS = ToolTypeDef( name="data_preprocess", - desc="Only for changing value inplace.", + desc="For preprocessing dataset in a data analysis or machine learning task ONLY," + "general data operation doesn't fall into this type", usage_prompt=DATA_PREPROCESS_PROMPT, ) EMAIL_LOGIN = ToolTypeDef( diff --git a/metagpt/utils/common.py b/metagpt/utils/common.py index f829c1289d..4f86deab10 100644 --- a/metagpt/utils/common.py +++ b/metagpt/utils/common.py @@ -666,3 +666,26 @@ def decode_image(img_url_or_b64: str) -> Image: img_data = BytesIO(base64.b64decode(b64_data)) img = Image.open(img_data) return img + + +def process_message(messages: Union[str, Message, list[dict], list[Message], list[str]]) -> list[dict]: + """convert messages to list[dict].""" + from metagpt.schema import Message + + # 全部转成list + if not isinstance(messages, list): + messages = [messages] + + # 转成list[dict] + processed_messages = [] + for msg in messages: + if isinstance(msg, str): + processed_messages.append({"role": "user", "content": msg}) + elif isinstance(msg, dict): + assert set(msg.keys()) == set(["role", "content"]) + processed_messages.append(msg) + elif isinstance(msg, Message): + processed_messages.append(msg.to_dict()) + else: + raise ValueError(f"Only support message type are: str, Message, dict, but got {type(messages).__name__}!") + return processed_messages