Skip to content

Commit

Permalink
Merge pull request #38 from OWASP/host-check
Browse files Browse the repository at this point in the history
check host availability before starting tests
  • Loading branch information
dmdhrumilmistry authored Jan 20, 2024
2 parents 2b1753d + f2b6940 commit 8e03802
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/offat/tester/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async def send_request(self, test_task):
logger.error('Connection Failed! Server refused Connection!!')
except ClientProxyConnectionError as e:
logger.error(f'Proxy Connection Error: {e}')
# TODO: handle exception here

test_result = test_task

Expand Down
35 changes: 34 additions & 1 deletion src/offat/tester/tester_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from asyncio import run
from copy import deepcopy
from http import client as http_client
from typing import Optional
from re import search as regex_search

Expand All @@ -8,13 +8,40 @@
from .test_runner import TestRunner
from ..report.generator import ReportGenerator
from ..logger import logger
from ..http import AsyncRequests
from ..openapi import OpenAPIParser


# create tester objs
test_generator = TestGenerator()


def is_host_up(openapi_parser: OpenAPIParser) -> bool:
tokens = openapi_parser.host.split(":")
match len(tokens):
case 1:
host = tokens[0]
port = 443 if openapi_parser.http_scheme == "https" else 80
case 2:
host = tokens[0]
port = tokens[1]
case _:
logger.warning(f"Invalid host: {openapi_parser.host}")
return False

logger.info(f"Checking whether host {host}:{port} is available")
try:
conn = http_client.HTTPConnection(host=host, port=port, timeout=5)
conn.request("GET", "/")
res = conn.getresponse()
logger.info(f"Host returned status code: {res.status}")
return res.status in range(200, 499)
except Exception as e:
logger.error(
f"Unable to connect to host {host}:{port} due to error: {e}")
return False


def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional[str] = None, skip_test_run: Optional[bool] = False, post_run_matcher_test: Optional[bool] = False, description: Optional[str] = None) -> list:
'''Run tests and print result on console'''
# filter data if regex is passed
Expand Down Expand Up @@ -51,6 +78,12 @@ def run_test(test_runner: TestRunner, tests: list[dict], regex_pattern: Optional
def generate_and_run_tests(api_parser: OpenAPIParser, regex_pattern: Optional[str] = None, output_file: Optional[str] = None, output_file_format: Optional[str] = None, rate_limit: Optional[int] = None, delay: Optional[float] = None, req_headers: Optional[dict] = None, proxy: Optional[str] = None, ssl: Optional[bool] = True, test_data_config: Optional[dict] = None):
global test_table_generator, logger

if not is_host_up(openapi_parser=api_parser):
logger.error(
f"Stopping tests due to unavailibility of host: {api_parser.host}")
return
logger.info(f"Host {api_parser.host} is up")

test_runner = TestRunner(
rate_limit=rate_limit,
delay=delay,
Expand Down

0 comments on commit 8e03802

Please sign in to comment.