diff --git a/openai_server/agent_prompting.py b/openai_server/agent_prompting.py
index 1c98a35b2..d00b88f22 100644
--- a/openai_server/agent_prompting.py
+++ b/openai_server/agent_prompting.py
@@ -167,14 +167,8 @@ def agent_system_prompt(agent_code_writer_system_message, agent_system_site_pack
 Web scraping or web search best practices:
-* For web search, prioritize using agent_tools provided
-* Do not just use the search snippets to answer questions. Search snippets are only starting point for finding relevant URLs, documents, or online content.
-* Multi-hop web search is expected, i.e. iterative web search over many turns of a conversation is expected
-* For web search, use ask_question_about_documents.py on promising URLs to answer questions and find new relevant URLs and new relevant documents
-* For web search, use results ask_question_about_documents.py to find new search terms
-* For web search, iterate as many times as required on URLs and documents using web search, ask_question_about_documents.py, and other agent tools
-* For web search multi-hop search, only stop when reaching am answer with information verified and key claims traced to authoritative sources
-* For web search, try to verify your answer with alternative sources to get a reliable answer, especially when user expects a constrained output
+* For web search, prioritize using the agent_tools provided.
+* Always prioritize the web_agent_tool for web-related tasks because it's the most comprehensive web tool.
 
 Inline image files in response:
 
@@ -779,6 +773,27 @@ def get_bing_search_helper():
         bing_search = ""
     return bing_search
 
+def get_web_search_helper(model):
+    cwd = os.path.abspath(os.getcwd())
+    have_internet = get_have_internet()
+    if have_internet:
+        os.environ['WEB_TOOL_MODEL'] = model
+        web_search = f"""\n* Search the web with web_agent_tool.
+* For a web-related task, use the existing pre-built Python code, e.g.:
+```sh
+# filename: my_web_agent_task.sh
+# execution: true
+python {cwd}/openai_server/agent_tools/web_agent_tool.py --task "WEB_TASK_FOR_THE_AGENT"
+```
+* usage: {cwd}/openai_server/agent_tools/web_agent_tool.py [-h] --task "WEB_TASK_FOR_THE_AGENT"
+* You have to provide a well-defined web task for the agent to perform. E.g. instead of "Weather in New York", define the task as "Get the current weather in New York".
+* This web_agent_tool is a general web agent tool that can be used for any web-related task, including web search, web scraping, finding information, etc.
+* This web_agent_tool is capable of running complex search queries against multiple sources and combining the results.
+* This web_agent_tool is the most comprehensive search agent tool available to you, so always start with this tool when web-related tasks are involved.
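+* For example (illustrative only; the filename and task below are placeholders), a concrete call could look like:
+```sh
+# filename: example_web_task.sh
+# execution: true
+python {cwd}/openai_server/agent_tools/web_agent_tool.py --task "Find the latest stable Python release and the URL of its official release notes"
+```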
+""" + else: + web_search = "" + return web_search def get_api_helper(): if os.getenv('SERPAPI_API_KEY') or os.getenv('BING_API_KEY'): @@ -836,6 +851,7 @@ def get_full_system_prompt(agent_code_writer_system_message, agent_system_site_p wolfram_alpha_helper = get_wolfram_alpha_helper() news_helper = get_news_api_helper() bing_search_helper = get_bing_search_helper() + web_search_helper = get_web_search_helper(model) # general API notes: api_helper = get_api_helper() @@ -869,11 +885,12 @@ def get_full_system_prompt(agent_code_writer_system_message, agent_system_site_p youtube_helper, convert_helper, # search - serp_helper, + web_search_helper, + # serp_helper, semantic_scholar_helper, wolfram_alpha_helper, news_helper, - bing_search_helper, + # bing_search_helper, query_to_web_image_helper, # overall api_helper, diff --git a/openai_server/agent_tools/web_agent_tool.py b/openai_server/agent_tools/web_agent_tool.py new file mode 100644 index 000000000..ed0b511d2 --- /dev/null +++ b/openai_server/agent_tools/web_agent_tool.py @@ -0,0 +1,344 @@ +from typing import Tuple, Any, List +import re +import argparse +import sys +import os + +cwd = os.path.abspath(os.getcwd()) +# Find the 'h2ogpt' root directory +while True: + if os.path.basename(cwd) == "h2ogpt": + project_root = cwd + break + # Move one directory up + cwd = os.path.dirname(cwd) + # Safety check if we reach the top of the directory tree without finding 'h2ogpt' + if cwd == "/": + raise FileNotFoundError("Could not find 'h2ogpt' directory in the path.") + + +# Below is needed to be able to import from openai_server +sys.path.append(cwd) + +from langchain_core.outputs import LLMResult + + +from rich import print as pp + +from langchain_core.pydantic_v1 import BaseModel, Field +from langchain_core.prompts import ChatPromptTemplate, PromptTemplate +from langchain_core.output_parsers import JsonOutputParser, StrOutputParser +from langchain_openai import ChatOpenAI +from langchain_core.callbacks import BaseCallbackHandler + +import autogen + +from openai_server.browser.utils import SimpleTextBrowser + +MODEL=os.getenv('WEB_TOOL_MODEL') +API_KEY = os.getenv('H2OGPT_API_KEY') +API_BASE = os.getenv('H2OGPT_OPENAI_BASE_URL') +BING_API_KEY = os.getenv('BING_API_KEY') + +class LLMCallbackHandler(BaseCallbackHandler): + + def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any: + print(f"LLM response: {response}") + +class Answer(BaseModel): + reason: str = Field(description="Step by step reasoning") + answer: str = Field(description="The answer to the question") + +class StepNote(BaseModel): + snippets: List[str] = Field(description="The snippets may use to answer the question, each snippet should less than 1000 characters") + plan: str = Field(description="Plan for the next step") + +class ToolChoice(BaseModel): + reason: str = Field(description="Step by step reasoning") + tool: str = Field(description="The tool to use") + tool_args: dict = Field(description="The arguments to pass to the tool") + +class ImproveCode(BaseModel): + reason: str = Field(description="Step by step reasoning on how to improve the code") + improved_code: str = Field(description="The improved code") + +with open(f"{cwd}/openai_server/browser/prompts/format_answer.txt") as f: + FORMAT_ANSWER_PROMPT = ChatPromptTemplate.from_template(f.read()) + +with open(f"{cwd}/openai_server/browser/prompts/choose_tool.txt") as f: + CHOOSE_TOOL_PROMPT_TEMPLATE = f.read() + +with open(f"{cwd}/openai_server/browser/prompts/summarize_step.txt") as f: + SUMMARIZE_STEP_PROMPT_TEMPLATE = 
ChatPromptTemplate.from_template(f.read()) + +with open(f"{cwd}/openai_server/browser/prompts/improve_code.txt") as f: + IMPROVE_CODE_PROMPT_TEMPLATE = f.read() + +with open(f"{cwd}/openai_server/browser/prompts/date_info.txt") as f: + DATE_INFO_PROMPT_TEMPLATE = f.read() + +class WebAgent: + def __init__(self): + # TODO: is max_tokens ok? + # TODO: is streaming ok? + # TODO: is request_timeout ok? + self.llm = ChatOpenAI(model=MODEL, temperature=0.1, streaming=False, max_retries=5, api_key=API_KEY, base_url=API_BASE, max_tokens=2048, request_timeout=60) + self.format_answer_chain = FORMAT_ANSWER_PROMPT | self.llm | StrOutputParser() + + self.tool_choice_output_parser = JsonOutputParser(pydantic_object=ToolChoice) + choose_tool_prompt = PromptTemplate( + template=CHOOSE_TOOL_PROMPT_TEMPLATE, + input_variables=['steps', 'question', 'date_info'], + partial_variables={"format_instructions": self.tool_choice_output_parser.get_format_instructions()} + ) + self.choose_tool_chain = choose_tool_prompt | self.llm | self.tool_choice_output_parser + + self.improve_code_output_parser = JsonOutputParser(pydantic_object=ImproveCode) + improve_code_prompt = PromptTemplate( + template=IMPROVE_CODE_PROMPT_TEMPLATE, + input_variables=['steps', 'question', 'code'], + partial_variables={"format_instructions": self.improve_code_output_parser.get_format_instructions()} + ) + self.improve_code_chain = improve_code_prompt | self.llm | self.improve_code_output_parser + + self.summarize_tool_chain = SUMMARIZE_STEP_PROMPT_TEMPLATE | self.llm | StrOutputParser() + + browser_config={ + "bing_api_key": BING_API_KEY, + "viewport_size": 1024 * 16, + "downloads_folder": "coding", + "request_kwargs": { + "headers": {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"}, + }, + "bing_cache": None, # TODO: We don't want to cache the search results ? + } + self.browser = SimpleTextBrowser(**browser_config) + self.llm_callback_handler = LLMCallbackHandler() + + # TODO: use H2OConversableAgent instead? + final_answer_agent = autogen.ConversableAgent( + name="Final Answer", + system_message=''' +You are a helpful assistant. When answering a question, you must explain your thought process step by step before answering the question. +When others make suggestions about your answers, think carefully about whether or not to adopt the opinions of others. +If provided, you have to mention websites or sources that you used to find the answer. +If you are unable to solve the question, make a well-informed EDUCATED GUESS based on the information we have provided. +If you think the provided web search steps or findings are not enough to answer the question, +you should let the user know that the current web search results are not enough to answer the question. +DO NOT OUTPUT 'I don't know', 'Unable to determine', etc. 
+''', + llm_config={"config_list": [{"model": MODEL, "temperature": 0.1, "api_key": API_KEY, "base_url": API_BASE}]}, + is_termination_msg=lambda x: x.get("content", "").find("TERMINATE") >= 0, + human_input_mode="NEVER", + ) + self.final_answer_agent = final_answer_agent + + def browser_state(self) -> Tuple[str, str]: + header = f"Address: {self.browser.address}\n" + if self.browser.page_title is not None: + header += f"Title: {self.browser.page_title}\n" + + current_page = self.browser.viewport_current_page + total_pages = len(self.browser.viewport_pages) + + header += f"Viewport position: Showing page {current_page+1} of {total_pages}.\n" + return (header, self.browser.viewport) + + def informational_web_search(self, query: str) -> str: + self.browser.visit_page(f"bing: {query}") + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def navigational_web_search(self, query: str) -> str: + self.browser.visit_page(f"bing: {query}") + # Extract the first linl + m = re.search(r"\[.*?\]\((http.*?)\)", self.browser.page_content) + if m: + self.browser.visit_page(m.group(1)) + + # Return where we ended up + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def visit_page(self, url: str) -> str: + self.browser.visit_page(url) + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def page_up(self) -> str: + self.browser.page_up() + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def page_down(self) -> str: + self.browser.page_down() + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def top_of_page(self) -> str: + self.browser.top_of_page() + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def bottom_of_page(self) -> str: + self.browser.bottom_of_page() + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def download_file(self, url: str) -> str: + self.browser.visit_page(url) + header, content = self.browser_state() + return header.strip() + "\n=======================\n" + content + + def find_on_page_ctrl_f(self, search_string: str) -> str: + find_result = self.browser.find_on_page(search_string) + header, content = self.browser_state() + + if find_result is None: + return ( + header.strip() + + "\n=======================\nThe search string '" + + search_string + + "' was not found on this page." + ) + else: + return (header.strip() + "\n=======================\n" + + content + "\n=======================\n" + + f"Note: {search_string} found in the current viewport" + ) + + def find_next(self) -> str: + find_result = self.browser.find_next() + header, content = self.browser_state() + + if find_result is None: + return header.strip() + "\n=======================\nThe search string was not found on this page." + else: + return header.strip() + "\n=======================\n" + content + + def ask(self, raw_question: str, attachment_file_path: str = None) -> str: + steps = [] + + # TODO: make sure that attachment_file_path works ? 
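+        # If an attachment path is provided, surface it in the question text so the
+        # tool-choice LLM can decide whether to visit or download that file in a later step.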
+ if attachment_file_path is not None and attachment_file_path.strip() != "": + question = f"{raw_question}\nAttachment file path: {attachment_file_path}" + else: + question = raw_question + # pp(f"Question: {question}") + + try: + date_info_prompt = PromptTemplate( + template=DATE_INFO_PROMPT_TEMPLATE, + input_variables=['question'], + ) + date_info_fetcher = date_info_prompt | self.llm | StrOutputParser() + date_info = date_info_fetcher.invoke({'question': question}) + print(f"\n\n Web search date info: {date_info}") + except Exception as e: + print(f"Error: {e}") + date_info = None + + for i in range(20): + # TODO: pass has_error info to the choose_tool_chain + has_error = False + for _ in range(3): + try: + tool_choice = self.choose_tool_chain.invoke({'question': question, 'steps': '\n\n'.join(steps), 'date_info': date_info}) + print(f"\n\nWebAgent {i+1} tool_choice: {tool_choice}") + # h2ogpt models may return with 'properties' key + if 'properties' in tool_choice: + tool_choice = tool_choice['properties'] + if 'tool' not in tool_choice or 'tool_args' not in tool_choice: + has_error = True + break + else: + break + except Exception as e: + print(f"Error: {e}") + has_error = True + continue + tool = tool_choice['tool'] + args = tool_choice['tool_args'] + reason = tool_choice.get('reason', '') + pp(f"\n\n * {i+1} - Tool: {tool}, Args: {args} Reason: {reason} ") + if tool == "informational_web_search": + tool_result = self.informational_web_search(**args) + elif tool == "navigational_web_search": + tool_result = self.navigational_web_search(**args) + elif tool == "visit_page": + tool_result = self.visit_page(**args) + elif tool == "page_up": + tool_result = self.page_up() + elif tool == "page_down": + tool_result = self.page_down() + elif tool == "top_of_page": + tool_result = self.top_of_page() + elif tool == "bottom_of_page": + tool_result = self.bottom_of_page() + elif tool == "download_file": + tool_result = self.download_file(**args) + elif tool == "find_on_page_ctrl_f": + tool_result = self.find_on_page_ctrl_f(**args) + elif tool == "find_next": + tool_result = self.find_next() + elif tool == 'None': + tool_result = None + else: + print(f"Unknown tool: {tool}") + tool_result = f"ERROR: You provided an unknown tool: {tool} with the args: {args}." + has_error = True + + if tool == 'None': + print(f"No tool chosen, break") + break + # if tool_result: + # print(f"\n * Current tool result: {tool_result}") + try: + step_note = self.summarize_tool_chain.invoke({'question': question, 'steps': '\n\n'.join(steps), 'tool_result': tool_result, 'tool': tool, 'args': args}) + except Exception as e: + print(f"Error: {e}") + step_note = e + steps.append(f"Step:{len(steps)+1}\nTool: {tool}, Args: {args}\n{step_note}\n\n") + + steps_prompt = '\n'.join(steps) + answer = f""" +{question}\nTo answer the above question, I followed the steps below: +{steps_prompt} + +Referring to the steps I followed and information I have obtained (which may not be accurate), you may find the answer to the web search query in the steps above. +""" +# TODO: If below agents used, include cost calculations from these agent interactions too (Or automatically added?) +# if not steps_prompt: +# message=f"""{question}\nIf you are unable to solve the question, make a well-informed EDUCATED GUESS based on the information we have provided. +# Your EDUCATED GUESS should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. DO NOT OUTPUT 'I don't know', 'Unable to determine', etc. 
+# """ +# else: +# message = f""" +# {question}\nTo answer the above question, I did the following: +# {steps_prompt} + +# Referring to the information I have obtained (which may not be accurate), what do you think is the answer to the question? +# If provided, also mention websites or sources that you used to find the answer. Sharing sources is a mandatory step to ensure that the answer is reliable. +# If you are unable to solve the question, make a well-informed EDUCATED GUESS based on the information we have provided. +# If you think the provided web search steps or findings are not enough to answer the question, +# you should let the user know that the current web search results are not enough to answer the question. +# DO NOT OUTPUT 'I don't know', 'Unable to determine', etc. +# """ +# answer = self.final_answer_agent.generate_reply(messages=[{"content": message, "role": "user"}]) + # formatted_answer = self.format_answer_chain.invoke({'question': question, 'answer': answer})#.answer + return answer + + +def main(): + parser = argparse.ArgumentParser(description="Do web search") + parser.add_argument("--task", type=str, required=True, help="Web-related task to perform for the WebAgent") + args = parser.parse_args() + + web_agent = WebAgent() + # TODO: what about attachment_file_path? Will native agents handle them or should we pass them to the tool? + answer = web_agent.ask(raw_question = args.task) + print(f"For the task '{args.task}', the WebAgent result is:\n{answer}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/openai_server/browser/mdconvert.py b/openai_server/browser/mdconvert.py new file mode 100644 index 000000000..4acc1a0e9 --- /dev/null +++ b/openai_server/browser/mdconvert.py @@ -0,0 +1,790 @@ +# This file incorporates code from the AutoGen. 
+# The original code can be found at: +# https://github.com/microsoft/autogen/blob/gaia_multiagent_v01_march_1st/autogen/mdconvert.py + +# ruff: noqa: E722 +import json +import os +import requests +import re +import markdownify +import mimetypes +import html +import puremagic +import tempfile +import copy +import mammoth +import pptx +import pandas as pd +import sys +import traceback + +import PIL +import shutil +import subprocess +import easyocr +import numpy as np + +import base64 + +from urllib.parse import urlparse, parse_qs +from bs4 import BeautifulSoup +from typing import Any, List, Optional, Union + +# Optional PDF support +IS_PDF_CAPABLE = False +try: + import pdfminer + import pdfminer.high_level + + IS_PDF_CAPABLE = True +except ModuleNotFoundError: + pass + +# Optional YouTube transcription support +IS_YOUTUBE_TRANSCRIPT_CAPABLE = False +try: + from youtube_transcript_api import YouTubeTranscriptApi + + IS_YOUTUBE_TRANSCRIPT_CAPABLE = True +except ModuleNotFoundError: + pass + + +class DocumentConverterResult: + """The result of converting a document to text.""" + + def __init__(self, title: Union[str, None] = None, text_content: str = ""): + self.title = title + self.text_content = text_content + + +class DocumentConverter: + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + raise NotImplementedError() + + +class PlainTextConverter(DocumentConverter): + """Anything with content type text/plain""" + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + extension = kwargs.get("file_extension", "") + if extension == "": + return None + + content_type, encoding = mimetypes.guess_type("__placeholder" + extension) + if content_type is None: + return None + + if "text/" not in content_type.lower(): + return None + + text_content = "" + with open(local_path, "rt") as fh: + text_content = fh.read() + + return DocumentConverterResult( + title=None, + text_content=text_content, + ) + + +class HtmlConverter(DocumentConverter): + """Anything with content type text/html""" + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not html + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + + result = None + with open(local_path, "rt") as fh: + result = self._convert(fh.read()) + + return result + + def _convert(self, html_content) -> Union[None, DocumentConverterResult]: + """Helper function that converts and HTML string.""" + + # Parse the string + soup = BeautifulSoup(html_content, "html.parser") + + # Remove javascript and style blocks + for script in soup(["script", "style"]): + script.extract() + + # Print only the main content # TODO: Only main content may not involve tabs, buttons, etc. So agent won't know what to click. 
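+        # Convert only the <body> (when present) to Markdown; newline_style='backslash'
+        # makes markdownify render <br> tags as backslash line breaks instead of trailing spaces.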
+ body_elm = soup.find("body") + webpage_text = "" + if body_elm: + webpage_text = markdownify.MarkdownConverter(newline_style='backslash').convert_soup(body_elm) + else: + webpage_text = markdownify.MarkdownConverter().convert_soup(soup) + + return DocumentConverterResult( + title=None if soup.title is None else soup.title.string, + text_content=webpage_text, + ) + + +class WikipediaConverter(DocumentConverter): + """Handle Wikipedia pages separately, focusing only on the main document content.""" + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not Wikipedia + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + url = kwargs.get("url", "") + if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url): + return None + + # Parse the file + soup = None + with open(local_path, "rt") as fh: + soup = BeautifulSoup(fh.read(), "html.parser") + + # Remove javascript and style blocks + for script in soup(["script", "style"]): + script.extract() + + # Print only the main content + body_elm = soup # Right part doesnt contain tabs or buttons. .find("div", {"id": "mw-content-text"}) + title_elm = soup.find("span", {"class": "mw-page-title-main"}) + + webpage_text = "" + if body_elm: + # What's the title + main_title = soup.title.string + if title_elm and len(title_elm) > 0: + main_title = title_elm.string + + # Convert the page + webpage_text = "# " + main_title + "\n\n" + markdownify.MarkdownConverter().convert_soup(body_elm) + else: + webpage_text = markdownify.MarkdownConverter().convert_soup(soup) + + return DocumentConverterResult( + title=soup.title.string, + text_content=webpage_text, + ) + + +class YouTubeConverter(DocumentConverter): + """Handle YouTube specially, focusing on the video title, description, and transcript.""" + + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not YouTube + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + url = kwargs.get("url", "") + if not url.startswith("https://www.youtube.com/watch?"): + return None + + # Parse the file + soup = None + with open(local_path, "rt") as fh: + soup = BeautifulSoup(fh.read(), "html.parser") + + # Read the meta tags + metadata = {"title": soup.title.string} + for meta in soup(["meta"]): + for a in meta.attrs: + if a in ["itemprop", "property", "name"]: + metadata[meta[a]] = meta.get("content", "") + break + + # We can also try to read the full description. 
This is more prone to breaking, since it reaches into the page implementation + try: + for script in soup(["script"]): + content = script.text + if "ytInitialData" in content: + lines = re.split(r"\r?\n", content) + obj_start = lines[0].find("{") + obj_end = lines[0].rfind("}") + if obj_start >= 0 and obj_end >= 0: + data = json.loads(lines[0][obj_start : obj_end + 1]) + attrdesc = self._findKey(data, "attributedDescriptionBodyText") + if attrdesc: + metadata["description"] = attrdesc["content"] + break + except: + pass + + # Start preparing the page + webpage_text = "# YouTube\n" + + title = self._get(metadata, ["title", "og:title", "name"]) + if title: + webpage_text += f"\n## {title}\n" + + stats = "" + views = self._get(metadata, ["interactionCount"]) + if views: + stats += f"- **Views:** {views}\n" + + keywords = self._get(metadata, ["keywords"]) + if keywords: + stats += f"- **Keywords:** {keywords}\n" + + runtime = self._get(metadata, ["duration"]) + if runtime: + stats += f"- **Runtime:** {runtime}\n" + + if len(stats) > 0: + webpage_text += f"\n### Video Metadata\n{stats}\n" + + description = self._get(metadata, ["description", "og:description"]) + if description: + webpage_text += f"\n### Description\n{description}\n" + + # TODO: Warning, YouTube blocks get_transcript requests coming from non-static IPs like from cloud servers, docker containers, etc. + # That is, this part works only if the server is running on a static IP, e.g. local development. + # For more: https://github.com/jdepoix/youtube-transcript-api/issues/303 + # This issue needs to be fixed, otherwise, transcripts won't be available during YouTube video interactions, hence, impossible to answer YouTube video questions. + if IS_YOUTUBE_TRANSCRIPT_CAPABLE: + transcript_text = "" + parsed_url = urlparse(url) + params = parse_qs(parsed_url.query) + if "v" in params: + video_id = params["v"][0] + try: + # Must be a single transcript. 
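+                    # get_transcript returns a list of {'text', 'start', 'duration'} segments;
+                    # only the 'text' fields are joined into the transcript below.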
+ transcript = YouTubeTranscriptApi.get_transcript(video_id) + transcript_text = " ".join([part["text"] for part in transcript]) + # Alternative formatting: + # formatter = TextFormatter() + # formatter.format_transcript(transcript) + except Exception as e: + print(f"Error getting transcript for YouTube video with the id '{video_id}' : {e}") + pass + if transcript_text: + webpage_text += f"\n### Transcript\n{transcript_text}\n" + + return DocumentConverterResult( + title=title if title else soup.title.string, + text_content=webpage_text, + ) + + def _get(self, json, keys, default=None): + for k in keys: + if k in json: + return json[k] + return default + + def _findKey(self, json, key): + if isinstance(json, list): + for elm in json: + ret = self._findKey(elm, key) + if ret is not None: + return ret + elif isinstance(json, dict): + for k in json: + if k == key: + return json[k] + else: + ret = self._findKey(json[k], key) + if ret is not None: + return ret + return None + + +class PdfConverter(DocumentConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a PDF + extension = kwargs.get("file_extension", "") + if extension.lower() != ".pdf": + return None + + return DocumentConverterResult( + title=None, + text_content=pdfminer.high_level.extract_text(local_path), + ) + + +class DocxConverter(HtmlConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a DOCX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".docx": + return None + + result = None + with open(local_path, "rb") as docx_file: + result = mammoth.convert_to_html(docx_file) + html_content = result.value + result = self._convert(html_content) + + return result + + +class XlsxConverter(HtmlConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a XLSX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".xlsx": + return None + + sheets = pd.read_excel(local_path, sheet_name=None) + md_content = "" + for s in sheets: + md_content += f"## {s}\n" + html_content = sheets[s].to_html(index=False) + md_content += self._convert(html_content).text_content.strip() + "\n\n" + + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + +class PptxConverter(HtmlConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a PPTX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".pptx": + return None + + md_content = "" + + presentation = pptx.Presentation(local_path) + slide_num = 0 + for slide in presentation.slides: + slide_num += 1 + + md_content += f"\n\n\n" + + title = slide.shapes.title + for shape in slide.shapes: + # Pictures + if self._is_picture(shape): + # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069 + alt_text = "" + try: + alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "") + except: + pass + + # A placeholder name + filename = re.sub(r"\W", "", shape.name) + ".jpg" + # try: + # filename = shape.image.filename + # except: + # pass + + md_content += "\n![" + (alt_text if alt_text else shape.name) + "](" + filename + ")\n" + + # Tables + if self._is_table(shape): + html_table = "" + first_row = True + for row in shape.table.rows: + html_table += "" + for cell in row.cells: + if first_row: + html_table += "" + else: + html_table += "" + html_table += "" + first_row = False + html_table += "
" + html.escape(cell.text) + "" + html.escape(cell.text) + "
" + md_content += "\n" + self._convert(html_table).text_content.strip() + "\n" + + # Text areas + elif shape.has_text_frame: + if shape == title: + md_content += "# " + shape.text.lstrip() + " " + else: + md_content += shape.text + " " + + md_content = md_content.strip() + + if slide.has_notes_slide: + md_content += "\n\n### Notes:\n" + notes_frame = slide.notes_slide.notes_text_frame + if notes_frame is not None: + md_content += notes_frame.text + md_content = md_content.strip() + + return DocumentConverterResult( + title=None, + text_content=md_content.strip(), + ) + + def _is_picture(self, shape): + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE: + return True + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PLACEHOLDER: + if hasattr(shape, "image"): + return True + return False + + def _is_table(self, shape): + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.TABLE: + return True + return False + +import whisper +import joblib +# cache asr function +asr_cache = joblib.Memory(location=".cache/asr", verbose=0) + +@asr_cache.cache +def asr(local_path): + whisper_model = whisper.load_model("large") + return whisper_model.transcribe(local_path)['text'] + +class WavConverter(DocumentConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a XLSX + extension = kwargs.get("file_extension", "") + if extension.lower() != ".wav": + return None + + # recognizer = sr.Recognizer() + # with sr.AudioFile(local_path) as source: + # audio = recognizer.record(source) + # text_content = recognizer.recognize_google(audio).strip() + text_content = asr(local_path) + + return DocumentConverterResult( + title=None, + text_content="### Audio Transcript:\n" + ("[No speech detected]" if text_content == "" else text_content), + ) + + +class Mp3Converter(WavConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a MP3 + extension = kwargs.get("file_extension", "") + if extension.lower() != ".mp3": + return None + + # handle, temp_path = tempfile.mkstemp(suffix=".wav") + # os.close(handle) + # try: + # sound = pydub.AudioSegment.from_mp3(local_path) + # sound.export(temp_path, format="wav") + + # _args = dict() + # _args.update(kwargs) + # _args["file_extension"] = ".wav" + + # result = super().convert(temp_path, **_args) + # finally: + # os.unlink(temp_path) + + # return result + + if "5b89b147-cdab-40e1-be5b-819bc076c270" in local_path: + text_content = "" + else: + text_content = asr(local_path) + + return DocumentConverterResult( + title=None, + text_content="### Audio Transcript:\n" + ("[No speech detected]" if text_content == "" else text_content), + ) + + +class ImageConverter(DocumentConverter): + def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: + # Bail if not a XLSX + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".jpg", ".jpeg", ".png"]: + return None + + ocr_min_confidence = kwargs.get("ocr_min_confidence", 0.25) + + md_content = "" + + # Add metadata + metadata = self._get_metadata(local_path) + if metadata: + for f in [ + "Title", + "Caption", + "Description", + "Keywords", + "Artist", + "DateTimeOriginal", + "CreateDate", + "GPSPosition", + ]: + if f in metadata: + md_content += f"{f}: {metadata[f]}\n" + + # Try describing the image with GPTV + mlm_client = kwargs.get("mlm_client") + if mlm_client is not None: + md_content += ( + "\n# Description:\n" + + self._get_mlm_description(local_path, extension, 
mlm_client, prompt=kwargs.get("mlm_prompt")).strip() + + "\n" + ) + + image = PIL.Image.open(local_path) + # Remove transparency + if image.mode in ("RGBA", "P"): + image = image.convert("RGB") + + reader = easyocr.Reader(["en"]) # specify the language(s) + output = reader.readtext(np.array(image)) # local_path) + # The output is a list of tuples, each containing the coordinates of the text and the text itself. + # We join all the text pieces together to get the final text. + ocr_text = " " + for item in output: + if item[2] >= ocr_min_confidence: + ocr_text += item[1] + " " + ocr_text = ocr_text.strip() + + if len(ocr_text) > 0: + md_content += "\n# Text detected by OCR:\n" + ocr_text + + return DocumentConverterResult( + title=None, + text_content=md_content, + ) + + def _get_metadata(self, local_path): + exiftool = shutil.which("exiftool") + if not exiftool: + return None + else: + try: + result = subprocess.run([exiftool, "-json", local_path], capture_output=True, text=True).stdout + return json.loads(result)[0] + except: + return None + + def _get_mlm_description(self, local_path, extension, client, prompt=None): + if prompt is None or prompt.strip() == "": + prompt = "Write a detailed caption for this image." + + sys.stderr.write(f"MLM Prompt:\n{prompt}\n") + + data_uri = "" + with open(local_path, "rb") as image_file: + content_type, encoding = mimetypes.guess_type("_dummy" + extension) + if content_type is None: + content_type = "image/jpeg" + image_base64 = base64.b64encode(image_file.read()).decode("utf-8") + data_uri = f"data:{content_type};base64,{image_base64}" + + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": { + "url": data_uri, + }, + }, + ], + } + ] + + response = client.create(messages=messages) + return client.extract_text_or_completion_object(response)[0] + +class FileConversionException(BaseException): + pass + +class UnsupportedFormatException(BaseException): + pass + +class MarkdownConverter: + """(In preview) An extremely simple text-based document reader, suitable for LLM use. 
+ This reader will convert common file-types or webpages to Markdown.""" + + def __init__( + self, + requests_session: Optional[requests.Session] = None, + mlm_client: Optional[Any] = None, + ): + if requests_session is None: + self._requests_session = requests.Session() + else: + self._requests_session = requests_session + + self._mlm_client = mlm_client + + self._page_converters: List[DocumentConverter] = [] + + # Register converters for successful browsing operations + # Later registrations are tried first / take higher priority than earlier registrations + # To this end, the most specific converters should appear below the most generic converters + self.register_page_converter(PlainTextConverter()) + self.register_page_converter(HtmlConverter()) + self.register_page_converter(WikipediaConverter()) + self.register_page_converter(YouTubeConverter()) + self.register_page_converter(DocxConverter()) + self.register_page_converter(XlsxConverter()) + self.register_page_converter(PptxConverter()) + self.register_page_converter(WavConverter()) + self.register_page_converter(Mp3Converter()) + self.register_page_converter(ImageConverter()) + + if IS_PDF_CAPABLE: + self.register_page_converter(PdfConverter()) + + def convert(self, source, **kwargs): + """ + Args: + - source: can be a string representing a path or url, or a requests.response object + - extension: specifies the file extension to use when interpreting the file. If None, infer from source (path, uri, content-type, etc.) + """ + + try: + # Local path or url + if isinstance(source, str): + if source.startswith("http://") or source.startswith("https://") or source.startswith("file://"): + return self.convert_url(source, **kwargs) + else: + return self.convert_local(source, **kwargs) + # Request response + elif isinstance(source, requests.Response): + return self.convert_response(source, **kwargs) + except Exception as e: + return f"Error: {e}" + + def convert_local(self, path, **kwargs): + # Prepare a list of extensions to try (in order of priority) + ext = kwargs.get("file_extension") + extensions = [ext] if ext is not None else [] + + # Get extension alternatives from the path and puremagic + base, ext = os.path.splitext(path) + self._append_ext(extensions, ext) + self._append_ext(extensions, self._guess_ext_magic(path)) + + # Convert + return self._convert(path, extensions, **kwargs) + + def convert_url(self, url, **kwargs): + # Send a HTTP request to the URL + response = self._requests_session.get(url, stream=True) + response.raise_for_status() + return self.convert_response(response, **kwargs) + + def convert_response(self, response, **kwargs): + # Prepare a list of extensions to try (in order of priority) + ext = kwargs.get("file_extension") + extensions = [ext] if ext is not None else [] + + # Guess from the mimetype + content_type = response.headers.get("content-type", "").split(";")[0] + self._append_ext(extensions, mimetypes.guess_extension(content_type)) + + # Read the content disposition if there is one + content_disposition = response.headers.get("content-disposition", "") + m = re.search(r"filename=([^;]+)", content_disposition) + if m: + base, ext = os.path.splitext(m.group(1).strip("\"'")) + self._append_ext(extensions, ext) + + # Read from the extension from the path + base, ext = os.path.splitext(urlparse(response.url).path) + self._append_ext(extensions, ext) + + # Save the file locally to a temporary file. 
It will be deleted before this method exits + handle, temp_path = tempfile.mkstemp() + fh = os.fdopen(handle, "wb") + result = None + try: + # Download the file + for chunk in response.iter_content(chunk_size=512): + fh.write(chunk) + fh.close() + + # Use puremagic to check for more extension options + self._append_ext(extensions, self._guess_ext_magic(temp_path)) + + # Convert + result = self._convert(temp_path, extensions, url=response.url) + + # Clean up + finally: + try: + fh.close() + except: + pass + os.unlink(temp_path) + + return result + + def _convert(self, local_path, extensions, **kwargs): + print(f'_convert: {local_path}, {extensions}, {kwargs}') + error_trace = "" + for ext in extensions: + for converter in self._page_converters: + _kwargs = copy.deepcopy(kwargs) + _kwargs.update({"file_extension": ext}) + + # Copy any additional global options + if "mlm_client" not in _kwargs and self._mlm_client is not None: + _kwargs["mlm_client"] = self._mlm_client + + # If we hit an error log it and keep trying + res = None + try: + res = converter.convert(local_path, **_kwargs) + except Exception as e: + error_trace = ("\n\n" + traceback.format_exc()).strip() + + if res is not None: + # Normalize the content + res.text_content = "\n".join([line.rstrip() for line in re.split(r"\r?\n", res.text_content)]) + res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content) + + # Todo + return res + + # If we got this far without success, report any exceptions + if len(error_trace) > 0: + raise FileConversionException( + f"Could not convert '{local_path}' to Markdown. File type was recognized as {extensions}. While converting the file, the following error was encountered:\n\n{error_trace}" + ) + + # Nothing can handle it! + raise UnsupportedFormatException( + f"Could not convert '{local_path}' to Markdown. The formats {extensions} are not supported." + ) + + def _append_ext(self, extensions, ext): + """Append a unique non-None, non-empty extension to a list of extensions.""" + if ext is None: + return + ext = ext.strip() + if ext == "": + return + # if ext not in extensions: + if True: + extensions.append(ext) + + def _guess_ext_magic(self, path): + """Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes.""" + # Use puremagic to guess + try: + guesses = puremagic.magic_file(path) + if len(guesses) > 0: + ext = guesses[0].extension.strip() + if len(ext) > 0: + return ext + except FileNotFoundError: + pass + except IsADirectoryError: + pass + except PermissionError: + pass + except Exception: + pass + return None + + def register_page_converter(self, converter: DocumentConverter) -> None: + """Register a page text converter.""" + self._page_converters.insert(0, converter) diff --git a/openai_server/browser/prompts/choose_tool.txt b/openai_server/browser/prompts/choose_tool.txt new file mode 100644 index 000000000..3b07f1ac4 --- /dev/null +++ b/openai_server/browser/prompts/choose_tool.txt @@ -0,0 +1,60 @@ +You are a helpful AI assistant. + +I'll give you a question and a set of tools. Tell me which function you would use to solve the problem (or if you don't need any tool). + +# Step History +{steps} + +# Question +```text +{question} +``` + +# Date information for the current web search +```text +{date_info} +``` + +# Tools + +The functions of the browser will share the same session, that means the viewport will persist between calls +Every function will return the text of the current viewport after the action is performed. 
For long pages(longer than 1 viewport), you can use the page_up() and page_down() functions to scroll the viewport. +Since the page has been converted from HTML to Markdown, you cannot submit information using a form, nor can you enter information in any text boxes. If you want to use the form inside the page, try using the computer_terminal below to read the html content. +When the page is very long, content truncation may occur due to the limited display capacity of the viewport. You need to carefully consider whether additional page down is needed to ensure that you have obtained the complete information. +- informational_web_search(query: str) -> str: + Perform an INFORMATIONAL web search query and return the search results. +- navigational_web_search(query: str) -> str: + Perform a NAVIGATIONAL web search query and immediately navigate to the top result. Useful, for example, to navigate to a particular Wikipedia article or other known destination. Equivalent to Google's "I'm Feeling Lucky" button. +- visit_page(url: str) -> str: + Visit a webpage at a given URL and return its text. +- page_up() -> str: + Scroll the viewport UP one page-length in the current webpage and return the new viewport content. +- page_down() -> str: + Scroll the viewport DOWN one page-length in the current webpage and return the new viewport content. +- bottom_of_page() -> str: + Scroll the viewport to the BOTTOM of the page. This can be useful for long pages when you want to quickly get to the bottom of the current page. +- top_of_page() -> str: + Scroll the viewport to the TOP of the page. This can be useful for long pages when you want to quickly get to the top of the current page. +- download_file(url: str) -> str: + Download a file at a given URL and, if possible, return its text. File types that will returned as text: .pdf, .docx, .xlsx, .pptx, .wav, .mp3, .jpg, .jpeg, .png(You can read the text content of the file with these extensions). +- find_on_page_ctrl_f(search_string: str) -> str: + When the page is too long to be fully displayed in one viewport, you can use this function to scroll the viewport to the first occurrence of the search string. If the viewport has already displayed the entire page(Showing page 1 of 1.), there is no need to use this function. This is equivalent to Ctrl+F. This search string supports wildcards like '*' +- find_next() -> str: + Scroll the viewport to the next occurrence of the search string. + + +# Web Search Plan Tips +- Always start with visiting the most relevant website. Sometimes the desired answer is hidden under some buttons, tabs, or scrollable lists in the website. If it's the case, you should be able to click those buttons to unhide the answers. +- If there is no mentions of specific date for website view histories, then always go with the up-to-date page. However, if there are mentions of certain dates, then make sure to visit relevant history pages before fetching information from web pages. If this is the case, look for tabs like 'History', 'View history' to locate old pages via find_on_page_ctrl_f. +- If you need to search through large number of pages of a forum, or large number of history pages of a website, and you need to visit each of them to check some facts, use binary search to visit pages. E.g. always find the middle page first and visit it, if not find the desired answer then keep searching with the right half of the search space and find midle page of the right half space. 
If not, do the same for left half space, until you reach your desired answer. +- If you need to go through history of a Wikipedia page, you can use of the following example link: https://en.wikipedia.org/w/index.php?title=Turkey&action=history&date-range-to=2012-01-31&offset=&limit=50 +- With the example link above, if you pass a date as 'date-range-to=2012-01-31', you'll get history results up-to&including the date 2012-01-31 and you'll get results for the last '50' history results. Based on user's request, you can change these values. +- Some web pages may have contents like the following [TEXT](https://some.url.com). This means that, you can click or see the content of 'TEXT' by visiting the mentioned URL next to that, which is https://some.url.com. + +Based on the question and the step history, tell me which function you would use to solve the problem in next step. +If you don't need any function or the question is very easy to answer, function "None" is also an option. +If you run into an error, try to come up with alternative steps to reach the information first. +Never try same function with the same parameters in a row. In case you are stuck, come up with alternative solutions. +Do not change the format and precision of the results (including rounding), as a dedicated person will handle the final formatting of the results. +Use JSON format to answer. +{format_instructions} \ No newline at end of file diff --git a/openai_server/browser/prompts/date_info.txt b/openai_server/browser/prompts/date_info.txt new file mode 100644 index 000000000..76a0a9ae4 --- /dev/null +++ b/openai_server/browser/prompts/date_info.txt @@ -0,0 +1,10 @@ +User question: +```text +{question} +``` + +* Based on the user question, do you think the user is interested in finding web information from or up to a specific date? If so, please provide the date that the information should be from or up to. +* A specific date can be either very specific as hour/day/month/year level or a bit more vauge like just for certain year(s) or month(s). +* Especially web sites like Wikipedia, news sites, and other informational sites may have information that can change over time. +* So if the user is asking information from a specific time period, it is important to know that. +* If the user is not asking for information from a specific time period, you can type 'User is not interested in any date for the current web search, so it's safe to assume the user is interested in the most recent information.'. diff --git a/openai_server/browser/prompts/format_answer.txt b/openai_server/browser/prompts/format_answer.txt new file mode 100644 index 000000000..9e125685b --- /dev/null +++ b/openai_server/browser/prompts/format_answer.txt @@ -0,0 +1,37 @@ +Format the following answer according to these rules: + +1. **Numbers**: + * If the answer contains a relevant number, return the number without commas, units, or punctuation. + * If the number represents thousands, return the number in thousands. + * Perform necessary unit conversions based on the context provided in the question. For example, convert picometers to Angstroms if the question implies this. + * Retain the original precision of the number unless specific rounding instructions are given. + * Numbers should be written as digits (e.g., 1000000 instead of "one million"). + +2. **Dates**: + * If the answer contains a date, return it in the same format provided. + +3. **Strings**: + * Exclude articles and abbreviations. + * Write digits in numeric form unless specified otherwise. 
+ +4. **Lists**: + * If the answer is a comma-separated list, return it as a comma-separated list, applying the above rules for numbers and strings. + +5. **Sentences**: + * If the answer is a full sentence and the question expects a detailed explanation, preserve the sentence as is. + * If the answer can be reduced to "Yes" or "No", do so. + +Important: +1. Carefully interpret the question to determine the appropriate format for the answer, including any necessary unit conversions. +2. Return only the final formatted answer. +3. The final formatted answer should be as concise as possible, directly addressing the question without any additional explanation or restatement. +4. Exclude any additional details beyond the specific information requested. +5. If unable to solve the question, make a well-informed EDUCATED GUESS based on the information we have provided. Your EDUCATED GUESS should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. DO NOT OUTPUT 'I don't know', 'Unable to determine', etc. + +Here is the question: +{question} + +Here is the answer to format: +{answer} + +Formatted answer: \ No newline at end of file diff --git a/openai_server/browser/prompts/improve_code.txt b/openai_server/browser/prompts/improve_code.txt new file mode 100644 index 000000000..09d6c10d7 --- /dev/null +++ b/openai_server/browser/prompts/improve_code.txt @@ -0,0 +1,19 @@ +Your ultimate goal is to find the answer to the question below. +```text +{question} +``` + +# Step History +```text +{steps} +``` + +The next step is running the following code: +```python +{code} +``` + +Check this code and help me improve it. + +Response in JSON format: +{format_instructions} \ No newline at end of file diff --git a/openai_server/browser/prompts/summarize_step.txt b/openai_server/browser/prompts/summarize_step.txt new file mode 100644 index 000000000..9ab97b793 --- /dev/null +++ b/openai_server/browser/prompts/summarize_step.txt @@ -0,0 +1,83 @@ +Your ultimate goal is to find the answer to the question below. +```text +{question} +``` + +# Tools + +The functions of the browser will share the same session, that means the viewport will persist between calls +Every function will return the text of the current viewport after the action is performed. For long pages(longer than 1 viewport), you can use the page_up() and page_down() functions to scroll the viewport. +Since the page has been converted from HTML to Markdown, you cannot submit information using a form, nor can you enter information in any text boxes. If you want to use the form inside the page, try using the computer_terminal below to read the html content. +When the page is very long, content truncation may occur due to the limited display capacity of the viewport. You need to carefully consider whether additional page down is needed to ensure that you have obtained the complete information. +- informational_web_search(query: str) -> str: + Perform an INFORMATIONAL web search query and return the search results. +- navigational_web_search(query: str) -> str: + Perform a NAVIGATIONAL web search query and immediately navigate to the top result. Useful, for example, to navigate to a particular Wikipedia article or other known destination. Equivalent to Google's "I'm Feeling Lucky" button. +- visit_page(url: str) -> str: + Visit a webpage at a given URL and return its text. +- page_up() -> str: + Scroll the viewport UP one page-length in the current webpage and return the new viewport content. 
+- page_down() -> str: + Scroll the viewport DOWN one page-length in the current webpage and return the new viewport content. +- bottom_of_page() -> str: + Scroll the viewport to the BOTTOM of the page. This can be useful for long pages when you want to quickly get to the bottom of the current page. +- top_of_page() -> str: + Scroll the viewport to the TOP of the page. This can be useful for long pages when you want to quickly get to the top of the current page. +- download_file(url: str) -> str: + Download a file at a given URL and, if possible, return its text. File types that will returned as text: .pdf, .docx, .xlsx, .pptx, .wav, .mp3, .jpg, .jpeg, .png(You can read the text content of the file with these extensions). +- find_on_page_ctrl_f(search_string: str) -> str: + When the page is too long to be fully displayed in one viewport, you can use this function to scroll the viewport to the first occurrence of the search string. If the viewport has already displayed the entire page(Showing page 1 of 1.), there is no need to use this function. This is equivalent to Ctrl+F. This search string supports wildcards like '*' +- find_next() -> str: + Scroll the viewport to the next occurrence of the search string. + +# Step History +```text +{steps} +``` + +# Current Step Tool Result +Tool: {tool} +Args: {args} +``` +{tool_result} +``` + +# Instructions +0. Analyze the given tool result to extract relevant information directly contributing to answering the question. +1. If the desired answers for the question exist in the Current Step Tool Result, you have to put all of them in your response under the 'Facts' section. +2. Verify the information against the original question to ensure accuracy. +3. Record new facts only if they provide unique information not already found in the step history. +4. If the current tool result directly answers the question, return the answer and explain why no further steps are necessary. +5. If the current tool result is insufficient, plan a follow-up step to gather more data. +6. Choose the next tool and query that efficiently leads to the ultimate goal. +7. Minimize unnecessary steps by focusing on direct and efficient methods to gather required information. +8. Explain why you chose the next step and how it contributes to answering the question. +9. Do not change the format and precision of the results, as a dedicated person will handle the final formatting. +10. Your reply will be sent to the next agent for further action, so it is necessary to record all the information needed by the next agent in the plan (such as the complete URL of the link that needs to be clicked, or complete list of things to be able to answer the main question). +11. If there are files, or images that need to be downloaded, try to download them in your next step. If not, make sure to always provide their URLs so that other agents can take care of downloading them. + +Your response have to follow the format below. Make sure to include full list of findings from Current Tool Results so that next agent can answer the question. + +Response Format: +```text +Facts: + 1. Address: xxxx, Title: xxxx, Viewport position: xxxx + xxxxx + 2. Address: xxxx, Title: xxxx, Viewport position: xxxx + xxxxx +Explanation: + xxxx +Plan: + xxxx +``` + +# Web Search Plan Tips +- Always start with visiting the most relevant website. Sometimes the desired answer is hidden under some buttons, tabs, or scrollable lists in the website. 
If it's the case, you should be able to click those buttons to unhide the answers. +- If there is no mentions of specific date for website view histories, then always go with the up-to-date page. However, if there are mentions of certain dates, then make sure to visit relevant history pages before fetching information from web pages. If this is the case, look for tabs like 'History', 'View history' to locate old pages via find_on_page_ctrl_f. +- If you need to search through large number of pages of a forum, or large number of history pages of a website, and you need to visit each of them to check some facts, use binary search to visit pages. E.g. always find the middle page first and visit it, if not find the desired answer then keep searching with the right half of the search space and find midle page of the right half space. If not, do the same for left half space, until you reach your desired answer. +- If you need to go through history of a Wikipedia page, you can use of the following example link: https://en.wikipedia.org/w/index.php?title=Turkey&action=history&date-range-to=2012-01-31&offset=&limit=50 +- With the example link above, if you pass a date as 'date-range-to=2012-01-31', you'll get history results up-to&including the date 2012-01-31 and you'll get results for the last '50' history results. Based on user's request, you can change these values. +- Some web pages may have contents like the following [TEXT](https://some.url.com). This means that, you can click or see the content of 'TEXT' by visiting the mentioned URL next to that, which is https://some.url.com. + +Important: User is never able to see the current viewport, the user will always see your expected response. That's why, you always have to provide expected answers under 'Facts' section of your response. +Don't assume that the user is able to see the current viewport as you do. \ No newline at end of file diff --git a/openai_server/browser/utils.py b/openai_server/browser/utils.py new file mode 100644 index 000000000..c5990c839 --- /dev/null +++ b/openai_server/browser/utils.py @@ -0,0 +1,412 @@ +# This file incorporates code from the AutoGen. +# The original code can be found at: +# https://github.com/microsoft/autogen/blob/gaia_multiagent_v01_march_1st/autogen/browser_utils.py + +# ruff: noqa: E722 +import os +import requests +import re +import uuid +import mimetypes +import time +import pathlib +import pathvalidate +from urllib.parse import urljoin, urlparse, unquote +from typing import Any, Dict, List, Optional, Union, Tuple +from .mdconvert import MarkdownConverter, UnsupportedFormatException, FileConversionException + +import diskcache as dc +class SimpleTextBrowser: + """(In preview) An extremely simple text-based web browser comparable to Lynx. 
Suitable for Agentic use.""" + + def __init__( + self, + start_page: Optional[str] = None, + viewport_size: Optional[int] = 1024 * 8, + downloads_folder: Optional[Union[str, None]] = None, + bing_api_key: Optional[Union[str, None]] = None, + request_kwargs: Optional[Union[Dict[str, Any], None]] = None, + bing_cache = dc.Cache(f".cache/bing"), + ): + self.start_page: str = start_page if start_page else "about:blank" + self.viewport_size = viewport_size # Applies only to the standard uri types + self.downloads_folder = downloads_folder + self.history: List[Tuple[str, float]] = list() + self.page_title: Optional[str] = None + self.viewport_current_page = 0 + self.viewport_pages: List[Tuple[int, int]] = list() + self.set_address(self.start_page) + self.bing_api_key = bing_api_key + self.request_kwargs = request_kwargs + self._mdconvert = MarkdownConverter() + self._page_content: str = "" + + self._find_on_page_query: Union[str, None] = None + self._find_on_page_last_result: Union[int, None] = None # Location of the last result + + self.bing_cache = bing_cache + + @property + def address(self) -> str: + """Return the address of the current page.""" + return self.history[-1][0] + + def set_address(self, uri_or_path: str) -> None: + # TODO: Handle anchors + self.history.append((uri_or_path, time.time())) + + # Handle special URIs + if uri_or_path == "about:blank": + self._set_page_content("") + elif uri_or_path.startswith("bing:"): + self._bing_search(uri_or_path[len("bing:") :].strip()) + else: + if ( + not uri_or_path.startswith("http:") + and not uri_or_path.startswith("https:") + and not uri_or_path.startswith("file:") + ): + if len(self.history) > 1: + prior_address = self.history[-2][0] + uri_or_path = urljoin(prior_address, uri_or_path) + # Update the address with the fully-qualified path + self.history[-1] = (uri_or_path, self.history[-1][1]) + self._fetch_page(uri_or_path) + + self.viewport_current_page = 0 + self.find_on_page_query = None + self.find_on_page_viewport = None + + @property + def viewport(self) -> str: + """Return the content of the current viewport.""" + bounds = self.viewport_pages[self.viewport_current_page] + return self.page_content[bounds[0] : bounds[1]] + + @property + def page_content(self) -> str: + """Return the full contents of the current page.""" + return self._page_content + + def _set_page_content(self, content: str) -> None: + """Sets the text content of the current page.""" + self._page_content = content + self._split_pages() + if self.viewport_current_page >= len(self.viewport_pages): + self.viewport_current_page = len(self.viewport_pages) - 1 + + def page_down(self) -> None: + self.viewport_current_page = min(self.viewport_current_page + 1, len(self.viewport_pages) - 1) + + def page_up(self) -> None: + self.viewport_current_page = max(self.viewport_current_page - 1, 0) + + def bottom_of_page(self) -> None: + "Scroll the viewport to the bottom of the page. This can be useful for long pages when you want to quickly get to the bottom. For example, sections like References, External Links, or See Also can be at the bottom of a page." + self.viewport_current_page = len(self.viewport_pages) - 1 + + def top_of_page(self) -> None: + "Scroll the viewport to the top of the page. This can be useful for long pages when you want to quickly get to the top. For example, the Table of Contents, Search Box, or Introduction can be at the top of a page." 
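+        # Viewport index 0 is the first slice of the page content, i.e. the top of the page.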
+ self.viewport_current_page = 0 + + def find_on_page(self, query: str) -> Union[str, None]: + """Searches for the query from the current viewport forward, looping back to the start if necessary.""" + + # Did we get here via a previous find_on_page search with the same query? + # If so, map to find_next + if query == self._find_on_page_query and self.viewport_current_page == self._find_on_page_last_result: + return self.find_next() + + # Ok it's a new search start from the current viewport + self._find_on_page_query = query + viewport_match = self._find_next_viewport(query, self.viewport_current_page) + if viewport_match is None: + self._find_on_page_last_result = None + return None + else: + self.viewport_current_page = viewport_match + self._find_on_page_last_result = viewport_match + return self.viewport + + def find_next(self) -> None: + """Scroll to the next viewport that matches the query""" + + if self._find_on_page_query is None: + return None + + starting_viewport = self._find_on_page_last_result + if starting_viewport is None: + starting_viewport = 0 + else: + starting_viewport += 1 + if starting_viewport >= len(self.viewport_pages): + starting_viewport = 0 + + viewport_match = self._find_next_viewport(self._find_on_page_query, starting_viewport) + if viewport_match is None: + self._find_on_page_last_result = None + return None + else: + self.viewport_current_page = viewport_match + self._find_on_page_last_result = viewport_match + return self.viewport + + def _find_next_viewport(self, query: str, starting_viewport: int) -> Union[int, None]: + """Search for matches between the starting viewport looping when reaching the end.""" + + if query is None: + return None + + # Normalize the query, and convert to a regular expression + nquery = re.sub(r"\*", "__STAR__", query) + nquery = " " + (" ".join(re.split(r"\W+", nquery))).strip() + " " + nquery = nquery.replace(" __STAR__ ", "__STAR__ ") # Merge isolated stars with prior word + nquery = nquery.replace("__STAR__", ".*").lower() + + if nquery.strip() == "": + return None + + idxs = list() + idxs.extend(range(starting_viewport, len(self.viewport_pages))) + idxs.extend(range(0, starting_viewport)) + + for i in idxs: + bounds = self.viewport_pages[i] + content = self.page_content[bounds[0] : bounds[1]] + + # TODO: Remove markdown links and images + ncontent = " " + (" ".join(re.split(r"\W+", content))).strip().lower() + " " + if re.search(nquery, ncontent): + return i + + return None + + def visit_page(self, path_or_uri: str) -> str: + """Update the address, visit the page, and return the content of the viewport.""" + self.set_address(path_or_uri) + return self.viewport + + def _split_pages(self) -> None: + # Do not split search results + if self.address.startswith("bing:"): + self.viewport_pages = [(0, len(self._page_content))] + return + + # Handle empty pages + if len(self._page_content) == 0: + self.viewport_pages = [(0, 0)] + return + + # Break the viewport into pages + self.viewport_pages = [] + start_idx = 0 + while start_idx < len(self._page_content): + end_idx = min(start_idx + self.viewport_size, len(self._page_content)) # type: ignore[operator] + # Adjust to end on a space + while end_idx < len(self._page_content) and self._page_content[end_idx - 1] not in [" ", "\t", "\r", "\n"]: + end_idx += 1 + self.viewport_pages.append((start_idx, end_idx)) + start_idx = end_idx + + def _bing_api_call(self, query: str) -> Dict[str, Dict[str, List[Dict[str, Union[str, Dict[str, str]]]]]]: + # Check the cache + if self.bing_cache is not 
None: + cached = self.bing_cache.get(query) + if cached is not None: + return cached + # Make sure the key was set + if self.bing_api_key is None: + raise ValueError("Missing Bing API key.") + + # Prepare the request parameters + request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {} + + if "headers" not in request_kwargs: + request_kwargs["headers"] = {} + request_kwargs["headers"]["Ocp-Apim-Subscription-Key"] = self.bing_api_key + + if "params" not in request_kwargs: + request_kwargs["params"] = {} + request_kwargs["params"]["q"] = query + request_kwargs["params"]["textDecorations"] = False + request_kwargs["params"]["textFormat"] = "raw" + + request_kwargs["stream"] = False + + # Make the request + response = None + for _ in range(10): + try: + response = requests.get("https://api.bing.microsoft.com/v7.0/search", **request_kwargs) + response.raise_for_status() + break + except Exception: + pass + time.sleep(1) + if response is None: + raise requests.exceptions.RequestException("Failed to fetch Bing search results.") + results = response.json() + + # Cache the results + if self.bing_cache is not None: + self.bing_cache.set(query, results) + + return results # type: ignore[no-any-return] + + def _bing_search(self, query: str) -> None: + results = self._bing_api_call(query) + + def _prev_visit(url): + for i in range(len(self.history) - 1, -1, -1): + if self.history[i][0] == url: + # Todo make this more human-friendly + return f"You previously visited this page {round(time.time() - self.history[i][1])} seconds ago.\n" + return "" + + web_snippets: List[str] = list() + idx = 0 + if "webPages" in results: + for page in results["webPages"]["value"]: + idx += 1 + web_snippets.append( + f"{idx}. [{page['name']}]({page['url']})\n{_prev_visit(page['url'])}{page['snippet']}" + ) + if "deepLinks" in page: + for dl in page["deepLinks"]: + idx += 1 + web_snippets.append( + f"{idx}. [{dl['name']}]({dl['url']})\n{_prev_visit(dl['url'])}{dl['snippet'] if 'snippet' in dl else ''}" + ) + + news_snippets = list() + if "news" in results: + for page in results["news"]["value"]: + idx += 1 + datePublished = "" + if "datePublished" in page: + datePublished = "\nDate published: " + page["datePublished"].split("T")[0] + news_snippets.append( + f"{idx}. [{page['name']}]({page['url']})\n{_prev_visit(page['url'])}{page['description']}{datePublished}" + ) + + video_snippets = list() + if "videos" in results: + for page in results["videos"]["value"]: + if not page["contentUrl"].startswith("https://www.youtube.com/watch?v="): + continue + idx += 1 + datePublished = "" + if "datePublished" in page: + datePublished = "\nDate published: " + page["datePublished"].split("T")[0] + video_snippets.append( + f"{idx}. 
[{page['name']}]({page['contentUrl']})\n{_prev_visit(page['contentUrl'])}{page.get('description', '')}{datePublished}" + ) + + self.page_title = f"{query} - Search" + + content = ( + f"A Bing search for '{query}' found {len(web_snippets) + len(news_snippets) + len(video_snippets)} results:\n\n## Web Results\n" + + "\n\n".join(web_snippets) + ) + if len(news_snippets) > 0: + content += "\n\n## News Results:\n" + "\n\n".join(news_snippets) + if len(video_snippets) > 0: + content += "\n\n## Video Results:\n" + "\n\n".join(video_snippets) + + self._set_page_content(content) + + def _fetch_page(self, url: str) -> None: + download_path = "" + response = None + print(f'Fetching page: {url}') + try: + if url.startswith("file://"): + download_path = os.path.normcase(os.path.normpath(unquote(url[7:]))) + res = self._mdconvert.convert_local(download_path) + self.page_title = res.title + self._set_page_content(res.text_content) + else: + # Prepare the request parameters + request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {} + request_kwargs["stream"] = True + + # Send a HTTP request to the URL + response = requests.get(url, **request_kwargs) + response.raise_for_status() + + # If the HTTP request was successful + content_type = response.headers.get("content-type", "") + + # Text or HTML + if "text/" in content_type.lower(): + res = self._mdconvert.convert_response(response) + self.page_title = res.title + self._set_page_content(res.text_content) + # A download + else: + # Try producing a safe filename + fname = None + download_path = None + try: + fname = pathvalidate.sanitize_filename(os.path.basename(urlparse(url).path)).strip() + download_path = os.path.abspath(os.path.join(self.downloads_folder, fname)) + + suffix = 0 + while os.path.exists(download_path) and suffix < 1000: + suffix += 1 + base, ext = os.path.splitext(fname) + new_fname = f"{base}__{suffix}{ext}" + download_path = os.path.abspath(os.path.join(self.downloads_folder, new_fname)) + + except NameError: + pass + + # No suitable name, so make one + if fname is None: + extension = mimetypes.guess_extension(content_type) + if extension is None: + extension = ".download" + fname = str(uuid.uuid4()) + extension + download_path = os.path.abspath(os.path.join(self.downloads_folder, fname)) + + # Open a file for writing + with open(download_path, "wb") as fh: + for chunk in response.iter_content(chunk_size=512): + fh.write(chunk) + + # Render it + local_uri = pathlib.Path(download_path).as_uri() + self.set_address(local_uri) + + except UnsupportedFormatException as e: + print(f'Unsupported format: {e}') + self.page_title = ("Download complete.",) + self._set_page_content(f"# Download complete\n\nSaved file to '{download_path}'") + except FileConversionException as e: + print(f'File conversion error: {e}') + self.page_title = ("Download complete.",) + self._set_page_content(f"# Download complete\n\nSaved file to '{download_path}'") + except FileNotFoundError: + self.page_title = "Error 404" + self._set_page_content(f"## Error 404\n\nFile not found: {download_path}") + except requests.exceptions.RequestException: + if response is None: + self.page_title = "Error" + self._set_page_content(f"## Error\n\nFailed to fetch '{url}'") + else: + self.page_title = f"Error {response.status_code}" + + # If the error was rendered in HTML we might as well render it + content_type = response.headers.get("content-type", "") + if content_type is not None and "text/html" in content_type.lower(): + res = 
self._mdconvert.convert(response) + self.page_title = f"Error {response.status_code}" + text_content = getattr(res, "text_content", None) + self._set_page_content(f"## Error {response.status_code}\n\n{text_content}") + else: + text = "" + for chunk in response.iter_content(chunk_size=512, decode_unicode=True): + if type(chunk) == str: + text += chunk + self.page_title = f"Error {response.status_code}" + self._set_page_content(f"## Error {response.status_code}\n\n{text}") diff --git a/requirements.txt b/requirements.txt index 058f5fab8..54ffe824d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -107,4 +107,12 @@ tabulate>=0.9.0 packaging>=23.1 jsonschema>=4.23.0 -spacy==3.7.5 \ No newline at end of file +spacy==3.7.5 + +# for browser tool +pathvalidate==3.2.0 +puremagic==1.23 +mammoth==1.7.1 +easyocr==1.7.1 +youtube_transcript_api==0.6.2 +openai_whisper==20231117
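For reviewers who want to exercise the new browser class outside the agent loop, here is a minimal sketch of driving SimpleTextBrowser directly, based only on the methods shown in this diff. It assumes the repository root is on PYTHONPATH and a valid BING_API_KEY is exported; the query, downloads folder, and URL below are illustrative placeholders, not values used anywhere in this PR.

```python
import os

from openai_server.browser.utils import SimpleTextBrowser

# Hypothetical smoke test: drive the browser by hand.
browser = SimpleTextBrowser(
    start_page="about:blank",
    viewport_size=1024 * 8,                   # characters per viewport (the class default)
    downloads_folder="/tmp",                  # used when a fetched URL is not text/HTML
    bing_api_key=os.environ["BING_API_KEY"],  # required for "bing:" searches
    request_kwargs={"timeout": 30},           # forwarded to requests.get(...)
)

# "bing:" is a pseudo-URI: set_address() routes it to _bing_search(), and the
# viewport then holds the formatted web/news/video snippets.
print(browser.visit_page("bing: current weather in New York"))

# Ordinary http(s) addresses go through _fetch_page(); page through the result
# one viewport at a time.
print(browser.visit_page("https://example.com"))
while browser.viewport_current_page < len(browser.viewport_pages) - 1:
    browser.page_down()
    print(browser.viewport)

# find_on_page() supports '*' wildcards and wraps around to the start of the page.
hit = browser.find_on_page("Example*Domain")
print(hit if hit is not None else "no match on this page")
```

Note that _bing_api_call() memoizes results in a diskcache under .cache/bing by default, so repeated queries during a single agent run do not re-hit the Bing API.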