Skip to content

Commit

Permalink
Update use of autogen_code_restrictions_level by copying code over fo…
Browse files Browse the repository at this point in the history
…r _execute_code_dont_check_setup
  • Loading branch information
pseudotensor committed Sep 11, 2024
1 parent 34d2ee5 commit 56ea0e4
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 12 deletions.
128 changes: 117 additions & 11 deletions openai_server/autogen_utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
import logging
import os
import re
import subprocess
import sys
import typing
from typing import List
from hashlib import md5
from pathlib import Path
from typing import Any, Callable, ClassVar, Dict, List, Optional, Union
from types import SimpleNamespace
import uuid

from autogen.code_utils import PYTHON_VARIANTS, WIN32, _cmd, TIMEOUT_MSG
from autogen.coding import LocalCommandLineCodeExecutor, CodeBlock
from autogen.coding.base import CommandLineCodeResult
from autogen import ConversableAgent
from autogen import GroupChatManager
import backoff

from autogen.coding.func_with_reqs import (
FunctionWithRequirements,
FunctionWithRequirementsStr,
)
from autogen.coding.utils import silence_pip, _get_file_name_from_content

from typing_extensions import ParamSpec

A = ParamSpec("A")

from openai_server.autogen_streaming import iostream_generator
from openai_server.backend_utils import convert_gen_kwargs
from openai_server.agent_utils import in_pycharm, set_python_path
Expand All @@ -23,6 +38,19 @@


class H2OLocalCommandLineCodeExecutor(LocalCommandLineCodeExecutor):
def __init__(
self,
timeout: int = 60,
virtual_env_context: Optional[SimpleNamespace] = None,
work_dir: Union[Path, str] = Path("."),
functions: List[Union[FunctionWithRequirements[Any, A], Callable[..., Any], FunctionWithRequirementsStr]] = [],
functions_module: str = "functions",
execution_policies: Optional[Dict[str, bool]] = None,
autogen_code_restrictions_level: int = 2,
):
super().__init__(timeout, virtual_env_context, work_dir, functions, functions_module, execution_policies)
self.autogen_code_restrictions_level = autogen_code_restrictions_level

@staticmethod
def remove_comments_strings(code: str, lang: str) -> str:
if verbose:
Expand Down Expand Up @@ -145,6 +173,87 @@ def sanitize_command(lang: str, code: str) -> None:
if match.group(f"pat{i}"):
raise ValueError(f"{danger_mark}: {patterns[pattern]}\n\n{cleaned_code}")

def __execute_code_dont_check_setup(self, code_blocks: List[CodeBlock]) -> CommandLineCodeResult:
# nearly identical to parent, but with control over guardrails via self.sanitize_command
logs_all = ""
file_names = []
exitcode = -2
for code_block in code_blocks:
lang, code = code_block.language, code_block.code
lang = lang.lower()

if self.autogen_code_restrictions_level >= 2:
self.sanitize_command(lang, code)
elif self.autogen_code_restrictions_level == 1:
LocalCommandLineCodeExecutor.sanitize_command(lang, code)
code = silence_pip(code, lang)

if lang in PYTHON_VARIANTS:
lang = "python"

if WIN32 and lang in ["sh", "shell"]:
lang = "ps1"

if lang not in self.SUPPORTED_LANGUAGES:
# In case the language is not supported, we return an error message.
exitcode = 1
logs_all += "\n" + f"unknown language {lang}"
break

execute_code = self.execution_policies.get(lang, False)
try:
# Check if there is a filename comment
filename = _get_file_name_from_content(code, self._work_dir)
except ValueError:
return CommandLineCodeResult(exit_code=1, output="Filename is not in the workspace")

if filename is None:
# create a file with an automatically generated name
code_hash = md5(code.encode()).hexdigest()
filename = f"tmp_code_{code_hash}.{'py' if lang.startswith('python') else lang}"
written_file = (self._work_dir / filename).resolve()
with written_file.open("w", encoding="utf-8") as f:
f.write(code)
file_names.append(written_file)

if not execute_code:
# Just return a message that the file is saved.
logs_all += f"Code saved to {str(written_file)}\n"
exitcode = 0
continue

program = _cmd(lang)
cmd = [program, str(written_file.absolute())]
env = os.environ.copy()

if self._virtual_env_context:
virtual_env_abs_path = os.path.abspath(self._virtual_env_context.bin_path)
path_with_virtualenv = rf"{virtual_env_abs_path}{os.pathsep}{env['PATH']}"
env["PATH"] = path_with_virtualenv
if WIN32:
activation_script = os.path.join(virtual_env_abs_path, "activate.bat")
cmd = [activation_script, "&&", *cmd]

try:
result = subprocess.run(
cmd, cwd=self._work_dir, capture_output=True, text=True, timeout=float(self._timeout), env=env
)
except subprocess.TimeoutExpired:
logs_all += "\n" + TIMEOUT_MSG
# Same exit code as the timeout command on linux.
exitcode = 124
break

logs_all += result.stderr
logs_all += result.stdout
exitcode = result.returncode

if exitcode != 0:
break

code_file = str(file_names[0]) if len(file_names) > 0 else None
return CommandLineCodeResult(exit_code=exitcode, output=logs_all, code_file=code_file)

def _execute_code_dont_check_setup(self, code_blocks: List[CodeBlock]) -> CommandLineCodeResult:
try:
# skip code blocks with # execution: false
Expand All @@ -164,15 +273,16 @@ def _execute_code_dont_check_setup(self, code_blocks: List[CodeBlock]) -> Comman
os.environ['TERM'] = 'dumb'
""" + code_block.code

ret = super()._execute_code_dont_check_setup(code_blocks)
except Exception as e:
if 'exitcode' in str(e) and 'local variable' in str(e):
return CommandLineCodeResult(exit_code=0,
ret = self.__execute_code_dont_check_setup(code_blocks)

if ret.exit_code == -2 and len(code_blocks) > 0:
ret = CommandLineCodeResult(exit_code=0,
output='Code block present, but no code executed (execution tag was false or not present for all code blocks). This is expected if you had code blocks but they were not meant for python or shell execution. For example, you may have shown code for demonstration purposes. If this is expected, then move on normally without concern.')
except Exception as e:
if danger_mark in str(e):
print(f"Code Danger Error: {e}\n\n{code_blocks}", file=sys.stderr)
# dont' fail, just return the error so LLM can adjust
return CommandLineCodeResult(exit_code=1, output=str(e))
ret = CommandLineCodeResult(exit_code=1, output=str(e))
else:
raise
try:
Expand All @@ -181,7 +291,7 @@ def _execute_code_dont_check_setup(self, code_blocks: List[CodeBlock]) -> Comman
if bad_output_mark in str(e):
print(f"Code Output Danger Error: {e}\n\n{code_blocks}\n\n{ret}", file=sys.stderr)
# dont' fail, just return the error so LLM can adjust
return CommandLineCodeResult(exit_code=1, output=str(e))
ret = CommandLineCodeResult(exit_code=1, output=str(e))
else:
raise
ret = self.truncate_output(ret)
Expand Down Expand Up @@ -290,10 +400,6 @@ def truncate_output(ret: CommandLineCodeResult) -> CommandLineCodeResult:
return ret


# override the original method with the new one
# required because original class does not use super() but references its own method
LocalCommandLineCodeExecutor.sanitize_command = H2OLocalCommandLineCodeExecutor.sanitize_command

error_patterns = [
r"Rate limit reached",
r"Connection timeout",
Expand Down
2 changes: 1 addition & 1 deletion src/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "122332ef576358589f3dff64301e7ea0622870f8"
__version__ = "34d2ee55eee948203c15f6a3cd355d347b0029ae"

0 comments on commit 56ea0e4

Please sign in to comment.