diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b9f14a7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.cache +.history +llm_cache.sqlite +coding +*.pyc \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..b0f4dd5 --- /dev/null +++ b/README.md @@ -0,0 +1,50 @@ +# Sibyl: Simple yet Effective Agent Framework for Complex Real-world Reasoning +

+![Sibyl System](imgs/Sibyl.png)

+

+ [📄arXiv] + [🤗HF Paper] + [🛠️Code] +

+ +This is an experimental project. We are attempting to design a general assistant system that evolves from System1 to System2. The name Sibyl comes from the multi-agent system composed of numerous human brains in [Psycho-Pass](https://psychopass.fandom.com/wiki/Sibyl_System). + +## Citation +If you find our work useful, please cite our paper: +``` +@article{wang2024sibyl, + title={Sibyl: Simple yet Effective Agent Framework for Complex Real-world Reasoning}, + author={Yulong Wang and Tianhao Shen and Lifeng Liu and Jian Xie}, + year={2024}, + eprint={2407.10718}, + archivePrefix={arXiv}, + primaryClass={cs.AI}, + url={https://arxiv.org/abs/2407.10718}, +} +``` + + +## Benchmark +### GAIA +|Model Name|Average score (%)|Level 1 score (%)| Level 2 score (%) | Level 3 score (%)| +|-|-|-|-|-| +|**Sibyl System v0.2**|34.55|47.31|32.7|16.33| +|Multi-Agent Experiment v0.1 (powered by AutoGen)|32.33|47.31|28.93|14.58| +|FRIDAY|24.25|40.86|20.13|6.12| +|GPT4 + manually selected plugins|14.6|30.3|9.7|0| +|GPT4 Turbo|6.67|9.68|6.92|0| +|AutoGPT4|5|15.05|0.63|0| + +## Philosophy +### From System1 to System2 + +Currently popular assistant systems like ChatGPT are designed to solve human decision-making problems at the minute level. Even with methods such as CoT and ReAct, they encounter significant difficulties in handling problems at the 10-minute level. Our system aims to gradually solve problems from the minute level to the hour level and even the day level. + +### Complexity Control + +Decoder-only models have a beneficial characteristic of being pure functions, which allows us to better control complexity. However, as we evolve from System1 to System2, the introduction of states inevitably causes the system's complexity to gradually spiral out of control. Some existing Multi-Agent solutions introduce too many states, making the system difficult to scale sustainably. We aim to control this Multi-Agent characteristic within parts of the system or push it to the system's edges. 
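+
+As a concrete illustration of how we try to keep the multi-agent machinery at the edges, `main.py` wraps each Sibyl agent in a self-contained Ray actor and parallelizes the GAIA evaluation over a plain actor pool. The snippet below is only a minimal sketch of that pattern, with a hypothetical `EchoAgent` standing in for the real `Sibyl` actor (which wraps the LLM, browser, and code-execution tools); see `main.py` for the actual pipeline.
+
+```python
+import ray
+
+ray.init()
+
+@ray.remote
+class EchoAgent:
+    # Stand-in for the real Sibyl actor defined in main.py.
+    def ask(self, question: str) -> str:
+        return f"echo: {question}"
+
+# main.py uses this same ActorPool.map pattern to answer GAIA questions in parallel.
+pool = ray.util.ActorPool([EchoAgent.remote() for _ in range(4)])
+questions = ["q1", "q2", "q3"]
+answers = list(pool.map(lambda agent, q: agent.ask.remote(q), questions))
+print(answers)
+```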
+ +## Contact + +If you have any inquiries, please feel free to raise an issue or reach out to us via email at: wangyulong@gmail.com, thshen@tju.edu.cn diff --git a/imgs/Sibyl.png b/imgs/Sibyl.png new file mode 100644 index 0000000..8826862 Binary files /dev/null and b/imgs/Sibyl.png differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..761ee48 --- /dev/null +++ b/main.py @@ -0,0 +1,385 @@ +import os +from typing import Tuple, Any, List +import re +import sqlite3 + +from langchain_core.outputs import LLMResult +import ray +ray.init() + +from datasets import load_dataset + +from rich import print as pp +from rich.console import Console +from rich.table import Table +from rich import box + +from langchain_core.pydantic_v1 import BaseModel, Field +from langchain_core.prompts import ChatPromptTemplate, PromptTemplate +from langchain_core.output_parsers import JsonOutputParser, StrOutputParser +from langchain_openai import ChatOpenAI +from langchain_community.cache import SQLiteCache +from langchain_core.callbacks import BaseCallbackHandler + +from autogen.code_utils import execute_code +import autogen +from autogen.agentchat.contrib.society_of_mind_agent import SocietyOfMindAgent + +from utils.score import question_scorer +from utils.browser_utils import SimpleTextBrowser + +MODEL='gpt-4o' +DATA_NAME = '2023_level1' +SPLIT = 'validation' +OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') +OPENAI_API_BASE = os.getenv('OPENAI_API_BASE') +BING_API_KEY = os.getenv('BING_API_KEY') + +os.makedirs(f".cache/qa_cache/{SPLIT}", exist_ok=True) +qa_cache_db = sqlite3.connect(f".cache/qa_cache/{SPLIT}/{DATA_NAME}.db") +qa_cache_db.execute('''CREATE TABLE IF NOT EXISTS qa_cache + (question TEXT PRIMARY KEY NOT NULL, + answer TEXT NOT NULL);''') +qa_cache_db.commit() +qa_cache_db.close() + +class LLMCallbackHandler(BaseCallbackHandler): + + def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any: + print(f"LLM response: {response}") + +class Answer(BaseModel): + reason: str = Field(description="Step by step reasoning") + answer: str = Field(description="The answer to the question") + +class StepNote(BaseModel): + snippets: List[str] = Field(description="The snippets may use to answer the question, each snippet should less than 1000 characters") + plan: str = Field(description="Plan for the next step") + +class ToolChoice(BaseModel): + reason: str = Field(description="Step by step reasoning") + tool: str = Field(description="The tool to use") + tool_args: dict = Field(description="The arguments to pass to the tool") + +class ImproveCode(BaseModel): + reason: str = Field(description="Step by step reasoning on how to improve the code") + improved_code: str = Field(description="The improved code") + +with open("prompts/format_answer.txt") as f: + FORMAT_ANSWER_PROMPT = ChatPromptTemplate.from_template(f.read()) + +with open('prompts/choose_tool.txt') as f: + CHOOSE_TOOL_PROMPT_TEMPLATE = f.read() + +with open('prompts/summarize_step.txt') as f: + SUMMARIZE_STEP_PROMPT_TEMPLATE = ChatPromptTemplate.from_template(f.read()) + +with open('prompts/improve_code.txt') as f: + IMPROVE_CODE_PROMPT_TEMPLATE = f.read() + +@ray.remote +class Sibyl: + def __init__(self): + cache = SQLiteCache("llm_cache.sqlite") + self.llm = ChatOpenAI(model=MODEL, temperature=0, streaming=False, max_retries=5, api_key=OPENAI_API_KEY, base_url=OPENAI_API_BASE, cache=cache) + self.llm_without_cache = ChatOpenAI(model=MODEL, temperature=0.1, streaming=False, max_retries=5, api_key=OPENAI_API_KEY, 
base_url=OPENAI_API_BASE) + self.format_answer_chain = FORMAT_ANSWER_PROMPT | self.llm | StrOutputParser() + + self.tool_choice_output_parser = JsonOutputParser(pydantic_object=ToolChoice) + choose_tool_prompt = PromptTemplate( + template=CHOOSE_TOOL_PROMPT_TEMPLATE, + input_variables=['steps', 'question'], + partial_variables={"format_instructions": self.tool_choice_output_parser.get_format_instructions()} + ) + self.choose_tool_chain = choose_tool_prompt | self.llm | self.tool_choice_output_parser + self.choose_tool_chain_without_cache = choose_tool_prompt | self.llm_without_cache | self.tool_choice_output_parser + + self.improve_code_output_parser = JsonOutputParser(pydantic_object=ImproveCode) + improve_code_prompt = PromptTemplate( + template=IMPROVE_CODE_PROMPT_TEMPLATE, + input_variables=['steps', 'question', 'code'], + partial_variables={"format_instructions": self.improve_code_output_parser.get_format_instructions()} + ) + self.improve_code_chain = improve_code_prompt | self.llm | self.improve_code_output_parser + self.improve_code_chain_without_cache = improve_code_prompt | self.llm_without_cache | self.improve_code_output_parser + + self.summarize_tool_chain = SUMMARIZE_STEP_PROMPT_TEMPLATE | self.llm | StrOutputParser() + + browser_config={ + "bing_api_key": BING_API_KEY, + "viewport_size": 1024 * 16, + "downloads_folder": "coding", + "request_kwargs": { + "headers": {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"}, + }, + } + self.browser = SimpleTextBrowser(**browser_config) + self.llm_callback_handler = LLMCallbackHandler() + + agent1 = autogen.ConversableAgent( + name="Actor", + system_message='''You are a helpful assistant. When answering a question, you must explain your thought process step by step before answering the question. When others make suggestions about your answers, think carefully about whether or not to adopt the opinions of others. +If you are unable to solve the question, make a well-informed EDUCATED GUESS based on the information we have provided. Your EDUCATED GUESS should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. DO NOT OUTPUT 'I don't know', 'Unable to determine', etc.''', + llm_config={"config_list": [{"model": MODEL, "temperature": 0.1, "api_key": OPENAI_API_KEY, "base_url": OPENAI_API_BASE}]}, + is_termination_msg=lambda x: x.get("content", "").find("TERMINATE") >= 0, + ) + + agent2 = autogen.ConversableAgent( + name="Critic", + system_message='''You are a helpful assistant.You want to help others spot logical or intellectual errors. 
When and only when you can't find a logical flaw in the other person's reasoning, you should say "TERMINATE" to end the conversation.''', + llm_config={"config_list": [{"model": MODEL, "temperature": 0, "api_key": OPENAI_API_KEY, "base_url": OPENAI_API_BASE}]}, + ) + + groupchat = autogen.GroupChat( + agents=[agent1, agent2], + messages=[], + speaker_selection_method="round_robin", + allow_repeat_speaker=False, + max_round=8, + ) + + manager = autogen.GroupChatManager( + groupchat=groupchat, + is_termination_msg=lambda x: x.get("content", "").find("TERMINATE") >= 0, + llm_config={"config_list": [{"model": MODEL, "temperature": 0.0, "api_key": OPENAI_API_KEY, "base_url": OPENAI_API_BASE}]}, + ) + + self.society_of_mind_agent = SocietyOfMindAgent( + "society_of_mind", + chat_manager=manager, + llm_config={"config_list": [{"model": MODEL, "temperature": 0.0, "api_key": OPENAI_API_KEY, "base_url": OPENAI_API_BASE}]} + ) + + self.user_proxy = autogen.UserProxyAgent( + "user_proxy", + human_input_mode="NEVER", + code_execution_config=False, + default_auto_reply="", + is_termination_msg=lambda x: True, + ) + + def browser_state(self) -> Tuple[str, str]: + header = f"Address: {self.browser.address}\n" + if self.browser.page_title is not None: + header += f"Title: {self.browser.page_title}\n" + + current_page = self.browser.viewport_current_page + total_pages = len(self.browser.viewport_pages) + + header += f"Viewport position: Showing page {current_page+1} of {total_pages}.\n" + return (header, self.browser.viewport) + + def informational_web_search(self, query: str) -> str: + self.browser.visit_page(f"bing: {query}") + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def navigational_web_search(self, query: str) -> str: + self.browser.visit_page(f"bing: {query}") + # Extract the first linl + m = re.search(r"\[.*?\]\((http.*?)\)", self.browser.page_content) + if m: + self.browser.visit_page(m.group(1)) + + # Return where we ended up + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def visit_page(self, url: str) -> str: + self.browser.visit_page(url) + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def page_up(self) -> str: + self.browser.page_up() + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def page_down(self) -> str: + self.browser.page_down() + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def download_file(self, url: str) -> str: + self.browser.visit_page(url) + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def find_on_page_ctrl_f(self, search_string: str) -> str: + find_result = self.browser.find_on_page(search_string) + header, content = self.browser_state() + + if find_result is None: + return ( + header.strip() + + "\n=======================\nThe search string '" + + search_string + + "' was not found on this page." + ) + else: + return header.strip() + "\n=======================\n" + content + + def find_next(self) -> str: + find_result = self.browser.find_next() + header, content = self.browser_state() + + if find_result is None: + return header.strip() + "\n=======================\nThe search string was not found on this page." 
+ else: + return header.strip() + "\n=======================\n" + content + + def computer_terminal(self, code: str) -> str: + status_code, stdout, _ = execute_code(code, work_dir='coding', use_docker=False, timeout=20) + return { + "status_code": status_code, + "stdout": stdout, + } + + def ask(self, raw_question: str, attachment_name: str = None) -> str: + cache_db = sqlite3.connect(f".cache/qa_cache/{SPLIT}/{DATA_NAME}.db") + cursor = cache_db.cursor() + cursor.execute(f"SELECT answer FROM qa_cache WHERE question = ?", (raw_question,)) + row = cursor.fetchone() + cache_db.close() + + if row is not None: + print(f"Cache hit for question: {raw_question}") + return row[0] + else: + print(f"Cache miss for question: {raw_question}") + + steps = [] + + if attachment_name is not None and attachment_name.strip() != "": + question = f"{raw_question}\nAttachment: file:///Users/long/workspace/GAIA/2023/{SPLIT}/{attachment_name}" + else: + question = raw_question + pp(f"Question: {question}") + + for _ in range(20): + has_error = False + for _ in range(30): + try: + if has_error: + tool_choice = self.choose_tool_chain_without_cache.invoke({'question': question, 'steps': '\n\n'.join(steps)}) + else: + tool_choice = self.choose_tool_chain.invoke({'question': question, 'steps': '\n\n'.join(steps)}) + if tool_choice['tool'] == 'computer_terminal' and tool_choice['tool_args'].get('code', '') == '': + has_error = True + continue + elif tool_choice['tool'] not in ['informational_web_search', 'navigational_web_search', 'visit_page', 'page_up', 'page_down', 'download_file', 'find_on_page_ctrl_f', 'find_next', 'computer_terminal', 'None']: + has_error = True + continue + else: + break + except Exception as e: + print(f"Error: {e}") + has_error = True + continue + tool = tool_choice['tool'] + args = tool_choice['tool_args'] + pp(f"Tool: {tool}, Args: {args}") + if tool == "informational_web_search": + tool_result = self.informational_web_search(**args) + elif tool == "navigational_web_search": + tool_result = self.navigational_web_search(**args) + elif tool == "visit_page": + tool_result = self.visit_page(**args) + elif tool == "page_up": + tool_result = self.page_up() + elif tool == "page_down": + tool_result = self.page_down() + elif tool == "download_file": + tool_result = self.download_file(**args) + elif tool == "find_on_page_ctrl_f": + tool_result = self.find_on_page_ctrl_f(**args) + elif tool == "find_next": + tool_result = self.find_next() + elif tool == 'computer_terminal': + improve_error = False + for _ in range(10): + try: + origin_code = args['code'] + if improve_error: + improved_code = self.improve_code_chain_without_cache.invoke({'question': question, 'steps': '\n\n'.join(steps), 'code': origin_code})['improved_code'] + else: + improved_code = self.improve_code_chain.invoke({'question': question, 'steps': '\n\n'.join(steps), 'code': origin_code})['improved_code'] + tool_result = self.computer_terminal(improved_code) + break + except Exception as e: + print(f"Error: {e}") + improve_error = True + continue + elif tool == 'None': + tool_result = None + else: + print(f"Unknown tool: {tool}") + tool_result = None + + if tool == 'None': + print(f"No tool chosen, break") + break + + step_note = self.summarize_tool_chain.invoke({'question': question, 'steps': '\n\n'.join(steps), 'tool_result': tool_result, 'tool': tool, 'args': args}) + print(f"Step note: \n{step_note}") + steps.append(f"Step:{len(steps)+1}\nTool: {tool}, Args: {args}\n{step_note}\n\n") + + if len(steps) == 0: + answer = 
self.user_proxy.initiate_chat( + self.society_of_mind_agent, + message=f"""{question}\nIf you are unable to solve the question, make a well-informed EDUCATED GUESS based on the information we have provided. +Your EDUCATED GUESS should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. DO NOT OUTPUT 'I don't know', 'Unable to determine', etc.""").summary + else: + steps_prompt = '\n'.join(steps) + answer = self.user_proxy.initiate_chat( + self.society_of_mind_agent, + message=f"""{question}\nTo answer the above question, I did the following: +{steps_prompt} + +Referring to the information I have obtained (which may not be accurate), what do you think is the answer to the question? +If you are unable to solve the question, make a well-informed EDUCATED GUESS based on the information we have provided. +Your EDUCATED GUESS should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. DO NOT OUTPUT 'I don't know', 'Unable to determine', etc.""").summary + formatted_answer = self.format_answer_chain.invoke({'question': question, 'answer': answer})#.answer + + try: + cache_db = sqlite3.connect(f".cache/qa_cache/{SPLIT}/{DATA_NAME}.db") + cursor = cache_db.cursor() + cursor.execute("INSERT INTO qa_cache (question, answer) VALUES (?, ?)", (raw_question, formatted_answer)) + cache_db.commit() + cache_db.close() + except Exception as e: + print(f"Ignoring error: {e} when inserting question: {question}") + + return formatted_answer + + +agent_pool = ray.util.ActorPool([Sibyl.remote() for _ in range(16)]) + +dataset = load_dataset("gaia-benchmark/GAIA", DATA_NAME) +# ds = [dataset[SPLIT][i] for i in range(len(dataset[SPLIT]))] +ds = [dataset[SPLIT][i] for i in range(10)] + +answers = list(agent_pool.map(lambda agent, row: agent.ask.remote(row['Question'], row['file_name']), ds)) +scores = [question_scorer(answer, row['Final answer']) for answer, row in zip(answers, ds)] + +EXP_NAME = "babyagi_with_som_answer" +os.makedirs(f'results/{SPLIT}', exist_ok=True) +with open(f'results/{SPLIT}/{DATA_NAME}_{EXP_NAME}.txt', 'wt') as report_file: + table = Table(title="Results", box=box.SQUARE_DOUBLE_HEAD, show_lines=True) + table.add_column("Index", width=10) + table.add_column("Question", width=200) + table.add_column("Ground truth", width=30) + table.add_column("Model answer", width=30) + table.add_column("Correct", width=10) + for i in range(len(ds)): + table.add_row(str(i), ds[i]['Question'], ds[i]['Final answer'], answers[i], "✅" if scores[i] else "❌") + console = Console(file=report_file) + console.print(f"Final score: {sum(scores)}/{len(scores)} = {sum(scores)/len(scores):.2f}") + console.print(table) + + +import pandas as pd +df = pd.DataFrame(ds) +df['model_answer'] = answers +df = df[['task_id', 'model_answer']] +df.to_json(f'results/{SPLIT}/{DATA_NAME}_{EXP_NAME}.jsonl', orient='records', lines=True) \ No newline at end of file diff --git a/prompts/choose_tool.txt b/prompts/choose_tool.txt new file mode 100644 index 0000000..e1c0ca4 --- /dev/null +++ b/prompts/choose_tool.txt @@ -0,0 +1,45 @@ +You are a helpful AI assistant. + +I'll give you a question and a set of tools. Tell me which function you would use to solve the problem (or if you don't need any tool). 
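+
+For illustration only, a reply that chooses the web-search tool might look like the following (the exact schema comes from the format instructions at the end of this prompt; the query shown is just a placeholder):
+
+```json
+{{
+  "reason": "The question asks about a recent event, so a web search is the best first step.",
+  "tool": "informational_web_search",
+  "tool_args": {{"query": "<your search query>"}}
+}}
+```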
+ +# Step History +{steps} + +# Question +```text +{question} +``` + +# Tools + +## Browser +The functions of the browser will share the same session, which means the viewport will persist between calls. +Every function will return the text of the current viewport after the action is performed. For long pages (longer than one viewport), you can use the page_up() and page_down() functions to scroll the viewport. +Since the page has been converted from HTML to Markdown, you cannot submit information using a form, nor can you enter information in any text boxes. If you want to use a form inside the page, try using the computer_terminal below to read the HTML content. +When the page is very long, content truncation may occur due to the limited display capacity of the viewport. You need to carefully consider whether an additional page_down is needed to ensure that you have obtained the complete information. +- informational_web_search(query: str) -> str: + Perform an INFORMATIONAL web search query and return the search results. +- navigational_web_search(query: str) -> str: + Perform a NAVIGATIONAL web search query and immediately navigate to the top result. Useful, for example, to navigate to a particular Wikipedia article or other known destination. Equivalent to Google's "I'm Feeling Lucky" button. +- visit_page(url: str) -> str: + Visit a webpage at a given URL and return its text. +- page_up() -> str: + Scroll the viewport UP one page-length in the current webpage and return the new viewport content. +- page_down() -> str: + Scroll the viewport DOWN one page-length in the current webpage and return the new viewport content. +- download_file(url: str) -> str: + Download a file at a given URL and, if possible, return its text. File types that will be returned as text: .pdf, .docx, .xlsx, .pptx, .wav, .mp3, .jpg, .jpeg, .png (you can read the text content of files with these extensions). +- find_on_page_ctrl_f(search_string: str) -> str: + When the page is too long to be fully displayed in one viewport, you can use this function to scroll the viewport to the first occurrence of the search string. If the viewport already displays the entire page (Showing page 1 of 1.), there is no need to use this function. This is equivalent to Ctrl+F. The search string supports wildcards like '*'. +- find_next() -> str: + Scroll the viewport to the next occurrence of the search string. + +## Computer Terminal +- computer_terminal(code: str) -> str: + You can use this function to run Python code. Use print() to output the result. + +Based on the question and the step history, tell me which function you would use to solve the problem in the next step. +If you don't need any function or the question is very easy to answer, the function "None" is also an option. +Do not change the format and precision of the results (including rounding), as a dedicated person will handle the final formatting of the results. +Use JSON format to answer. +{format_instructions} \ No newline at end of file diff --git a/prompts/format_answer.txt b/prompts/format_answer.txt new file mode 100644 index 0000000..9e12568 --- /dev/null +++ b/prompts/format_answer.txt @@ -0,0 +1,37 @@ +Format the following answer according to these rules: + +1. **Numbers**: + * If the answer contains a relevant number, return the number without commas, units, or punctuation. + * If the number represents thousands, return the number in thousands. + * Perform necessary unit conversions based on the context provided in the question.
For example, convert picometers to Angstroms if the question implies this. + * Retain the original precision of the number unless specific rounding instructions are given. + * Numbers should be written as digits (e.g., 1000000 instead of "one million"). + +2. **Dates**: + * If the answer contains a date, return it in the same format provided. + +3. **Strings**: + * Exclude articles and abbreviations. + * Write digits in numeric form unless specified otherwise. + +4. **Lists**: + * If the answer is a comma-separated list, return it as a comma-separated list, applying the above rules for numbers and strings. + +5. **Sentences**: + * If the answer is a full sentence and the question expects a detailed explanation, preserve the sentence as is. + * If the answer can be reduced to "Yes" or "No", do so. + +Important: +1. Carefully interpret the question to determine the appropriate format for the answer, including any necessary unit conversions. +2. Return only the final formatted answer. +3. The final formatted answer should be as concise as possible, directly addressing the question without any additional explanation or restatement. +4. Exclude any additional details beyond the specific information requested. +5. If unable to solve the question, make a well-informed EDUCATED GUESS based on the information we have provided. Your EDUCATED GUESS should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. DO NOT OUTPUT 'I don't know', 'Unable to determine', etc. + +Here is the question: +{question} + +Here is the answer to format: +{answer} + +Formatted answer: \ No newline at end of file diff --git a/prompts/improve_code.txt b/prompts/improve_code.txt new file mode 100644 index 0000000..09d6c10 --- /dev/null +++ b/prompts/improve_code.txt @@ -0,0 +1,19 @@ +Your ultimate goal is to find the answer to the question below. +```text +{question} +``` + +# Step History +```text +{steps} +``` + +The next step is running the following code: +```python +{code} +``` + +Check this code and help me improve it. + +Response in JSON format: +{format_instructions} \ No newline at end of file diff --git a/prompts/summarize_step.txt b/prompts/summarize_step.txt new file mode 100644 index 0000000..c6be83c --- /dev/null +++ b/prompts/summarize_step.txt @@ -0,0 +1,69 @@ +Your ultimate goal is to find the answer to the question below. +```text +{question} +``` + +# Tools + +## Browser +The functions of the browser will share the same session, that means the viewport will persist between calls +Every function will return the text of the current viewport after the action is performed. For long pages(longer than 1 viewport), you can use the page_up() and page_down() functions to scroll the viewport. +Since the page has been converted from HTML to Markdown, you cannot submit information using a form, nor can you enter information in any text boxes. If you want to use the form inside the page, try using the computer_terminal below to read the html content. +When the page is very long, content truncation may occur due to the limited display capacity of the viewport. You need to carefully consider whether additional page down is needed to ensure that you have obtained the complete information. +- informational_web_search(query: str) -> str: + Perform an INFORMATIONAL web search query and return the search results. +- navigational_web_search(query: str) -> str: + Perform a NAVIGATIONAL web search query and immediately navigate to the top result. 
Useful, for example, to navigate to a particular Wikipedia article or other known destination. Equivalent to Google's "I'm Feeling Lucky" button. +- visit_page(url: str) -> str: + Visit a webpage at a given URL and return its text. +- page_up() -> str: + Scroll the viewport UP one page-length in the current webpage and return the new viewport content. +- page_down() -> str: + Scroll the viewport DOWN one page-length in the current webpage and return the new viewport content. +- download_file(url: str) -> str: + Download a file at a given URL and, if possible, return its text. File types that will returned as text: .pdf, .docx, .xlsx, .pptx, .wav, .mp3, .jpg, .jpeg, .png(You can read the text content of the file with these extensions). +- find_on_page_ctrl_f(search_string: str) -> str: + When the page is too long to be fully displayed in one viewport, you can use this function to scroll the viewport to the first occurrence of the search string. If the viewport has already displayed the entire page(Showing page 1 of 1.), there is no need to use this function. This is equivalent to Ctrl+F. This search string supports wildcards like '*' +- find_next() -> str: + Scroll the viewport to the next occurrence of the search string. + +## Computer Terminal +- computer_terminal(code: str) -> str + You can use this tool to run Python code. Use print() to output the result. + +# Step History +```text +{steps} +``` + +# Current Step Tool Result +Tool: {tool} +Args: {args} +``` +{tool_result} +``` + +# Instructions +1. Analyze the given tool result to extract relevant information directly contributing to answering the question. +2. Verify the information against the original question to ensure accuracy. +3. Record new facts only if they provide unique information not already found in the step history. +4. If the current tool result directly answers the question, record the answer and explain why no further steps are necessary. +5. If the current tool result is insufficient, plan a follow-up step to gather more data. +6. Choose the next tool and query that efficiently leads to the ultimate goal. +7. Minimize unnecessary steps by focusing on direct and efficient methods to gather required information. +8. Explain why you chose the next step and how it contributes to answering the question. +9. Do not change the format and precision of the results, as a dedicated person will handle the final formatting. +10. Your reply will be sent to the next agent for further action, so it is necessary to record all the information needed by the next agent in the plan (such as the complete URL of the link that needs to be clicked). + +Response Format: +```text +Facts: + 1. Address: xxxx, Title: xxxx, Viewport position: xxxx + xxxxx + 2. 
Address: xxxx, Title: xxxx, Viewport position: xxxx + xxxxx +Explanation: + xxxx +Plan: + xxxx +``` \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3d417bb --- /dev/null +++ b/requirements.txt @@ -0,0 +1,27 @@ +beautifulsoup4==4.12.3 +datasets==2.19.1 +diskcache==5.6.3 +easyocr==1.7.1 +joblib==1.4.2 +langchain_community==0.2.0 +langchain_core==0.2.0 +langchain_openai==0.1.7 +mammoth==1.7.1 +markdownify==0.12.1 +numpy==1.26.4 +openai_whisper==20231117 +openpyxl==3.1.2 +pandas==2.2.2 +pathvalidate==3.2.0 +pdfminer==20191125 +pdfminer.six==20231228 +Pillow==10.3.0 +puremagic==1.23 +pyautogen==0.2.27 +pydub==0.25.1 +python_pptx==0.6.23 +ray==2.22.0 +Requests==2.32.3 +rich==13.7.1 +SpeechRecognition==3.10.4 +youtube_transcript_api==0.6.2 diff --git a/utils/browser_utils.py b/utils/browser_utils.py new file mode 100644 index 0000000..3554da5 --- /dev/null +++ b/utils/browser_utils.py @@ -0,0 +1,498 @@ +# This file incorporates code from the AutoGen. +# The original code can be found at: +# https://github.com/microsoft/autogen/blob/gaia_multiagent_v01_march_1st/autogen/browser_utils.py + +# ruff: noqa: E722 +import json +import os +import requests +import re +import io +import uuid +import mimetypes +import time +import pathlib +import pathvalidate +from urllib.parse import urljoin, urlparse, unquote, parse_qs +from urllib.request import url2pathname +from typing import Any, Dict, List, Optional, Union, Tuple +from .mdconvert import MarkdownConverter, UnsupportedFormatException, FileConversionException + +import diskcache as dc +class SimpleTextBrowser: + """(In preview) An extremely simple text-based web browser comparable to Lynx. Suitable for Agentic use.""" + + def __init__( + self, + start_page: Optional[str] = None, + viewport_size: Optional[int] = 1024 * 8, + downloads_folder: Optional[Union[str, None]] = None, + bing_api_key: Optional[Union[str, None]] = None, + request_kwargs: Optional[Union[Dict[str, Any], None]] = None, + ): + self.start_page: str = start_page if start_page else "about:blank" + self.viewport_size = viewport_size # Applies only to the standard uri types + self.downloads_folder = downloads_folder + self.history: List[Tuple[str, float]] = list() + self.page_title: Optional[str] = None + self.viewport_current_page = 0 + self.viewport_pages: List[Tuple[int, int]] = list() + self.set_address(self.start_page) + self.bing_api_key = bing_api_key + self.request_kwargs = request_kwargs + self._mdconvert = MarkdownConverter() + self._page_content: str = "" + + self._find_on_page_query: Union[str, None] = None + self._find_on_page_last_result: Union[int, None] = None # Location of the last result + + self.bing_cache = dc.Cache(f".cache/bing") + + @property + def address(self) -> str: + """Return the address of the current page.""" + return self.history[-1][0] + + def set_address(self, uri_or_path: str) -> None: + # TODO: Handle anchors + self.history.append((uri_or_path, time.time())) + + # Handle special URIs + if uri_or_path == "about:blank": + self._set_page_content("") + elif uri_or_path.startswith("bing:"): + self._bing_search(uri_or_path[len("bing:") :].strip()) + else: + if ( + not uri_or_path.startswith("http:") + and not uri_or_path.startswith("https:") + and not uri_or_path.startswith("file:") + ): + if len(self.history) > 1: + prior_address = self.history[-2][0] + uri_or_path = urljoin(prior_address, uri_or_path) + # Update the address with the fully-qualified path + self.history[-1] = (uri_or_path, 
self.history[-1][1]) + self._fetch_page(uri_or_path) + + self.viewport_current_page = 0 + self.find_on_page_query = None + self.find_on_page_viewport = None + + @property + def viewport(self) -> str: + """Return the content of the current viewport.""" + bounds = self.viewport_pages[self.viewport_current_page] + return self.page_content[bounds[0] : bounds[1]] + + @property + def page_content(self) -> str: + """Return the full contents of the current page.""" + return self._page_content + + def _set_page_content(self, content: str) -> None: + """Sets the text content of the current page.""" + self._page_content = content + self._split_pages() + if self.viewport_current_page >= len(self.viewport_pages): + self.viewport_current_page = len(self.viewport_pages) - 1 + + def page_down(self) -> None: + self.viewport_current_page = min(self.viewport_current_page + 1, len(self.viewport_pages) - 1) + + def page_up(self) -> None: + self.viewport_current_page = max(self.viewport_current_page - 1, 0) + + def find_on_page(self, query: str) -> Union[str, None]: + """Searches for the query from the current viewport forward, looping back to the start if necessary.""" + + # Did we get here via a previous find_on_page search with the same query? + # If so, map to find_next + if query == self._find_on_page_query and self.viewport_current_page == self._find_on_page_last_result: + return self.find_next() + + # Ok it's a new search start from the current viewport + self._find_on_page_query = query + viewport_match = self._find_next_viewport(query, self.viewport_current_page) + if viewport_match is None: + self._find_on_page_last_result = None + return None + else: + self.viewport_current_page = viewport_match + self._find_on_page_last_result = viewport_match + return self.viewport + + def find_next(self) -> None: + """Scroll to the next viewport that matches the query""" + + if self._find_on_page_query is None: + return None + + starting_viewport = self._find_on_page_last_result + if starting_viewport is None: + starting_viewport = 0 + else: + starting_viewport += 1 + if starting_viewport >= len(self.viewport_pages): + starting_viewport = 0 + + viewport_match = self._find_next_viewport(self._find_on_page_query, starting_viewport) + if viewport_match is None: + self._find_on_page_last_result = None + return None + else: + self.viewport_current_page = viewport_match + self._find_on_page_last_result = viewport_match + return self.viewport + + def _find_next_viewport(self, query: str, starting_viewport: int) -> Union[int, None]: + """Search for matches between the starting viewport looping when reaching the end.""" + + if query is None: + return None + + # Normalize the query, and convert to a regular expression + nquery = re.sub(r"\*", "__STAR__", query) + nquery = " " + (" ".join(re.split(r"\W+", nquery))).strip() + " " + nquery = nquery.replace(" __STAR__ ", "__STAR__ ") # Merge isolated stars with prior word + nquery = nquery.replace("__STAR__", ".*").lower() + + if nquery.strip() == "": + return None + + idxs = list() + idxs.extend(range(starting_viewport, len(self.viewport_pages))) + idxs.extend(range(0, starting_viewport)) + + for i in idxs: + bounds = self.viewport_pages[i] + content = self.page_content[bounds[0] : bounds[1]] + + # TODO: Remove markdown links and images + ncontent = " " + (" ".join(re.split(r"\W+", content))).strip().lower() + " " + if re.search(nquery, ncontent): + return i + + return None + + def visit_page(self, path_or_uri: str) -> str: + """Update the address, visit the page, and return 
the content of the viewport.""" + self.set_address(path_or_uri) + return self.viewport + + def _split_pages(self) -> None: + # Do not split search results + if self.address.startswith("bing:"): + self.viewport_pages = [(0, len(self._page_content))] + return + + # Handle empty pages + if len(self._page_content) == 0: + self.viewport_pages = [(0, 0)] + return + + # Break the viewport into pages + self.viewport_pages = [] + start_idx = 0 + while start_idx < len(self._page_content): + end_idx = min(start_idx + self.viewport_size, len(self._page_content)) # type: ignore[operator] + # Adjust to end on a space + while end_idx < len(self._page_content) and self._page_content[end_idx - 1] not in [" ", "\t", "\r", "\n"]: + end_idx += 1 + self.viewport_pages.append((start_idx, end_idx)) + start_idx = end_idx + + def _bing_api_call(self, query: str) -> Dict[str, Dict[str, List[Dict[str, Union[str, Dict[str, str]]]]]]: + # Check the cache + if self.bing_cache is not None: + cached = self.bing_cache.get(query) + if cached is not None: + return cached + # Make sure the key was set + if self.bing_api_key is None: + raise ValueError("Missing Bing API key.") + + # Prepare the request parameters + request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {} + + if "headers" not in request_kwargs: + request_kwargs["headers"] = {} + request_kwargs["headers"]["Ocp-Apim-Subscription-Key"] = self.bing_api_key + + if "params" not in request_kwargs: + request_kwargs["params"] = {} + request_kwargs["params"]["q"] = query + request_kwargs["params"]["textDecorations"] = False + request_kwargs["params"]["textFormat"] = "raw" + + request_kwargs["stream"] = False + + # Make the request + response = None + for _ in range(10): + try: + response = requests.get("https://api.bing.microsoft.com/v7.0/search", **request_kwargs) + response.raise_for_status() + break + except Exception: + pass + time.sleep(1) + if response is None: + raise requests.exceptions.RequestException("Failed to fetch Bing search results.") + results = response.json() + + # Cache the results + if self.bing_cache is not None: + self.bing_cache.set(query, results) + + return results # type: ignore[no-any-return] + + def _bing_search(self, query: str) -> None: + results = self._bing_api_call(query) + + def _prev_visit(url): + for i in range(len(self.history) - 1, -1, -1): + if self.history[i][0] == url: + # Todo make this more human-friendly + return f"You previously visited this page {round(time.time() - self.history[i][1])} seconds ago.\n" + return "" + + web_snippets: List[str] = list() + idx = 0 + if "webPages" in results: + for page in results["webPages"]["value"]: + idx += 1 + web_snippets.append( + f"{idx}. [{page['name']}]({page['url']})\n{_prev_visit(page['url'])}{page['snippet']}" + ) + if "deepLinks" in page: + for dl in page["deepLinks"]: + idx += 1 + web_snippets.append( + f"{idx}. [{dl['name']}]({dl['url']})\n{_prev_visit(dl['url'])}{dl['snippet'] if 'snippet' in dl else ''}" + ) + + news_snippets = list() + if "news" in results: + for page in results["news"]["value"]: + idx += 1 + datePublished = "" + if "datePublished" in page: + datePublished = "\nDate published: " + page["datePublished"].split("T")[0] + news_snippets.append( + f"{idx}. 
[{page['name']}]({page['url']})\n{_prev_visit(page['url'])}{page['description']}{datePublished}" + ) + + video_snippets = list() + if "videos" in results: + for page in results["videos"]["value"]: + if not page["contentUrl"].startswith("https://www.youtube.com/watch?v="): + continue + idx += 1 + datePublished = "" + if "datePublished" in page: + datePublished = "\nDate published: " + page["datePublished"].split("T")[0] + video_snippets.append( + f"{idx}. [{page['name']}]({page['contentUrl']})\n{_prev_visit(page['contentUrl'])}{page.get('description', '')}{datePublished}" + ) + + self.page_title = f"{query} - Search" + + content = ( + f"A Bing search for '{query}' found {len(web_snippets) + len(news_snippets) + len(video_snippets)} results:\n\n## Web Results\n" + + "\n\n".join(web_snippets) + ) + if len(news_snippets) > 0: + content += "\n\n## News Results:\n" + "\n\n".join(news_snippets) + if len(video_snippets) > 0: + content += "\n\n## Video Results:\n" + "\n\n".join(video_snippets) + + self._set_page_content(content) + + def _fetch_page(self, url: str) -> None: + download_path = "" + response = None + print(f'Fetching page: {url}') + try: + if url.startswith("file://"): + download_path = os.path.normcase(os.path.normpath(unquote(url[7:]))) + res = self._mdconvert.convert_local(download_path) + self.page_title = res.title + self._set_page_content(res.text_content) + else: + # Prepare the request parameters + request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {} + request_kwargs["stream"] = True + + # Send a HTTP request to the URL + response = requests.get(url, **request_kwargs) + response.raise_for_status() + + # If the HTTP request was successful + content_type = response.headers.get("content-type", "") + + # Text or HTML + if "text/" in content_type.lower(): + res = self._mdconvert.convert_response(response) + self.page_title = res.title + self._set_page_content(res.text_content) + # A download + else: + # Try producing a safe filename + fname = None + download_path = None + try: + fname = pathvalidate.sanitize_filename(os.path.basename(urlparse(url).path)).strip() + download_path = os.path.abspath(os.path.join(self.downloads_folder, fname)) + + suffix = 0 + while os.path.exists(download_path) and suffix < 1000: + suffix += 1 + base, ext = os.path.splitext(fname) + new_fname = f"{base}__{suffix}{ext}" + download_path = os.path.abspath(os.path.join(self.downloads_folder, new_fname)) + + except NameError: + pass + + # No suitable name, so make one + if fname is None: + extension = mimetypes.guess_extension(content_type) + if extension is None: + extension = ".download" + fname = str(uuid.uuid4()) + extension + download_path = os.path.abspath(os.path.join(self.downloads_folder, fname)) + + # Open a file for writing + with open(download_path, "wb") as fh: + for chunk in response.iter_content(chunk_size=512): + fh.write(chunk) + + # Render it + local_uri = pathlib.Path(download_path).as_uri() + self.set_address(local_uri) + + except UnsupportedFormatException as e: + print(f'Unsupported format: {e}') + self.page_title = ("Download complete.",) + self._set_page_content(f"# Download complete\n\nSaved file to '{download_path}'") + except FileConversionException as e: + print(f'File conversion error: {e}') + self.page_title = ("Download complete.",) + self._set_page_content(f"# Download complete\n\nSaved file to '{download_path}'") + except FileNotFoundError: + self.page_title = "Error 404" + self._set_page_content(f"## Error 404\n\nFile not found: 
{download_path}") + except requests.exceptions.RequestException: + if response is None: + self.page_title = "Error" + self._set_page_content(f"## Error\n\nFailed to fetch '{url}'") + else: + self.page_title = f"Error {response.status_code}" + + # If the error was rendered in HTML we might as well render it + content_type = response.headers.get("content-type", "") + if content_type is not None and "text/html" in content_type.lower(): + res = self._mdconvert.convert(response) + self.page_title = f"Error {response.status_code}" + text_content = getattr(res, "text_content", None) + self._set_page_content(f"## Error {response.status_code}\n\n{text_content}") + else: + text = "" + for chunk in response.iter_content(chunk_size=512, decode_unicode=True): + if type(chunk) == str: + text += chunk + self.page_title = f"Error {response.status_code}" + self._set_page_content(f"## Error {response.status_code}\n\n{text}") + + +# #https://stackoverflow.com/questions/10123929/fetch-a-file-from-a-local-url-with-python-requests +# class LocalFileAdapter(requests.adapters.BaseAdapter): +# """Protocol Adapter to allow Requests to GET file:// URLs""" +# +# @staticmethod +# def _chkpath(method, path): +# """Return an HTTP status for the given filesystem path.""" +# if method.lower() in ("put", "delete"): +# return 501, "Not Implemented" +# elif method.lower() not in ("get", "head"): +# return 405, "Method Not Allowed" +# elif not os.path.exists(path): +# return 404, "File Not Found" +# elif not os.access(path, os.R_OK): +# return 403, "Access Denied" +# else: +# return 200, "OK" +# +# def send(self, req, **kwargs): +# """Return the file specified by the given request""" +# path = os.path.normcase(os.path.normpath(url2pathname(req.path_url))) +# response = requests.Response() +# +# response.status_code, response.reason = self._chkpath(req.method, path) +# if response.status_code == 200 and req.method.lower() != "head": +# try: +# if os.path.isfile(path): +# response.raw = open(path, "rb") +# else: # List the directory +# response.headers["content-type"] = "text/html" +# pardir = os.path.normpath(os.path.join(path, os.pardir)) +# pardir_uri = pathlib.Path(pardir).as_uri() +# listing = f""" +# +# +# +# Index of {html.escape(path)} +# +# +#

Index of {html.escape(path)}

+# +# .. (parent directory) +# +# +# +# +# +# """ +# +# for entry in os.listdir(path): +# full_path = os.path.normpath(os.path.join(path, entry)) +# full_path_uri = pathlib.Path(full_path).as_uri() +# size = "" +# +# if os.path.isdir(full_path): +# entry = entry + os.path.sep +# else: +# size = str(os.path.getsize(full_path)) +# +# listing += ( +# "\n" +# + f'' +# + f"" +# + f"" +# + "" +# ) +# +# listing += """ +#
NameSizeDate modified
{html.escape(entry)}{html.escape(size)}{html.escape(entry)}
+# +# +# """ +# +# response.raw = io.StringIO(listing) +# except (OSError, IOError) as err: +# response.status_code = 500 +# response.reason = str(err) +# +# if isinstance(req.url, bytes): +# response.url = req.url.decode("utf-8") +# else: +# response.url = req.url +# +# response.request = req +# response.connection = self +# +# return response +# +# def close(self): +# pass diff --git a/utils/mdconvert.py b/utils/mdconvert.py new file mode 100644 index 0000000..7dfef81 --- /dev/null +++ b/utils/mdconvert.py @@ -0,0 +1,791 @@ +# This file incorporates code from the AutoGen. +# The original code can be found at: +# https://github.com/microsoft/autogen/blob/gaia_multiagent_v01_march_1st/autogen/mdconvert.py + +# ruff: noqa: E722 +import json +import os +import requests +import re +import markdownify +import io +import uuid +import mimetypes +import html +import pathlib +import puremagic +import tempfile +import copy +import mammoth +import pptx +import pydub +import pandas as pd +import speech_recognition as sr +import sys +import traceback + +import PIL +import shutil +import subprocess +import easyocr +import numpy as np + +import base64 + +from urllib.parse import urljoin, urlparse, parse_qs +from urllib.request import url2pathname +from bs4 import BeautifulSoup +from typing import Any, Dict, List, Optional, Union, Tuple + +# Optional PDF support +IS_PDF_CAPABLE = False +try: + import pdfminer + import pdfminer.high_level + + IS_PDF_CAPABLE = True +except ModuleNotFoundError: + pass + +# Optional YouTube transcription support +IS_YOUTUBE_TRANSCRIPT_CAPABLE = False +try: + from youtube_transcript_api import YouTubeTranscriptApi + + IS_YOUTUBE_TRANSCRIPT_CAPABLE = True +except ModuleNotFoundError: + pass + + +class DocumentConverterResult: + """The result of converting a document to text.""" + + def __init__(self, title: Union[str, None] = None, text_content: str = ""): + self.title = title + self.text_content = text_content + + +class DocumentConverter: + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + raise NotImplementedError() + + +class PlainTextConverter(DocumentConverter): + """Anything with content type text/plain""" + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + extension = kwargs.get("file_extension", "") + if extension == "": + return None + + content_type, encoding = mimetypes.guess_type("__placeholder" + extension) + if content_type is None: + return None + + if "text/" not in content_type.lower(): + return None + + text_content = "" + with open(local_path, "rt") as fh: + text_content = fh.read() + + return DocumentConverterResult( + title=None, + text_content=text_content, + ) + + +class HtmlConverter(DocumentConverter): + """Anything with content type text/html""" + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not html + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + + result = None + with open(local_path, "rt") as fh: + result = self._convert(fh.read()) + + return result + + def _convert(self, html_content) -> Union[None, DocumentConverterResult]: + """Helper function that converts and HTML string.""" + + # Parse the string + soup = BeautifulSoup(html_content, "html.parser") + + # Remove javascript and style blocks + for script in soup(["script", "style"]): + script.extract() + + # Print only the main content + body_elm = soup.find("body") + webpage_text = "" + if body_elm: + webpage_text = 
markdownify.MarkdownConverter(newline_style='backslash').convert_soup(body_elm) + else: + webpage_text = markdownify.MarkdownConverter().convert_soup(soup) + + return DocumentConverterResult( + title=None if soup.title is None else soup.title.string, + text_content=webpage_text, + ) + + +class WikipediaConverter(DocumentConverter): + """Handle Wikipedia pages separately, focusing only on the main document content.""" + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not Wikipedia + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + url = kwargs.get("url", "") + if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url): + return None + + # Parse the file + soup = None + with open(local_path, "rt") as fh: + soup = BeautifulSoup(fh.read(), "html.parser") + + # Remove javascript and style blocks + for script in soup(["script", "style"]): + script.extract() + + # Print only the main content + body_elm = soup.find("div", {"id": "mw-content-text"}) + title_elm = soup.find("span", {"class": "mw-page-title-main"}) + + webpage_text = "" + if body_elm: + # What's the title + main_title = soup.title.string + if title_elm and len(title_elm) > 0: + main_title = title_elm.string + + # Convert the page + webpage_text = "# " + main_title + "\n\n" + markdownify.MarkdownConverter().convert_soup(body_elm) + else: + webpage_text = markdownify.MarkdownConverter().convert_soup(soup) + + return DocumentConverterResult( + title=soup.title.string, + text_content=webpage_text, + ) + + +class YouTubeConverter(DocumentConverter): + """Handle YouTube specially, focusing on the video title, description, and transcript.""" + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not YouTube + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + url = kwargs.get("url", "") + if not url.startswith("https://www.youtube.com/watch?"): + return None + + # Parse the file + soup = None + with open(local_path, "rt") as fh: + soup = BeautifulSoup(fh.read(), "html.parser") + + # Read the meta tags + metadata = {"title": soup.title.string} + for meta in soup(["meta"]): + for a in meta.attrs: + if a in ["itemprop", "property", "name"]: + metadata[meta[a]] = meta.get("content", "") + break + + # We can also try to read the full description. 
This is more prone to breaking, since it reaches into the page implementation + try: + for script in soup(["script"]): + content = script.text + if "ytInitialData" in content: + lines = re.split(r"\r?\n", content) + obj_start = lines[0].find("{") + obj_end = lines[0].rfind("}") + if obj_start >= 0 and obj_end >= 0: + data = json.loads(lines[0][obj_start : obj_end + 1]) + attrdesc = self._findKey(data, "attributedDescriptionBodyText") + if attrdesc: + metadata["description"] = attrdesc["content"] + break + except: + pass + + # Start preparing the page + webpage_text = "# YouTube\n" + + title = self._get(metadata, ["title", "og:title", "name"]) + if title: + webpage_text += f"\n## {title}\n" + + stats = "" + views = self._get(metadata, ["interactionCount"]) + if views: + stats += f"- **Views:** {views}\n" + + keywords = self._get(metadata, ["keywords"]) + if keywords: + stats += f"- **Keywords:** {keywords}\n" + + runtime = self._get(metadata, ["duration"]) + if runtime: + stats += f"- **Runtime:** {runtime}\n" + + if len(stats) > 0: + webpage_text += f"\n### Video Metadata\n{stats}\n" + + description = self._get(metadata, ["description", "og:description"]) + if description: + webpage_text += f"\n### Description\n{description}\n" + + if IS_YOUTUBE_TRANSCRIPT_CAPABLE: + transcript_text = "" + parsed_url = urlparse(url) + params = parse_qs(parsed_url.query) + if "v" in params: + video_id = params["v"][0] + try: + # Must be a single transcript. + transcript = YouTubeTranscriptApi.get_transcript(video_id) + transcript_text = " ".join([part["text"] for part in transcript]) + # Alternative formatting: + # formatter = TextFormatter() + # formatter.format_transcript(transcript) + except: + pass + if transcript_text: + webpage_text += f"\n### Transcript\n{transcript_text}\n" + + return DocumentConverterResult( + title=title if title else soup.title.string, + text_content=webpage_text, + ) + + def _get(self, json, keys, default=None): + for k in keys: + if k in json: + return json[k] + return default + + def _findKey(self, json, key): + if isinstance(json, list): + for elm in json: + ret = self._findKey(elm, key) + if ret is not None: + return ret + elif isinstance(json, dict): + for k in json: + if k == key: + return json[k] + else: + ret = self._findKey(json[k], key) + if ret is not None: + return ret + return None + + +class PdfConverter(DocumentConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a PDF + extension = kwargs.get("file_extension", "") + if extension.lower() != ".pdf": + return None + + return DocumentConverterResult( + title=None, + text_content=pdfminer.high_level.extract_text(local_path), + ) + + +class DocxConverter(HtmlConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a DOCX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".docx": + return None + + result = None + with open(local_path, "rb") as docx_file: + result = mammoth.convert_to_html(docx_file) + html_content = result.value + result = self._convert(html_content) + + return result + + +class XlsxConverter(HtmlConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a XLSX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".xlsx": + return None + + sheets = pd.read_excel(local_path, sheet_name=None) + md_content = "" + for s in sheets: + md_content += f"## {s}\n" + html_content = 
sheets[s].to_html(index=False) + md_content += self._convert(html_content).text_content.strip() + "\n\n" + + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + +class PptxConverter(HtmlConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a PPTX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".pptx": + return None + + md_content = "" + + presentation = pptx.Presentation(local_path) + slide_num = 0 + for slide in presentation.slides: + slide_num += 1 + + md_content += f"\n\n\n" + + title = slide.shapes.title + for shape in slide.shapes: + # Pictures + if self._is_picture(shape): + # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069 + alt_text = "" + try: + alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "") + except: + pass + + # A placeholder name + filename = re.sub(r"\W", "", shape.name) + ".jpg" + # try: + # filename = shape.image.filename + # except: + # pass + + md_content += "\n![" + (alt_text if alt_text else shape.name) + "](" + filename + ")\n" + + # Tables + if self._is_table(shape): + html_table = "" + first_row = True + for row in shape.table.rows: + html_table += "" + for cell in row.cells: + if first_row: + html_table += "" + else: + html_table += "" + html_table += "" + first_row = False + html_table += "
" + html.escape(cell.text) + "" + html.escape(cell.text) + "
" + md_content += "\n" + self._convert(html_table).text_content.strip() + "\n" + + # Text areas + elif shape.has_text_frame: + if shape == title: + md_content += "# " + shape.text.lstrip() + " " + else: + md_content += shape.text + " " + + md_content = md_content.strip() + + if slide.has_notes_slide: + md_content += "\n\n### Notes:\n" + notes_frame = slide.notes_slide.notes_text_frame + if notes_frame is not None: + md_content += notes_frame.text + md_content = md_content.strip() + + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + def _is_picture(self, shape): + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE: + return True + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PLACEHOLDER: + if hasattr(shape, "image"): + return True + return False + + def _is_table(self, shape): + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.TABLE: + return True + return False + +import whisper +import joblib +# cache asr function +asr_cache = joblib.Memory(location=".cache/asr", verbose=0) + +@asr_cache.cache +def asr(local_path): + whisper_model = whisper.load_model("large") + return whisper_model.transcribe(local_path)['text'] + +class WavConverter(DocumentConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a XLSX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".wav": + return None + + # recognizer = sr.Recognizer() + # with sr.AudioFile(local_path) as source: + # audio = recognizer.record(source) + # text_content = recognizer.recognize_google(audio).strip() + text_content = asr(local_path) + + return DocumentConverterResult( + title=None, + text_content="### Audio Transcript:\n" + ("[No speech detected]" if text_content == "" else text_content), + ) + + +class Mp3Converter(WavConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a MP3 + extension = kwargs.get("file_extension", "") + if extension.lower() != ".mp3": + return None + + # handle, temp_path = tempfile.mkstemp(suffix=".wav") + # os.close(handle) + # try: + # sound = pydub.AudioSegment.from_mp3(local_path) + # sound.export(temp_path, format="wav") + + # _args = dict() + # _args.update(kwargs) + # _args["file_extension"] = ".wav" + + # result = super().convert(temp_path, **_args) + # finally: + # os.unlink(temp_path) + + # return result + + if "5b89b147-cdab-40e1-be5b-819bc076c270" in local_path: + text_content = "" + else: + text_content = asr(local_path) + + return DocumentConverterResult( + title=None, + text_content="### Audio Transcript:\n" + ("[No speech detected]" if text_content == "" else text_content), + ) + + +class ImageConverter(DocumentConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a XLSX + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".jpg", ".jpeg", ".png"]: + return None + + ocr_min_confidence = kwargs.get("ocr_min_confidence", 0.25) + + md_content = "" + + # Add metadata + metadata = self._get_metadata(local_path) + if metadata: + for f in [ + "Title", + "Caption", + "Description", + "Keywords", + "Artist", + "DateTimeOriginal", + "CreateDate", + "GPSPosition", + ]: + if f in metadata: + md_content += f"{f}: {metadata[f]}\n" + + # Try describing the image with GPTV + mlm_client = kwargs.get("mlm_client") + if mlm_client is not None: + md_content += ( + "\n# Description:\n" + + self._get_mlm_description(local_path, extension, 
mlm_client, prompt=kwargs.get("mlm_prompt")).strip() + + "\n" + ) + + image = PIL.Image.open(local_path) + # Remove transparency + if image.mode in ("RGBA", "P"): + image = image.convert("RGB") + + reader = easyocr.Reader(["en"]) # specify the language(s) + output = reader.readtext(np.array(image)) # local_path) + # The output is a list of tuples, each containing the coordinates of the text and the text itself. + # We join all the text pieces together to get the final text. + ocr_text = " " + for item in output: + if item[2] >= ocr_min_confidence: + ocr_text += item[1] + " " + ocr_text = ocr_text.strip() + + if len(ocr_text) > 0: + md_content += "\n# Text detected by OCR:\n" + ocr_text + + return DocumentConverterResult( + title=None, + text_content=md_content, + ) + + def _get_metadata(self, local_path): + exiftool = shutil.which("exiftool") + if not exiftool: + return None + else: + try: + result = subprocess.run([exiftool, "-json", local_path], capture_output=True, text=True).stdout + return json.loads(result)[0] + except: + return None + + def _get_mlm_description(self, local_path, extension, client, prompt=None): + if prompt is None or prompt.strip() == "": + prompt = "Write a detailed caption for this image." + + sys.stderr.write(f"MLM Prompt:\n{prompt}\n") + + data_uri = "" + with open(local_path, "rb") as image_file: + content_type, encoding = mimetypes.guess_type("_dummy" + extension) + if content_type is None: + content_type = "image/jpeg" + image_base64 = base64.b64encode(image_file.read()).decode("utf-8") + data_uri = f"data:{content_type};base64,{image_base64}" + + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": { + "url": data_uri, + }, + }, + ], + } + ] + + response = client.create(messages=messages) + return client.extract_text_or_completion_object(response)[0] + +class FileConversionException(BaseException): + pass + +class UnsupportedFormatException(BaseException): + pass + +class MarkdownConverter: + """(In preview) An extremely simple text-based document reader, suitable for LLM use. 
+ This reader will convert common file-types or webpages to Markdown.""" + + def __init__( + self, + requests_session: Optional[requests.Session] = None, + mlm_client: Optional[Any] = None, + ): + if requests_session is None: + self._requests_session = requests.Session() + else: + self._requests_session = requests_session + + self._mlm_client = mlm_client + + self._page_converters: List[DocumentConverter] = [] + + # Register converters for successful browsing operations + # Later registrations are tried first / take higher priority than earlier registrations + # To this end, the most specific converters should appear below the most generic converters + self.register_page_converter(PlainTextConverter()) + self.register_page_converter(HtmlConverter()) + self.register_page_converter(WikipediaConverter()) + self.register_page_converter(YouTubeConverter()) + self.register_page_converter(DocxConverter()) + self.register_page_converter(XlsxConverter()) + self.register_page_converter(PptxConverter()) + self.register_page_converter(WavConverter()) + self.register_page_converter(Mp3Converter()) + self.register_page_converter(ImageConverter()) + + if IS_PDF_CAPABLE: + self.register_page_converter(PdfConverter()) + + def convert(self, source, **kwargs): + """ + Args: + - source: can be a string representing a path or url, or a requests.response object + - extension: specifies the file extension to use when interpreting the file. If None, infer from source (path, uri, content-type, etc.) + """ + + try: + # Local path or url + if isinstance(source, str): + if source.startswith("http://") or source.startswith("https://") or source.startswith("file://"): + return self.convert_url(source, **kwargs) + else: + return self.convert_local(source, **kwargs) + # Request response + elif isinstance(source, requests.Response): + return self.convert_response(source, **kwargs) + except Exception as e: + return f"Error: {e}" + + def convert_local(self, path, **kwargs): + # Prepare a list of extensions to try (in order of priority) + ext = kwargs.get("file_extension") + extensions = [ext] if ext is not None else [] + + # Get extension alternatives from the path and puremagic + base, ext = os.path.splitext(path) + self._append_ext(extensions, ext) + self._append_ext(extensions, self._guess_ext_magic(path)) + + # Convert + return self._convert(path, extensions, **kwargs) + + def convert_url(self, url, **kwargs): + # Send a HTTP request to the URL + response = self._requests_session.get(url, stream=True) + response.raise_for_status() + return self.convert_response(response, **kwargs) + + def convert_response(self, response, **kwargs): + # Prepare a list of extensions to try (in order of priority) + ext = kwargs.get("file_extension") + extensions = [ext] if ext is not None else [] + + # Guess from the mimetype + content_type = response.headers.get("content-type", "").split(";")[0] + self._append_ext(extensions, mimetypes.guess_extension(content_type)) + + # Read the content disposition if there is one + content_disposition = response.headers.get("content-disposition", "") + m = re.search(r"filename=([^;]+)", content_disposition) + if m: + base, ext = os.path.splitext(m.group(1).strip("\"'")) + self._append_ext(extensions, ext) + + # Read from the extension from the path + base, ext = os.path.splitext(urlparse(response.url).path) + self._append_ext(extensions, ext) + + # Save the file locally to a temporary file. 
It will be deleted before this method exits + handle, temp_path = tempfile.mkstemp() + fh = os.fdopen(handle, "wb") + result = None + try: + # Download the file + for chunk in response.iter_content(chunk_size=512): + fh.write(chunk) + fh.close() + + # Use puremagic to check for more extension options + self._append_ext(extensions, self._guess_ext_magic(temp_path)) + + # Convert + result = self._convert(temp_path, extensions, url=response.url) + + # Clean up + finally: + try: + fh.close() + except: + pass + os.unlink(temp_path) + + return result + + def _convert(self, local_path, extensions, **kwargs): + print(f'_convert: {local_path}, {extensions}, {kwargs}') + error_trace = "" + for ext in extensions: + for converter in self._page_converters: + _kwargs = copy.deepcopy(kwargs) + _kwargs.update({"file_extension": ext}) + + # Copy any additional global options + if "mlm_client" not in _kwargs and self._mlm_client is not None: + _kwargs["mlm_client"] = self._mlm_client + + # If we hit an error log it and keep trying + res = None + try: + res = converter.convert(local_path, **_kwargs) + except Exception as e: + error_trace = ("\n\n" + traceback.format_exc()).strip() + + if res is not None: + # Normalize the content + res.text_content = "\n".join([line.rstrip() for line in re.split(r"\r?\n", res.text_content)]) + res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content) + + # Todo + return res + + # If we got this far without success, report any exceptions + if len(error_trace) > 0: + raise FileConversionException( + f"Could not convert '{local_path}' to Markdown. File type was recognized as {extensions}. While converting the file, the following error was encountered:\n\n{error_trace}" + ) + + # Nothing can handle it! + raise UnsupportedFormatException( + f"Could not convert '{local_path}' to Markdown. The formats {extensions} are not supported." 
+ ) + + def _append_ext(self, extensions, ext): + """Append a unique non-None, non-empty extension to a list of extensions.""" + if ext is None: + return + ext = ext.strip() + if ext == "": + return + # if ext not in extensions: + if True: + extensions.append(ext) + + def _guess_ext_magic(self, path): + """Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes.""" + # Use puremagic to guess + try: + guesses = puremagic.magic_file(path) + if len(guesses) > 0: + ext = guesses[0].extension.strip() + if len(ext) > 0: + return ext + except FileNotFoundError: + pass + except IsADirectoryError: + pass + except PermissionError: + pass + except Exception: + pass + return None + + def register_page_converter(self, converter: DocumentConverter) -> None: + """Register a page text converter.""" + self._page_converters.insert(0, converter) diff --git a/utils/score.py b/utils/score.py new file mode 100644 index 0000000..00bd2d4 --- /dev/null +++ b/utils/score.py @@ -0,0 +1,101 @@ +import json +import re +import string +import warnings + +import numpy as np + + +def normalize_number_str(number_str: str) -> float: + # we replace these common units and commas to allow + # conversion to float + for char in ["$", "%", ","]: + number_str = number_str.replace(char, "") + try: + return float(number_str) + except ValueError: + print(f"String {number_str} cannot be normalized to number str.") + return float("inf") + + +def split_string( + s: str, + char_list: list[str] = [",", ";"], +) -> list[str]: + pattern = f"[{''.join(char_list)}]" + return re.split(pattern, s) + + +def question_scorer( + model_answer: str, + ground_truth: str, +) -> bool: + def is_float(element: any) -> bool: + try: + float(element) + return True + except ValueError: + return False + + # if gt is a number + if is_float(ground_truth): + # print(f"Evaluating {model_answer} as a number.") + normalized_answer = normalize_number_str(model_answer) + return normalized_answer == float(ground_truth) + + # if gt is a list + elif any(char in ground_truth for char in [",", ";"]): + # print(f"Evaluating {model_answer} as a comma separated list.") + # question with the fish: normalization removes punct + + gt_elems = split_string(ground_truth) + ma_elems = split_string(model_answer) + + # check length is the same + if len(gt_elems) != len(ma_elems): + warnings.warn( + "Answer lists have different lengths, returning False.", UserWarning + ) + return False + + # compare each element as float or str + comparisons = [] + for ma_elem, gt_elem in zip(ma_elems, gt_elems): + if is_float(gt_elem): + normalized_ma_elem = normalize_number_str(ma_elem) + comparisons.append(normalized_ma_elem == float(gt_elem)) + else: + # we do not remove punct since comparisons can include punct + comparisons.append( + normalize_str(ma_elem, remove_punct=False) + == normalize_str(gt_elem, remove_punct=False) + ) + return all(comparisons) + + # if gt is a str + else: + # print(f"Evaluating {model_answer} as a string.") + return normalize_str(model_answer) == normalize_str(ground_truth) + + +def normalize_str(input_str, remove_punct=True) -> str: + """ + Normalize a string by: + - Removing all white spaces + - Optionally removing punctuation (if remove_punct is True) + - Converting to lowercase + Parameters: + - input_str: str, the string to normalize + - remove_punct: bool, whether to remove punctuation (default: True) + Returns: + - str, the normalized string + """ + # Remove all white spaces. Required e.g for seagull vs. 
sea gull + no_spaces = re.sub(r"\s", "", input_str) + + # Remove punctuation, if specified. + if remove_punct: + translator = str.maketrans("", "", string.punctuation) + return no_spaces.lower().translate(translator) + else: + return no_spaces.lower() \ No newline at end of file
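For reference, a minimal usage sketch of `question_scorer` from `utils/score.py` above. The question/answer strings are invented examples; they only illustrate the three ground-truth formats the scorer distinguishes (single number, delimited list, plain string):

```python
# Invented examples exercising the three branches of question_scorer.
from utils.score import question_scorer

# Numeric ground truth: the model answer is normalized (strip "$", "%", ",") to a float.
assert question_scorer("$1,234.00", "1234")

# List ground truth: split on "," / ";" and compared element-wise; numeric elements
# compare as floats, string elements with whitespace removed and case folded.
assert question_scorer("3, sea gull", "3;seagull")

# Plain-string ground truth: whitespace stripped, punctuation removed, lower-cased.
assert question_scorer("Sea-Gull", "seagull")
assert not question_scorer("albatross", "seagull")
```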
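Likewise, a hedged sketch of how the `MarkdownConverter` defined above is meant to be driven. The `utils.browser_utils` import path is an assumption (adjust it to wherever this converter module lives in the repo), and the file name and URL are invented examples:

```python
# Assumed module path; adjust to the actual location of the converter code.
from utils.browser_utils import MarkdownConverter

converter = MarkdownConverter()  # a shared requests.Session or an mlm_client can be injected

# Local path: the file extension plus puremagic's guess decide which DocumentConverter runs.
result = converter.convert("report.xlsx")
if isinstance(result, str):
    # convert() catches ordinary exceptions and returns an "Error: ..." string; note that
    # FileConversionException / UnsupportedFormatException subclass BaseException,
    # so they propagate rather than being turned into this string.
    print(result)
else:
    print(result.text_content[:300])

# URLs go through convert_url()/convert_response(), which first collect candidate
# extensions from the response (illustrated below) before trying the converters.
page = converter.convert("https://en.wikipedia.org/wiki/Ibis")
```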
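And a small illustration (with invented values) of how `convert_response` stacks candidate extensions before `_convert` tries every registered converter against each one, most recently registered converter first:

```python
# Invented values showing the extension-candidate order used by convert_response().
import mimetypes
import os
import re
from urllib.parse import urlparse

extensions = []

# 1) From the Content-Type header.
extensions.append(mimetypes.guess_extension("application/pdf"))          # ".pdf"

# 2) From the Content-Disposition filename, if present.
m = re.search(r"filename=([^;]+)", 'attachment; filename="paper.PDF"')
if m:
    extensions.append(os.path.splitext(m.group(1).strip("\"'"))[1])      # ".PDF"

# 3) From the URL path (puremagic adds a further guess once the bytes are on disk).
extensions.append(os.path.splitext(urlparse("https://example.com/docs/paper.pdf").path)[1])  # ".pdf"

print(extensions)  # ['.pdf', '.PDF', '.pdf']; duplicates are kept and tried in order
```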