From c55bc3f31eb314e550e4930456aff6a14197f5d5 Mon Sep 17 00:00:00 2001 From: Dhrumil Mistry <56185972+dmdhrumilmistry@users.noreply.github.com> Date: Fri, 17 Nov 2023 22:31:54 +0530 Subject: [PATCH 1/6] move result table to report --- .../test_results.py => report/table.py} | 0 src/offat/tester/tester_utils.py | 2 +- src/poetry.lock | 69 ++++++++++++++++++- src/pyproject.toml | 1 + 4 files changed, 70 insertions(+), 2 deletions(-) rename src/offat/{tester/test_results.py => report/table.py} (100%) diff --git a/src/offat/tester/test_results.py b/src/offat/report/table.py similarity index 100% rename from src/offat/tester/test_results.py rename to src/offat/report/table.py diff --git a/src/offat/tester/tester_utils.py b/src/offat/tester/tester_utils.py index bbcae79..b6bba82 100644 --- a/src/offat/tester/tester_utils.py +++ b/src/offat/tester/tester_utils.py @@ -6,7 +6,7 @@ from .post_test_processor import PostRunTests from .test_generator import TestGenerator from .test_runner import TestRunner -from .test_results import TestResultTable +from ..report.table import TestResultTable from ..report.generator import ReportGenerator from ..logger import create_logger from ..openapi import OpenAPIParser diff --git a/src/poetry.lock b/src/poetry.lock index daa46b4..3f4e9ef 100644 --- a/src/poetry.lock +++ b/src/poetry.lock @@ -564,6 +564,41 @@ files = [ {file = "lazy_object_proxy-1.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:db1c1722726f47e10e0b5fdbf15ac3b8adb58c091d12b3ab713965795036985f"}, ] +[[package]] +name = "markdown-it-py" +version = "3.0.0" +description = "Python port of markdown-it. Markdown parsing, done right!" +optional = false +python-versions = ">=3.8" +files = [ + {file = "markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb"}, + {file = "markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1"}, +] + +[package.dependencies] +mdurl = ">=0.1,<1.0" + +[package.extras] +benchmarking = ["psutil", "pytest", "pytest-benchmark"] +code-style = ["pre-commit (>=3.0,<4.0)"] +compare = ["commonmark (>=0.9,<1.0)", "markdown (>=3.4,<4.0)", "mistletoe (>=1.0,<2.0)", "mistune (>=2.0,<3.0)", "panflute (>=2.3,<3.0)"] +linkify = ["linkify-it-py (>=1,<3)"] +plugins = ["mdit-py-plugins"] +profiling = ["gprof2dot"] +rtd = ["jupyter_sphinx", "mdit-py-plugins", "myst-parser", "pyyaml", "sphinx", "sphinx-copybutton", "sphinx-design", "sphinx_book_theme"] +testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions"] + +[[package]] +name = "mdurl" +version = "0.1.2" +description = "Markdown URL utilities" +optional = false +python-versions = ">=3.7" +files = [ + {file = "mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8"}, + {file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"}, +] + [[package]] name = "multidict" version = "6.0.4" @@ -882,6 +917,20 @@ files = [ [package.dependencies] typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" +[[package]] +name = "pygments" +version = "2.16.1" +description = "Pygments is a syntax highlighting package written in Python." 
+optional = false +python-versions = ">=3.7" +files = [ + {file = "Pygments-2.16.1-py3-none-any.whl", hash = "sha256:13fc09fa63bc8d8671a6d247e1eb303c4b343eaee81d861f3404db2935653692"}, + {file = "Pygments-2.16.1.tar.gz", hash = "sha256:1daff0494820c69bc8941e407aa20f577374ee88364ee10a98fdbe0aece96e29"}, +] + +[package.extras] +plugins = ["importlib-metadata"] + [[package]] name = "pyrsistent" version = "0.19.3" @@ -1054,6 +1103,24 @@ files = [ [package.dependencies] six = "*" +[[package]] +name = "rich" +version = "13.7.0" +description = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal" +optional = false +python-versions = ">=3.7.0" +files = [ + {file = "rich-13.7.0-py3-none-any.whl", hash = "sha256:6da14c108c4866ee9520bbffa71f6fe3962e193b7da68720583850cd4548e235"}, + {file = "rich-13.7.0.tar.gz", hash = "sha256:5cb5123b5cf9ee70584244246816e9114227e0b98ad9176eede6ad54bf5403fa"}, +] + +[package.dependencies] +markdown-it-py = ">=2.2.0" +pygments = ">=2.13.0,<3.0.0" + +[package.extras] +jupyter = ["ipywidgets (>=7.5.1,<9)"] + [[package]] name = "rq" version = "1.15.1" @@ -1489,4 +1556,4 @@ api = ["fastapi", "python-dotenv", "redis", "rq", "uvicorn"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "731a24540743243fd4387cfbe710834f243b4213be6408df32b52de1ef9ecaa0" +content-hash = "9e2233ca496d15d519487945c67872eeb59c25bf28543ab22267b7268fca34db" diff --git a/src/pyproject.toml b/src/pyproject.toml index 9d92b5e..82c983b 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -19,6 +19,7 @@ uvicorn = {extras = ["standard"], version = "^0.23.2", optional = true} rq = {version = "^1.15.1", optional = true} redis = {version = "^5.0.0", optional = true} python-dotenv = {version = "^1.0.0", optional = true} +rich = "^13.7.0" [tool.poetry.group.dev.dependencies] pytest = "^7.4.0" From 581bd74f75975ead72dbfc9e6144190283523434 Mon Sep 17 00:00:00 2001 From: Dhrumil Mistry <56185972+dmdhrumilmistry@users.noreply.github.com> Date: Fri, 17 Nov 2023 22:39:54 +0530 Subject: [PATCH 2/6] move result table to report --- src/offat/report/table.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/offat/report/table.py b/src/offat/report/table.py index 0e7012d..aca6b64 100644 --- a/src/offat/report/table.py +++ b/src/offat/report/table.py @@ -1,5 +1,8 @@ from tabulate import tabulate +from rich.console import Console +from rich.table import Table + class TestResultTable: def __init__(self, tablefmt: str = 'heavy_outline', headers: str = 'keys', *args, **kwargs) -> None: @@ -9,7 +12,10 @@ def __init__(self, tablefmt: str = 'heavy_outline', headers: str = 'keys', *args self.kwargs = kwargs def generate_result_table(self, results: list, filter_passed_results: bool = True): - return tabulate(self._sanitize_results(results, filter_passed_results), headers=self.headers, tablefmt=self.tablefmt, *self.args, **self.kwargs) + results = self._sanitize_results(results, filter_passed_results) + table = Table() + print(results) + return [] def _sanitize_results(self, results: list, filter_passed_results: bool = True, is_leaking_data: bool = False): if filter_passed_results: From 348ec4599a2913bc86aca34b13b9d0b3e4d8edc4 Mon Sep 17 00:00:00 2001 From: Dhrumil Mistry <56185972+dmdhrumilmistry@users.noreply.github.com> Date: Sat, 18 Nov 2023 00:01:22 +0530 Subject: [PATCH 3/6] use rich for tabulating results --- src/offat/report/table.py | 47 ++++++++++++++++++++++---------- src/offat/tester/tester_utils.py | 8 ++++-- 
src/poetry.lock | 16 +---------- src/pyproject.toml | 1 - 4 files changed, 38 insertions(+), 34 deletions(-) diff --git a/src/offat/report/table.py b/src/offat/report/table.py index aca6b64..e69af87 100644 --- a/src/offat/report/table.py +++ b/src/offat/report/table.py @@ -1,21 +1,38 @@ -from tabulate import tabulate - from rich.console import Console -from rich.table import Table +from rich.table import Table, Column class TestResultTable: - def __init__(self, tablefmt: str = 'heavy_outline', headers: str = 'keys', *args, **kwargs) -> None: - self.tablefmt = tablefmt - self.headers = headers - self.args = args - self.kwargs = kwargs + def __init__(self, table_width_percentage: float = 98, ) -> None: + self.console = Console() + self.table_width_percentage = table_width_percentage + + def print_table(self, table: Table): + terminal_width = self.console.width + table_width = int(terminal_width * (self.table_width_percentage / 100)) + table.width = table_width + + self.console.print(table, width=table_width, overflow='fold') + self.console.rule() + + def extract_result_table_cols(self, results: list[dict]) -> list[str]: + return sorted({key for dictionary in results for key in dictionary.keys()}) + + def generate_result_cols(self, results_list: list[dict]) -> list[Column]: + return [Column(header=col_header, overflow='fold') for col_header in self.extract_result_table_cols(results_list)] def generate_result_table(self, results: list, filter_passed_results: bool = True): results = self._sanitize_results(results, filter_passed_results) - table = Table() - print(results) - return [] + cols = self.generate_result_cols(results) + table = Table(*cols) + + for result in results: + table_row = [] + for col in cols: + table_row.append(str(result[col.header])) + table.add_row(*table_row) + + return table def _sanitize_results(self, results: list, filter_passed_results: bool = True, is_leaking_data: bool = False): if filter_passed_results: @@ -25,9 +42,9 @@ def _sanitize_results(self, results: list, filter_passed_results: bool = True, i # remove keys based on conditions or update their values for result in results: if result['result']: - result['result'] = u"\u2713" + result['result'] = u"[bold green]Passed \u2713[/bold green]" else: - result['result'] = u"\u00d7" + result['result'] = u"[bold red]Failed \u00d7[/bold red]" if not is_leaking_data: del result['response_headers'] @@ -47,9 +64,9 @@ def _sanitize_results(self, results: list, filter_passed_results: bool = True, i del result['response_match_regex'] if result.get('data_leak'): - result['data_leak'] = u"\u2713" + result['data_leak'] = u"[bold red]Leak Found \u00d7[/bold red]" else: - result['data_leak'] = u"\u00d7" + result['data_leak'] = u"[bold green]No Leak \u2713[/bold green]" if not isinstance(result.get('malicious_payload'), str): del result['malicious_payload'] diff --git a/src/offat/tester/tester_utils.py b/src/offat/tester/tester_utils.py index b6bba82..7625a6a 100644 --- a/src/offat/tester/tester_utils.py +++ b/src/offat/tester/tester_utils.py @@ -10,7 +10,6 @@ from ..report.generator import ReportGenerator from ..logger import create_logger from ..openapi import OpenAPIParser -from ..utils import write_json_to_file logger = create_logger(__name__) @@ -50,9 +49,12 @@ def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional test_results = PostRunTests.detect_data_exposure(test_results) # print results - results = test_table_generator.generate_result_table( + results_table = 
test_table_generator.generate_result_table( deepcopy(test_results)) - print(results) + + if results_table.columns: + test_table_generator.print_table(results_table) + return test_results diff --git a/src/poetry.lock b/src/poetry.lock index 3f4e9ef..4358e7f 100644 --- a/src/poetry.lock +++ b/src/poetry.lock @@ -1239,20 +1239,6 @@ anyio = ">=3.4.0,<5" [package.extras] full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart", "pyyaml"] -[[package]] -name = "tabulate" -version = "0.9.0" -description = "Pretty-print tabular data" -optional = false -python-versions = ">=3.7" -files = [ - {file = "tabulate-0.9.0-py3-none-any.whl", hash = "sha256:024ca478df22e9340661486f85298cff5f6dcdba14f3813e8830015b9ed1948f"}, - {file = "tabulate-0.9.0.tar.gz", hash = "sha256:0095b12bf5966de529c0feb1fa08671671b3368eec77d7ef7ab114be2c068b3c"}, -] - -[package.extras] -widechars = ["wcwidth"] - [[package]] name = "typing-extensions" version = "4.8.0" @@ -1556,4 +1542,4 @@ api = ["fastapi", "python-dotenv", "redis", "rq", "uvicorn"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "9e2233ca496d15d519487945c67872eeb59c25bf28543ab22267b7268fca34db" +content-hash = "ba1f03330ab6bef00a84cd168539b0d4d7871620bed7b8a5db42a6fb30fbae0f" diff --git a/src/pyproject.toml b/src/pyproject.toml index 82c983b..ddf4adb 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -13,7 +13,6 @@ pyyaml = "^6.0" prance = "^23.6.21.0" openapi-spec-validator = "^0.5.7" colorama = "^0.4.6" -tabulate = "^0.9.0" fastapi = {version = "^0.103.1", optional = true} uvicorn = {extras = ["standard"], version = "^0.23.2", optional = true} rq = {version = "^1.15.1", optional = true} From 465fabdde1ca7a0f9fcf64d90468197b029b4df9 Mon Sep 17 00:00:00 2001 From: Dhrumil Mistry <56185972+dmdhrumilmistry@users.noreply.github.com> Date: Sat, 18 Nov 2023 02:37:41 +0530 Subject: [PATCH 4/6] add progress bars update logger use single console for rich --- src/offat/api/app.py | 3 +- src/offat/api/jobs.py | 5 +- src/offat/config_data_handler.py | 41 ++++---- src/offat/logger.py | 53 +++------- src/offat/openapi.py | 71 ++++++------- src/offat/report/generator.py | 5 +- src/offat/report/table.py | 8 +- src/offat/tester/test_generator.py | 1 - src/offat/tester/test_runner.py | 26 ++++- src/offat/tester/tester_utils.py | 156 ++++++++++++++++++++--------- src/offat/utils.py | 59 ++++++----- src/poetry.lock | 2 +- src/pyproject.toml | 1 - 13 files changed, 233 insertions(+), 198 deletions(-) diff --git a/src/offat/api/app.py b/src/offat/api/app.py index 99882bc..b959a92 100644 --- a/src/offat/api/app.py +++ b/src/offat/api/app.py @@ -2,11 +2,10 @@ from offat.api.config import app, task_queue, task_timeout, auth_secret_key from offat.api.jobs import scan_api from offat.api.models import CreateScanModel -from offat.logger import create_logger +from offat.logger import logger from os import uname, environ -logger = create_logger(__name__) logger.info(f'Secret Key: {auth_secret_key}') diff --git a/src/offat/api/jobs.py b/src/offat/api/jobs.py index 6aead7e..8715154 100644 --- a/src/offat/api/jobs.py +++ b/src/offat/api/jobs.py @@ -2,10 +2,7 @@ from offat.api.models import CreateScanModel from offat.tester.tester_utils import generate_and_run_tests from offat.openapi import OpenAPIParser -from offat.logger import create_logger - - -logger = create_logger(__name__) +from offat.logger import logger def scan_api(body_data: CreateScanModel): diff --git a/src/offat/config_data_handler.py b/src/offat/config_data_handler.py index 
0e2189f..9e90545 100644 --- a/src/offat/config_data_handler.py +++ b/src/offat/config_data_handler.py @@ -1,38 +1,35 @@ from copy import deepcopy -from pprint import pprint -from .logger import create_logger +from .logger import logger -logger = create_logger(__name__) - - -def validate_config_file_data(test_config_data:dict): +def validate_config_file_data(test_config_data: dict): if not isinstance(test_config_data, dict): logger.warning('Invalid data format') return False - + if test_config_data.get('error', False): - logger.warning(f'Error Occurred While reading file: {test_config_data}') + logger.warning( + f'Error Occurred While reading file: {test_config_data}') return False - + if not test_config_data.get('actors', ): logger.warning('actors are required') return False - - if not test_config_data.get('actors', [])[0].get('actor1',None): + + if not test_config_data.get('actors', [])[0].get('actor1', None): logger.warning('actor1 is required') return False - + logger.info('User provided data will be used for generating test cases') return test_config_data -def populate_user_data(actor_data:dict, actor_name:str,tests:list[dict]): +def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]): tests = deepcopy(tests) - headers = actor_data.get('request_headers',[]) - body_params = actor_data.get('body',[]) - query_params = actor_data.get('query',[]) - path_params = actor_data.get('path',[]) + headers = actor_data.get('request_headers', []) + body_params = actor_data.get('body', []) + query_params = actor_data.get('query', []) + path_params = actor_data.get('path', []) # create HTTP request headers request_headers = {} @@ -44,10 +41,12 @@ def populate_user_data(actor_data:dict, actor_name:str,tests:list[dict]): test['body_params'] += body_params test['query_params'] += query_params test['path_params'] += path_params - test['test_actor_name'] = actor_name # for post test processing tests such as broken authentication - if test.get('kwargs',{}).get('headers',{}).items(): - test['kwargs']['headers'] = dict(test['kwargs']['headers'], **request_headers) + # for post test processing tests such as broken authentication + test['test_actor_name'] = actor_name + if test.get('kwargs', {}).get('headers', {}).items(): + test['kwargs']['headers'] = dict( + test['kwargs']['headers'], **request_headers) else: test['kwargs']['headers'] = request_headers - return tests \ No newline at end of file + return tests diff --git a/src/offat/logger.py b/src/offat/logger.py index 0c2d06a..9533ab7 100644 --- a/src/offat/logger.py +++ b/src/offat/logger.py @@ -1,44 +1,17 @@ -from colorama import Fore, Style, init -import logging - - -init(autoreset=True) - - -class ColoredLogger(logging.Formatter): - grey = Fore.WHITE - yellow = Fore.YELLOW + Style.BRIGHT - red = Fore.RED - bold_red = Fore.RED + Style.BRIGHT - reset = "\x1b[0m" - format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)" +from rich.console import Console +from rich.logging import RichHandler - FORMATS = { - logging.DEBUG: grey + format, - logging.INFO: grey + format, - logging.WARNING: yellow + format, - logging.ERROR: red + format, - logging.CRITICAL: bold_red + format - } - - def format(self, record): - log_fmt = self.FORMATS.get(record.levelno) - formatter = logging.Formatter(log_fmt) - return formatter.format(record) - - -def create_logger(logger_name:str, logging_level=logging.DEBUG): - # create logger - logger = logging.getLogger(logger_name) - logger.setLevel(logging_level) - - # create console 
handler with a higher log level - ch = logging.StreamHandler() - ch.setLevel(logging.DEBUG) +import logging - ch.setFormatter(ColoredLogger()) - logger.addHandler(ch) +console = Console() - return logger - \ No newline at end of file +# create logger +logging.basicConfig( + format="%(message)s", + datefmt="[%X]", + handlers=[RichHandler( + console=console, rich_tracebacks=True, tracebacks_show_locals=True)], +) +logger = logging.getLogger("OWASP-OFFAT") +logger.setLevel(logging.DEBUG) diff --git a/src/offat/openapi.py b/src/offat/openapi.py index 812316a..6513687 100644 --- a/src/offat/openapi.py +++ b/src/offat/openapi.py @@ -1,40 +1,41 @@ from prance import ResolvingParser -from .logger import create_logger - - -logger = create_logger(__name__) +from .logger import logger class OpenAPIParser: '''''' - def __init__(self, fpath_or_url:str, spec:dict=None) -> None: - self._parser = ResolvingParser(fpath_or_url, backend = 'openapi-spec-validator', spec_string=spec) + + def __init__(self, fpath_or_url: str, spec: dict = None) -> None: + self._parser = ResolvingParser( + fpath_or_url, backend='openapi-spec-validator', spec_string=spec) if self._parser.valid: logger.info('Specification file is valid') else: logger.error('Specification file is invalid!') - + self._spec = self._parser.specification - + self.hosts = [] self._populate_hosts() self.host = self.hosts[0] - self.http_scheme = 'https' if 'https' in self._spec.get('schemes',[]) else 'http' + self.http_scheme = 'https' if 'https' in self._spec.get( + 'schemes', []) else 'http' self.base_url = f"{self.http_scheme}://{self.host}{self._spec.get('basePath','')}" self.request_response_params = self._get_request_response_params() def _populate_hosts(self): - if self._spec.get('openapi'): # for openapi v3 - servers = self._spec.get('servers',[]) + if self._spec.get('openapi'): # for openapi v3 + servers = self._spec.get('servers', []) hosts = [] for server in servers: - host = server.get('url','').removeprefix('http://').removeprefix('http://').removesuffix('/') + host = server.get('url', '').removeprefix( + 'http://').removeprefix('http://').removesuffix('/') host = None if host == '' else host hosts.append(host) else: - host = self._spec.get('host') # for swagger files + host = self._spec.get('host') # for swagger files if not host: logger.error('Invalid Host: Host is missing') raise ValueError('Host Not Found in spec file') @@ -42,7 +43,6 @@ def _populate_hosts(self): self.hosts = hosts - def _get_endpoints(self): '''Returns list of endpoint paths along with HTTP methods allowed''' endpoints = [] @@ -57,25 +57,25 @@ def _get_endpoints(self): def _get_endpoint_details_for_fuzz_test(self): return self._spec.get('paths') - - def _get_param_definition_schema(self, param:dict): + + def _get_param_definition_schema(self, param: dict): '''Returns Model defined schema for the passed param''' param_schema = param.get('schema') - + # replace schema $ref with model params if param_schema: param_schema_ref = param_schema.get('$ref') if param_schema_ref: model_slug = param_schema_ref.split('/')[-1] - param_schema = self._spec.get('definitions',{}).get(model_slug) + param_schema = self._spec.get( + 'definitions', {}).get(model_slug) return param_schema - - def _get_response_definition_schema(self, responses:dict): + def _get_response_definition_schema(self, responses: dict): '''returns schema of API response - + Args: responses (dict): responses from path http method json data @@ -87,13 +87,13 @@ def _get_response_definition_schema(self, responses:dict): 
if 'parameters' in status_code_response: responses[status_code]['schema'] = responses[status_code]['parameters'] elif 'schema' in status_code_response: - responses[status_code]['schema'] = self._get_param_definition_schema(responses[status_code]) - else: + responses[status_code]['schema'] = self._get_param_definition_schema( + responses[status_code]) + else: continue return responses - def _get_request_response_params(self): '''Returns Schema of requests and response params @@ -104,31 +104,32 @@ def _get_request_response_params(self): list: ''' requests = [] - paths = self._spec.get('paths',{}) + paths = self._spec.get('paths', {}) # extract endpoints and supported params for path in paths.keys(): - path_params = paths[path].get('parameters',[]) + path_params = paths[path].get('parameters', []) - for http_method in paths.get(path,{}).keys(): + for http_method in paths.get(path, {}).keys(): # consider only http methods if http_method not in ['get', 'put', 'post', 'delete', 'options']: continue - - body_parameters = paths[path][http_method].get('parameters',[]) - response_params = self._get_response_definition_schema(paths[path][http_method].get('responses',{})) + body_parameters = paths[path][http_method].get( + 'parameters', []) + response_params = self._get_response_definition_schema( + paths[path][http_method].get('responses', {})) # create list of parameters for param in body_parameters: param['schema'] = self._get_param_definition_schema(param) requests.append({ - 'http_method':http_method, - 'path':path, - 'request_params':body_parameters, - 'response_params':response_params, - 'path_params':path_params, + 'http_method': http_method, + 'path': path, + 'request_params': body_parameters, + 'response_params': response_params, + 'path_params': path_params, }) return requests diff --git a/src/offat/report/generator.py b/src/offat/report/generator.py index 8cdc509..89798fd 100644 --- a/src/offat/report/generator.py +++ b/src/offat/report/generator.py @@ -5,10 +5,7 @@ from os import makedirs from yaml import dump as yaml_dump -from ..logger import create_logger - - -logger = create_logger(__name__) +from ..logger import logger class ReportGenerator: diff --git a/src/offat/report/table.py b/src/offat/report/table.py index e69af87..173e02c 100644 --- a/src/offat/report/table.py +++ b/src/offat/report/table.py @@ -1,18 +1,18 @@ -from rich.console import Console from rich.table import Table, Column +from ..logger import console class TestResultTable: def __init__(self, table_width_percentage: float = 98, ) -> None: - self.console = Console() + self.console = console self.table_width_percentage = table_width_percentage def print_table(self, table: Table): - terminal_width = self.console.width + terminal_width = console.width table_width = int(terminal_width * (self.table_width_percentage / 100)) table.width = table_width - self.console.print(table, width=table_width, overflow='fold') + self.console.print(table) self.console.rule() def extract_result_table_cols(self, results: list[dict]) -> list[str]: diff --git a/src/offat/tester/test_generator.py b/src/offat/tester/test_generator.py index 708c319..999ece4 100644 --- a/src/offat/tester/test_generator.py +++ b/src/offat/tester/test_generator.py @@ -1,5 +1,4 @@ from copy import deepcopy -from pprint import pprint from .fuzzer import fill_params from .test_runner import TestRunnerFiltersEnum from .fuzzer import generate_random_int diff --git a/src/offat/tester/test_runner.py b/src/offat/tester/test_runner.py index d563f32..f1986ef 100644 --- 
a/src/offat/tester/test_runner.py +++ b/src/offat/tester/test_runner.py @@ -1,12 +1,14 @@ from asyncio import ensure_future, gather from aiohttp.client_exceptions import ClientProxyConnectionError from enum import Enum -from typing import Optional +from rich.progress import Progress, TaskID from traceback import print_exc -from ..http import AsyncRequests -from ..logger import create_logger +from typing import Optional -logger = create_logger(__name__) + +from ..http import AsyncRequests +from ..logger import logger +from ..logger import console # TODO: move filters to post processing module @@ -25,6 +27,8 @@ class TestRunner: def __init__(self, rate_limit: Optional[int] = None, delay: Optional[float] = None, headers: Optional[dict] = None, proxy: Optional[str] = None, ssl: Optional[bool] = True) -> None: self._client = AsyncRequests( rate_limit=rate_limit, delay=delay, headers=headers, proxy=proxy, ssl=ssl) + self.progress = Progress(console=console) + self.progress_task_id: Optional[TaskID] = None def _generate_payloads(self, params: list[dict], payload_for: PayloadFor = PayloadFor.BODY): '''Generate body payload from passed data for HTTP body and query. @@ -106,10 +110,22 @@ async def send_request(self, test_task): test_result['response_status_code'] = response.get('status') test_result['redirection'] = response.get('res_redirection', '') + # advance progress bar + if self.progress_task_id: + self.progress.update(self.progress_task_id, + advance=1, refresh=True) + + if self.progress and self.progress.finished: + self.progress.stop() + self.progress_task_id = None + return test_result - async def run_tests(self, test_tasks: list): + async def run_tests(self, test_tasks: list, description: Optional[str]): '''run tests generated from test generator module''' + self.progress.start() + self.progress_task_id = self.progress.add_task( + f'[orange] {description}', total=len(test_tasks)) tasks = [] for test_task in test_tasks: diff --git a/src/offat/tester/tester_utils.py b/src/offat/tester/tester_utils.py index 7625a6a..b2ab8c6 100644 --- a/src/offat/tester/tester_utils.py +++ b/src/offat/tester/tester_utils.py @@ -8,18 +8,16 @@ from .test_runner import TestRunner from ..report.table import TestResultTable from ..report.generator import ReportGenerator -from ..logger import create_logger +from ..logger import logger from ..openapi import OpenAPIParser -logger = create_logger(__name__) - # create tester objs test_table_generator = TestResultTable() test_generator = TestGenerator() -def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional[str] = None, skip_test_run: Optional[bool] = False, post_run_matcher_test: Optional[bool] = False) -> list: +def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional[str] = None, skip_test_run: Optional[bool] = False, post_run_matcher_test: Optional[bool] = False, description: Optional[str] = None) -> list: '''Run tests and print result on console''' global test_table_generator # filter data if regex is passed @@ -34,7 +32,8 @@ def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional if skip_test_run: test_results = tests else: - test_results = run(test_runner.run_tests(tests)) + test_results = run(test_runner.run_tests( + tests, description)) if post_run_matcher_test: test_results = PostRunTests.matcher(test_results) @@ -73,113 +72,170 @@ def generate_and_run_tests(api_parser: OpenAPIParser, regex_pattern: Optional[st results: list = [] # test for unsupported http methods - 
logger.info('Checking for Unsupported HTTP methods:') + test_name = 'Checking for Unsupported HTTP methods:' + logger.info(test_name) unsupported_http_endpoint_tests = test_generator.check_unsupported_http_methods( api_parser.base_url, api_parser._get_endpoints()) - results += run_test(test_runner=test_runner, - tests=unsupported_http_endpoint_tests, regex_pattern=regex_pattern) + results += run_test( + test_runner=test_runner, + tests=unsupported_http_endpoint_tests, + regex_pattern=regex_pattern, + description='(OAS) Checking for Unsupported HTTP methods:' + ) # sqli fuzz test - logger.info('Checking for SQLi vulnerability:') + test_name = 'Checking for SQLi vulnerability:' + logger.info(test_name) sqli_fuzz_tests = test_generator.sqli_fuzz_params_test(api_parser) - results += run_test(test_runner=test_runner, - tests=sqli_fuzz_tests, regex_pattern=regex_pattern) + results += run_test( + test_runner=test_runner, + tests=sqli_fuzz_tests, + regex_pattern=regex_pattern, + description='(FUZZED) Checking for SQLi vulnerability:', + ) # OS Command Injection Fuzz Test - logger.info( - 'Checking for OS Command Injection Vulnerability with fuzzed params and checking response body:') + test_name = 'Checking for OS Command Injection Vulnerability with fuzzed params and checking response body:' + logger.info(test_name) os_command_injection_tests = test_generator.os_command_injection_fuzz_params_test( api_parser) - results += run_test(test_runner=test_runner, tests=os_command_injection_tests, - regex_pattern=regex_pattern, post_run_matcher_test=True) + results += run_test( + test_runner=test_runner, + tests=os_command_injection_tests, + regex_pattern=regex_pattern, + post_run_matcher_test=True, + description='(FUZZED) Checking for OS Command Injection:', + ) # XSS/HTML Injection Fuzz Test - logger.info( - 'Checking for XSS/HTML Injection Vulnerability with fuzzed params and checking response body:') + test_name = 'Checking for XSS/HTML Injection Vulnerability with fuzzed params and checking response body:' + logger.info(test_name) os_command_injection_tests = test_generator.xss_html_injection_fuzz_params_test( api_parser) - results += run_test(test_runner=test_runner, tests=os_command_injection_tests, - regex_pattern=regex_pattern, post_run_matcher_test=True) + results += run_test( + test_runner=test_runner, + tests=os_command_injection_tests, + regex_pattern=regex_pattern, + post_run_matcher_test=True, + description='(FUZZED) Checking for XSS/HTML Injection:', + ) # BOLA path tests with fuzzed data - logger.info('Checking for BOLA in PATH using fuzzed params:') + test_name = 'Checking for BOLA in PATH using fuzzed params:' + logger.info(test_name) bola_fuzzed_path_tests = test_generator.bola_fuzz_path_test( api_parser, success_codes=[200, 201, 301]) - results += run_test(test_runner=test_runner, - tests=bola_fuzzed_path_tests, regex_pattern=regex_pattern) + results += run_test( + test_runner=test_runner, + tests=bola_fuzzed_path_tests, + regex_pattern=regex_pattern, + description='(FUZZED) Checking for BOLA in PATH:' + ) # BOLA path test with fuzzed data + trailing slash - logger.info( - 'Checking for BOLA in PATH with trailing slash and id using fuzzed params:') + test_name = 'Checking for BOLA in PATH with trailing slash and id using fuzzed params:' + logger.info(test_name) bola_trailing_slash_path_tests = test_generator.bola_fuzz_trailing_slash_path_test( api_parser, success_codes=[200, 201, 301]) - results += run_test(test_runner=test_runner, - tests=bola_trailing_slash_path_tests, 
regex_pattern=regex_pattern) + results += run_test( + test_runner=test_runner, + tests=bola_trailing_slash_path_tests, + regex_pattern=regex_pattern, + description='(FUZZED) Checking for BOLA in PATH with trailing slash:' + ) # Mass Assignment / BOPLA - logger.info( - 'Checking for Mass Assignment Vulnerability with fuzzed params and checking response status codes:') + test_name = 'Checking for Mass Assignment Vulnerability with fuzzed params and checking response status codes:' + logger.info(test_name) bopla_tests = test_generator.bopla_fuzz_test( api_parser, success_codes=[200, 201, 301]) - results += run_test(test_runner=test_runner, - tests=bopla_tests, regex_pattern=regex_pattern) + results += run_test( + test_runner=test_runner, + tests=bopla_tests, + regex_pattern=regex_pattern, + description='(FUZZED) Checking for Mass Assignment Vulnerability:', + ) # Tests with User provided Data if bool(test_data_config): - logger.info('Testing with user provided data') + logger.info('[bold]Testing with user provided data[/bold]') # BOLA path tests with fuzzed + user provided data - logger.info( - 'Checking for BOLA in PATH using fuzzed and user provided params:') + test_name = 'Checking for BOLA in PATH using fuzzed and user provided params:', + logger.info(test_name) bola_fuzzed_user_data_tests = test_generator.test_with_user_data( test_data_config, test_generator.bola_fuzz_path_test, openapi_parser=api_parser, success_codes=[200, 201, 301], ) - results += run_test(test_runner=test_runner, - tests=bola_fuzzed_user_data_tests, regex_pattern=regex_pattern) + results += run_test( + test_runner=test_runner, + tests=bola_fuzzed_user_data_tests, + regex_pattern=regex_pattern, + description='(USER + FUZZED) Checking for BOLA in PATH:', + ) # BOLA path test with fuzzed + user data + trailing slash - logger.info( - 'Checking for BOLA in PATH with trailing slash id using fuzzed and user provided params:') + test_name = 'Checking for BOLA in PATH with trailing slash id using fuzzed and user provided params:' + logger.info(test_name) bola_trailing_slash_path_user_data_tests = test_generator.test_with_user_data( test_data_config, test_generator.bola_fuzz_trailing_slash_path_test, openapi_parser=api_parser, success_codes=[200, 201, 301], ) - results += run_test(test_runner=test_runner, - tests=bola_trailing_slash_path_user_data_tests, regex_pattern=regex_pattern) + results += run_test( + test_runner=test_runner, + tests=bola_trailing_slash_path_user_data_tests, + regex_pattern=regex_pattern, + description='(USER + FUZZED) Checking for BOLA in PATH with trailing slash:', + ) # OS Command Injection Fuzz Test - logger.info( - 'Checking for OS Command Injection Vulnerability with fuzzed & user params and checking response body:') + test_name = 'Checking for OS Command Injection Vulnerability with fuzzed & user params and checking response body:' + logger.info(test_name) os_command_injection_with_user_data_tests = test_generator.test_with_user_data( test_data_config, test_generator.os_command_injection_fuzz_params_test, openapi_parser=api_parser, ) - results += run_test(test_runner=test_runner, tests=os_command_injection_with_user_data_tests, - regex_pattern=regex_pattern, post_run_matcher_test=True) + results += run_test( + test_runner=test_runner, + tests=os_command_injection_with_user_data_tests, + regex_pattern=regex_pattern, + post_run_matcher_test=True, + description='(USER + FUZZED) Checking for OS Command Injection Vulnerability:', + ) # XSS/HTML Injection Fuzz Test - logger.info( - 'Checking for 
XSS/HTML Injection Vulnerability with fuzzed & user params and checking response body:') + test_name = 'Checking for XSS/HTML Injection Vulnerability with fuzzed & user params and checking response body:' + logger.info(test_name) os_command_injection_with_user_data_tests = test_generator.test_with_user_data( test_data_config, test_generator.xss_html_injection_fuzz_params_test, openapi_parser=api_parser, ) - results += run_test(test_runner=test_runner, tests=os_command_injection_with_user_data_tests, - regex_pattern=regex_pattern, post_run_matcher_test=True) + results += run_test( + test_runner=test_runner, + tests=os_command_injection_with_user_data_tests, + regex_pattern=regex_pattern, + post_run_matcher_test=True, + description='(USER + FUZZED) Checking for XSS/HTML Injection:', + ) # Broken Access Control Test - logger.info('Checking for Broken Access Control:') + test_name = 'Checking for Broken Access Control:' + logger.info(test_name) bac_results = PostRunTests.run_broken_access_control_tests( results, test_data_config) - results += run_test(test_runner=test_runner, tests=bac_results, - regex_pattern=regex_pattern, skip_test_run=True) + results += run_test( + test_runner=test_runner, + tests=bac_results, + regex_pattern=regex_pattern, + skip_test_run=True, + description=test_name, + ) # save file to output if output flag is present if output_file: diff --git a/src/offat/utils.py b/src/offat/utils.py index b32a465..2b33afd 100644 --- a/src/offat/utils.py +++ b/src/offat/utils.py @@ -1,26 +1,23 @@ from json import loads as json_load, dumps as json_dumps, JSONDecodeError from pkg_resources import get_distribution -from yaml import safe_load, YAMLError from os.path import isfile -from .logger import create_logger - - -logger = create_logger(__name__) +from yaml import safe_load, YAMLError +from .logger import logger def get_package_version(): '''Returns package current version - + Args: None Returns: String: current package version ''' - return get_distribution('offat').version + return get_distribution('offat').version -def read_yaml(file_path:str) -> dict: +def read_yaml(file_path: str) -> dict: '''Reads YAML file and returns as python dict. returns file not found or yaml errors as dict. @@ -31,19 +28,19 @@ def read_yaml(file_path:str) -> dict: dict: YAML contents as dict else returns error ''' if not file_path: - return {"error":"ValueError, path cannot be of None type"} + return {"error": "ValueError, path cannot be of None type"} if not isfile(file_path): - return {"error":"File Not Found"} - + return {"error": "File Not Found"} + with open(file_path) as f: try: return safe_load(f.read()) except YAMLError: return {"error": "YAML error"} - -def read_json(file_path:str) -> dict: + +def read_json(file_path: str) -> dict: '''Reads JSON file and returns as python dict. returns file not found or JSON errors as dict. @@ -54,16 +51,16 @@ def read_json(file_path:str) -> dict: dict: YAML contents as dict else returns error ''' if not isfile(file_path): - return {"error":"File Not Found"} - + return {"error": "File Not Found"} + with open(file_path) as f: try: return json_load(f.read()) except JSONDecodeError: return {"error": "JSON error"} - -def read_openapi_file(file_path:str) -> dict: + +def read_openapi_file(file_path: str) -> dict: '''Returns Open API Documentation file contents as json returns file not found or yaml errors as dict. 
@@ -74,8 +71,8 @@ def read_openapi_file(file_path:str) -> dict: dict: YAML contents as dict else returns error ''' if not isfile(file_path): - return {"error":"File Not Found"} - + return {"error": "File Not Found"} + file_ext = file_path.split('.')[-1] match file_ext: case 'json': @@ -83,10 +80,10 @@ def read_openapi_file(file_path:str) -> dict: case 'yaml': return read_yaml(file_path) case _: - return {"error":"Invalid file extension"} + return {"error": "Invalid file extension"} -def write_json_to_file(json_data:dict, file_path:str): +def write_json_to_file(json_data: dict, file_path: str): '''Writes dict obj to file as json Args: @@ -101,7 +98,7 @@ def write_json_to_file(json_data:dict, file_path:str): Any exception occurred during operation ''' if isfile(file_path): - logger.info(f'{file_path} file will be overwritten.') + logger.info(f'{file_path} file will be overwritten.') logger.info(f'Writing data to file: {file_path}') try: @@ -111,18 +108,20 @@ def write_json_to_file(json_data:dict, file_path:str): return True except JSONDecodeError: - logger.error(f'Invalid JSON data, error while writing to {file_path} file.') + logger.error( + f'Invalid JSON data, error while writing to {file_path} file.') except Exception as e: - logger.error(f'Unable to write JSON data to file due to below exception:\n{repr(e)}') + logger.error( + f'Unable to write JSON data to file due to below exception:\n{repr(e)}') return False -def str_to_dict(key_values:str) -> dict: +def str_to_dict(key_values: str) -> dict: '''Takes string object and converts to dict String should in `Key1:Value1,Key2:Value2,Key3:Value3` format - + Args: key_values (str): dict as str separated by commas `,` @@ -145,10 +144,10 @@ def str_to_dict(key_values:str) -> dict: return new_dict -def headers_list_to_dict(headers_list_list:list[list[str]]) -> dict|None: +def headers_list_to_dict(headers_list_list: list[list[str]]) -> dict | None: '''Takes list object and converts to dict String should in `[['Key1:Value1'],['Key2:Value2'],['Key3:Value3']]` format - + Args: headers_list_list (list): headers value as list[list[str]], where str is in `key:value` format @@ -162,7 +161,7 @@ def headers_list_to_dict(headers_list_list:list[list[str]]) -> dict|None: if not headers_list_list: return None - response_headers_dict:dict = dict() + response_headers_dict: dict = dict() for header_list in headers_list_list: for header_data in header_list: @@ -171,4 +170,4 @@ def headers_list_to_dict(headers_list_list:list[list[str]]) -> dict|None: v = header_key_value[1].strip() response_headers_dict[k] = v - return response_headers_dict \ No newline at end of file + return response_headers_dict diff --git a/src/poetry.lock b/src/poetry.lock index 4358e7f..a9b010e 100644 --- a/src/poetry.lock +++ b/src/poetry.lock @@ -1542,4 +1542,4 @@ api = ["fastapi", "python-dotenv", "redis", "rq", "uvicorn"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "ba1f03330ab6bef00a84cd168539b0d4d7871620bed7b8a5db42a6fb30fbae0f" +content-hash = "47d162badebc88e55a2271095cc1c5f488ee412b6197b0ec13133b25da5adfb4" diff --git a/src/pyproject.toml b/src/pyproject.toml index ddf4adb..23ea47e 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -12,7 +12,6 @@ aiohttp = "^3.8.4" pyyaml = "^6.0" prance = "^23.6.21.0" openapi-spec-validator = "^0.5.7" -colorama = "^0.4.6" fastapi = {version = "^0.103.1", optional = true} uvicorn = {extras = ["standard"], version = "^0.23.2", optional = true} rq = {version = "^1.15.1", optional = true} From 
650028cefae0cf1f946367a2838822aea9d98312 Mon Sep 17 00:00:00 2001 From: Dhrumil Mistry <56185972+dmdhrumilmistry@users.noreply.github.com> Date: Sat, 18 Nov 2023 03:46:17 +0530 Subject: [PATCH 5/6] print table after getting final results move table generator to templates --- src/offat/__main__.py | 38 ++++++++++++++--------- src/offat/report/generator.py | 36 +++++++++++++++------ src/offat/report/{ => templates}/table.py | 5 +-- src/offat/tester/tester_utils.py | 22 ++++--------- 4 files changed, 59 insertions(+), 42 deletions(-) rename src/offat/report/{ => templates}/table.py (95%) diff --git a/src/offat/__main__.py b/src/offat/__main__.py index d75e5db..e8fa670 100644 --- a/src/offat/__main__.py +++ b/src/offat/__main__.py @@ -32,22 +32,32 @@ def start(): banner() parser = ArgumentParser(prog='offat') - parser.add_argument('-f','--file', dest='fpath', type=str, help='path or url of openapi/swagger specification file', required=True) - parser.add_argument('-v','--version', action='version', version=f'%(prog)s {get_package_version()}') - parser.add_argument('-rl', '--rate-limit', dest='rate_limit', help='API requests rate limit. -dr should be passed in order to use this option', type=int, default=None, required=False) - parser.add_argument('-dr', '--delay-rate', dest='delay_rate', help='API requests delay rate in seconds. -rl should be passed in order to use this option', type=float, default=None, required=False) - parser.add_argument('-pr','--path-regex', dest='path_regex_pattern', type=str, help='run tests for paths matching given regex pattern', required=False, default=None) - parser.add_argument('-o', '--output', dest='output_file', type=str, help='path to store test results in specified format. Default format is html', required=False, default=None) - parser.add_argument('-of','--format', dest='output_format', type=str, choices=['json', 'yaml','html'], help='Data format to save (json, yaml, html). Default: json', required=False, default='json') - parser.add_argument('-H', '--headers', dest='headers', type=str, help='HTTP requests headers that should be sent during testing eg: User-Agent: offat', required=False, default=None, action='append', nargs='*') - parser.add_argument('-tdc','--test-data-config', dest='test_data_config',help='YAML file containing user test data for tests', required=False, type=str) - parser.add_argument('-p', '--proxy', dest='proxy', help='Proxy server URL to route HTTP requests through (e.g., "http://proxyserver:port")', required=False, type=str) - parser.add_argument('-ns', '--no-ssl', dest='no_ssl', help='Ignores SSL verification when enabled', action='store_true', required=False) # False -> ignore SSL, True -> enforce SSL check + parser.add_argument('-f', '--file', dest='fpath', type=str, + help='path or url of openapi/swagger specification file', required=True) + parser.add_argument('-v', '--version', action='version', + version=f'%(prog)s {get_package_version()}') + parser.add_argument('-rl', '--rate-limit', dest='rate_limit', + help='API requests rate limit. -dr should be passed in order to use this option', type=int, default=None, required=False) + parser.add_argument('-dr', '--delay-rate', dest='delay_rate', + help='API requests delay rate in seconds. 
-rl should be passed in order to use this option', type=float, default=None, required=False) + parser.add_argument('-pr', '--path-regex', dest='path_regex_pattern', type=str, + help='run tests for paths matching given regex pattern', required=False, default=None) + parser.add_argument('-o', '--output', dest='output_file', type=str, + help='path to store test results in specified format. Default format is html', required=False, default=None) + parser.add_argument('-of', '--format', dest='output_format', type=str, choices=[ + 'json', 'yaml', 'html', 'table'], help='Data format to save (json, yaml, html, table). Default: table', required=False, default='table') + parser.add_argument('-H', '--headers', dest='headers', type=str, + help='HTTP requests headers that should be sent during testing eg: User-Agent: offat', required=False, default=None, action='append', nargs='*') + parser.add_argument('-tdc', '--test-data-config', dest='test_data_config', + help='YAML file containing user test data for tests', required=False, type=str) + parser.add_argument('-p', '--proxy', dest='proxy', + help='Proxy server URL to route HTTP requests through (e.g., "http://proxyserver:port")', required=False, type=str) + parser.add_argument('-ns', '--no-ssl', dest='no_ssl', help='Ignores SSL verification when enabled', + action='store_true', required=False) # False -> ignore SSL, True -> enforce SSL check args = parser.parse_args() - # convert req headers str to dict - headers_dict:dict = headers_list_to_dict(args.headers) + headers_dict: dict = headers_list_to_dict(args.headers) # handle rate limiting options rate_limit = args.rate_limit @@ -82,4 +92,4 @@ def start(): if __name__ == '__main__': - start() \ No newline at end of file + start() diff --git a/src/offat/report/generator.py b/src/offat/report/generator.py index 89798fd..21422fe 100644 --- a/src/offat/report/generator.py +++ b/src/offat/report/generator.py @@ -1,11 +1,15 @@ +from copy import deepcopy from html import escape from json import dumps as json_dumps from offat.report import templates from os.path import dirname, join as path_join from os import makedirs +from rich.table import Table +from rich.pretty import pprint from yaml import dump as yaml_dump -from ..logger import logger +from .templates.table import TestResultTable +from ..logger import logger, console class ReportGenerator: @@ -42,7 +46,7 @@ def generate_html_report(results: list[dict]): return report_file_content @staticmethod - def handle_report_format(results: list[dict], report_format: str) -> str: + def handle_report_format(results: list[dict], report_format: str | None) -> str | Table: result = None match report_format: @@ -55,28 +59,40 @@ def handle_report_format(results: list[dict], report_format: str) -> str: result = yaml_dump({ 'results': results, }) - case _: # default json format + case 'json': report_format = 'json' result = json_dumps({ 'results': results, }) + case _: # default: CLI table + report_format = 'table' + results_table = TestResultTable().generate_result_table( + deepcopy(results)) + result = results_table logger.info(f'Generated {report_format.upper()} format report.') return result @staticmethod - def save_report(report_path: str, report_file_content: str): - if report_path != '/': + def save_report(report_path: str | None, report_file_content: str | Table | None): + if report_path != '/' and report_path: dir_name = dirname(report_path) - if dir_name != '': + if dir_name != '' and report_path: makedirs(dir_name, exist_ok=True) - with open(report_path, 'w') as 
f: - logger.info(f'Writing report to file: {report_path}') - f.write(report_file_content) + # print to cli if report path and file content as absent else write to file location. + if report_path and report_file_content and not isinstance(report_file_content, Table): + with open(report_path, 'w') as f: + logger.info(f'Writing report to file: {report_path}') + f.write(report_file_content) + else: + if isinstance(report_file_content, Table) and report_file_content.columns: + TestResultTable().print_table(report_file_content) + else: + console.print(report_file_content) @staticmethod - def generate_report(results: list[dict], report_format: str, report_path: str): + def generate_report(results: list[dict], report_format: str | None, report_path: str | None): formatted_results = ReportGenerator.handle_report_format( results=results, report_format=report_format) ReportGenerator.save_report( diff --git a/src/offat/report/table.py b/src/offat/report/templates/table.py similarity index 95% rename from src/offat/report/table.py rename to src/offat/report/templates/table.py index 173e02c..335c8ef 100644 --- a/src/offat/report/table.py +++ b/src/offat/report/templates/table.py @@ -1,5 +1,5 @@ from rich.table import Table, Column -from ..logger import console +from ...logger import console class TestResultTable: @@ -29,7 +29,8 @@ def generate_result_table(self, results: list, filter_passed_results: bool = Tru for result in results: table_row = [] for col in cols: - table_row.append(str(result[col.header])) + table_row.append( + str(result.get(col.header, '[red]:bug: - [/red]'))) table.add_row(*table_row) return table diff --git a/src/offat/tester/tester_utils.py b/src/offat/tester/tester_utils.py index b2ab8c6..2cb38c4 100644 --- a/src/offat/tester/tester_utils.py +++ b/src/offat/tester/tester_utils.py @@ -6,20 +6,18 @@ from .post_test_processor import PostRunTests from .test_generator import TestGenerator from .test_runner import TestRunner -from ..report.table import TestResultTable +from ..report.templates.table import TestResultTable from ..report.generator import ReportGenerator from ..logger import logger from ..openapi import OpenAPIParser # create tester objs -test_table_generator = TestResultTable() test_generator = TestGenerator() def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional[str] = None, skip_test_run: Optional[bool] = False, post_run_matcher_test: Optional[bool] = False, description: Optional[str] = None) -> list: '''Run tests and print result on console''' - global test_table_generator # filter data if regex is passed if regex_pattern: tests = list( @@ -47,13 +45,6 @@ def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional # run data leak tests test_results = PostRunTests.detect_data_exposure(test_results) - # print results - results_table = test_table_generator.generate_result_table( - deepcopy(test_results)) - - if results_table.columns: - test_table_generator.print_table(results_table) - return test_results @@ -238,11 +229,10 @@ def generate_and_run_tests(api_parser: OpenAPIParser, regex_pattern: Optional[st ) # save file to output if output flag is present - if output_file: - ReportGenerator.generate_report( - results=results, - report_format=output_file_format, - report_path=output_file, - ) + ReportGenerator.generate_report( + results=results, + report_format=output_file_format, + report_path=output_file, + ) return results From 9dd234e5b66fa596ff0ec8d3f814c93096ddd8a1 Mon Sep 17 00:00:00 2001 From: Dhrumil Mistry 
<56185972+dmdhrumilmistry@users.noreply.github.com> Date: Sat, 18 Nov 2023 03:49:10 +0530 Subject: [PATCH 6/6] bump project version remove unused imports --- src/offat/report/generator.py | 1 - src/pyproject.toml | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/offat/report/generator.py b/src/offat/report/generator.py index 21422fe..94e38ee 100644 --- a/src/offat/report/generator.py +++ b/src/offat/report/generator.py @@ -5,7 +5,6 @@ from os.path import dirname, join as path_join from os import makedirs from rich.table import Table -from rich.pretty import pprint from yaml import dump as yaml_dump from .templates.table import TestResultTable diff --git a/src/pyproject.toml b/src/pyproject.toml index 23ea47e..20bdb56 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "offat" -version = "0.12.1" +version = "0.12.2" description = "Offensive API tester tool automates checks for common API vulnerabilities" authors = ["Dhrumil Mistry "] license = "MIT"
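
Illustrative sketch (standalone, not part of the patch series): the rich-based rendering introduced in PATCH 3/6 and moved to offat/report/templates/table.py in PATCH 5/6 builds a Table whose columns are the sorted union of keys across the result dicts, fills missing cells with a placeholder, and prints through a shared Console at roughly 98% of terminal width. The sample result dicts below are hypothetical; only the rich Table/Console calls mirror the patched code, assuming rich ^13.7 is installed.

    from rich.console import Console
    from rich.table import Table, Column

    console = Console()

    # Hypothetical sample results; real entries come from TestRunner/PostRunTests.
    results = [
        {'endpoint': '/users/1', 'method': 'GET', 'result': '[bold green]Passed \u2713[/bold green]'},
        {'endpoint': '/users/1', 'method': 'PUT', 'result': '[bold red]Failed \u00d7[/bold red]',
         'data_leak': '[bold red]Leak Found \u00d7[/bold red]'},
    ]

    # Column headers are the sorted union of keys, as in extract_result_table_cols().
    headers = sorted({key for row in results for key in row})
    table = Table(*[Column(header=h, overflow='fold') for h in headers])

    for row in results:
        # Missing keys fall back to a placeholder, mirroring the PATCH 5/6 change.
        table.add_row(*[str(row.get(h, '-')) for h in headers])

    # Constrain width to a percentage of the terminal, as print_table() does.
    table.width = int(console.width * 0.98)
    console.print(table)
    console.rule()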