From 06df8c90f7d11d33a2781abe789165fbee92dac3 Mon Sep 17 00:00:00 2001 From: evans <58369673+Johnnyevans32@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:39:20 +0100 Subject: [PATCH] feat: code complexity analyzer toolkit (#70) --- .gitignore | 3 + pyproject.toml | 2 + .../toolkits/complexity_analyzer.py | 167 ++++++++++++++++++ tests/toolkits/test_complexity_analyzer.py | 85 +++++++++ 4 files changed, 257 insertions(+) create mode 100644 src/goose_plugins/toolkits/complexity_analyzer.py create mode 100644 tests/toolkits/test_complexity_analyzer.py diff --git a/.gitignore b/.gitignore index e68d1cf..5461955 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,9 @@ share/python-wheels/ *.egg MANIFEST + +.DS_Store + # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. diff --git a/pyproject.toml b/pyproject.toml index 8db0362..d46d9fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ requires-python = ">=3.10" dependencies = [ "ai-exchange>=0.8.4", "goose-ai>=0.9.8", + "radon>=6.0.1", ] author = [{ name = "Block", email = "ai-oss-tools@block.xyz" }] packages = [{ include = "goose_plugins", from = "src" }] @@ -20,6 +21,7 @@ goose-plugins = "goose_plugins:module_name" [project.entry-points."goose.toolkit"] artify = "goose_plugins.toolkits.artify:VincentVanCode" todo = "goose_plugins.toolkits.todo:TodoToolkit" +complexity_analyzer = "goose_plugins.toolkits.complexity_analyzer:CodeComplexityToolkit" [build-system] diff --git a/src/goose_plugins/toolkits/complexity_analyzer.py b/src/goose_plugins/toolkits/complexity_analyzer.py new file mode 100644 index 0000000..027f52e --- /dev/null +++ b/src/goose_plugins/toolkits/complexity_analyzer.py @@ -0,0 +1,167 @@ +import os +import ast +from goose.toolkit.base import Toolkit, tool +import radon.complexity as rc +import radon.metrics as rm + + +class CodeComplexityToolkit(Toolkit): + """A toolkit for analyzing the complexity of Python code in a given directory.""" + + def __init__(self, *args: tuple, **kwargs: dict) -> None: + super().__init__(*args, **kwargs) + + @tool + def get_python_files(self, directory: str) -> list: + """Retrieve all Python files from the specified directory. + + Args: + directory (str): The directory to search for Python files. + + Returns: + list: A list of paths to all Python files in the directory. + """ + return [ + os.path.join(root, file) + for root, _, files in os.walk(directory) + for file in files + if file.endswith(".py") + ] + + @tool + def analyze_complexity(self, directory: str) -> dict: + """Analyze the complexity of Python code in a directory. + + Args: + directory (str): The path to the directory containing Python files to analyze. + + Returns: + dict: A dictionary containing the average complexity metrics (Cyclomatic Complexity, Halstead Metrics, + and Maintainability Index) for all Python files in the directory, or an error message if no + valid Python files are found. + """ + python_files = self.get_python_files(directory) + if not python_files: + return {"error": f"No Python files found in the directory: {directory}"} + + complexity_results = { + "cyclomatic_complexity": 0, + "halstead_metrics": 0, + "maintainability_index": 0, + "file_count": 0, + } + + for file in python_files: + try: + with open(file, "r", encoding="utf-8") as f: + code = f.read() + + # Process each complexity metric and update the results + complexity_results[ + "cyclomatic_complexity" + ] += self.cyclomatic_complexity(code) + halstead_result = self.halstead_complexity(code) + complexity_results["halstead_metrics"] += ( + halstead_result["halstead_volume"] if halstead_result else 0 + ) + complexity_results[ + "maintainability_index" + ] += self.maintainability_index(code) + complexity_results["file_count"] += 1 + + except Exception as e: + complexity_results["error"] = f"Error processing {file}: {str(e)}" + continue + + if complexity_results["file_count"] > 0: + # Average the results + return { + "avg_cyclomatic_complexity": complexity_results["cyclomatic_complexity"] + / complexity_results["file_count"], + "avg_halstead_complexity": complexity_results["halstead_metrics"] + / complexity_results["file_count"], + "avg_maintainability_index": complexity_results["maintainability_index"] + / complexity_results["file_count"], + } + else: + return {"error": "No valid Python files to analyze."} + + @tool + def cyclomatic_complexity(self, code: str) -> int: + """Calculate the Cyclomatic Complexity of a given Python code. + + Args: + code (str): The Python code as a string to analyze. + + Returns: + int: The Cyclomatic Complexity of the code. + """ + try: + complexity_list = rc.cc_visit(ast.parse(code)) + total_complexity = 0 + + # Iterate over each item in the complexity list + for item in complexity_list: + if hasattr(item, "complexity"): + # Add complexity of the function or class's top-level complexity + total_complexity += item.complexity + + # For classes, add complexity of methods if any + if hasattr(item, "methods"): + for method in item.methods: + total_complexity += method.complexity + return total_complexity + except Exception as e: + print(e) + self.notifier.log(f"Error calculating cyclomatic complexity: {str(e)}") + return 0 + + @tool + def halstead_complexity(self, code: str) -> dict: + """Calculate Halstead Complexity metrics of the given Python code. + + Args: + code (str): The Python code as a string to analyze. + + Returns: + dict: A dictionary containing the Halstead metrics, including 'halstead_volume'. + """ + from radon.metrics import h_visit + + try: + halstead_report = h_visit(code) + return { + "halstead_volume": halstead_report.total.volume, + "details": { + "vocabulary": halstead_report.total.vocabulary, + "length": halstead_report.total.length, + "calculated_length": halstead_report.total.calculated_length, + "difficulty": halstead_report.total.difficulty, + "effort": halstead_report.total.effort, + "time": halstead_report.total.time, + "bugs": halstead_report.total.bugs, + }, + } + except Exception as e: + print(e) + self.notifier.log(f"Error calculating Halstead complexity: {str(e)}") + return {} + + @tool + def maintainability_index(self, code: str) -> int: + """Calculate the Maintainability Index of the given Python code. + + Args: + code (str): The Python code as a string to analyze. + + Returns: + int: The Maintainability Index of the code. + """ + + try: + mi_score = rm.mi_visit(code, multi=True) + return mi_score + except Exception as e: + print(e) + self.notifier.log(f"Error calculating maintainability index: {str(e)}") + return 0 diff --git a/tests/toolkits/test_complexity_analyzer.py b/tests/toolkits/test_complexity_analyzer.py new file mode 100644 index 0000000..2122d27 --- /dev/null +++ b/tests/toolkits/test_complexity_analyzer.py @@ -0,0 +1,85 @@ +import pytest +from unittest.mock import MagicMock +from goose_plugins.toolkits.complexity_analyzer import CodeComplexityToolkit + + +@pytest.fixture +def toolkit(): + toolkit = CodeComplexityToolkit(notifier=MagicMock()) + return toolkit + + +def test_get_python_files(toolkit): + directory = "test_directory" + + # Simulate os.walk to mock the file retrieval process + toolkit.get_python_files = MagicMock( + return_value=["test_file.py", "another_test_file.py"] + ) + + result = toolkit.get_python_files(directory) + + # Check that the mocked method was called with the correct argument + toolkit.get_python_files.assert_called_with(directory) + assert result == ["test_file.py", "another_test_file.py"] + + +def test_analyze_complexity(toolkit): + directory = "test_directory" + + # Mock methods that would be used during complexity analysis + toolkit.get_python_files = MagicMock(return_value=["test_file.py"]) + toolkit.cyclomatic_complexity = MagicMock(return_value=5) + toolkit.halstead_complexity = MagicMock(return_value={"halstead_volume": 100}) + toolkit.maintainability_index = MagicMock(return_value=70) + + # Mock file content reading + with open("test_file.py", "w") as f: + f.write("def example_function():\n return 42") + + result = toolkit.analyze_complexity(directory) + assert "avg_cyclomatic_complexity" in result + assert "avg_halstead_complexity" in result + assert "avg_maintainability_index" in result + + +def test_cyclomatic_complexity(toolkit): + code = "def test_func():\n if True:\n return 1" + + try: + result = toolkit.cyclomatic_complexity(code) + except Exception as e: + result = None + toolkit.notifier.log.assert_called_with( + f"Error calculating cyclomatic complexity: {str(e)}" + ) + + assert result == 2 + + +def test_halstead_complexity(toolkit): + code = "def test_func():\n return 42" + + try: + result = toolkit.halstead_complexity(code) + except Exception as e: + result = None + toolkit.notifier.log.assert_called_with( + f"Error calculating Halstead complexity: {str(e)}" + ) + + assert isinstance(result, dict) + + +def test_maintainability_index(toolkit): + code = "def test_func():\n return 42" + + try: + result = toolkit.maintainability_index(code) + except Exception as e: + result = None + toolkit.notifier.log.assert_called_with( + f"Error calculating maintainability index: {str(e)}" + ) + + assert isinstance(result, float) or isinstance(result, int)