Skip to content

Commit

Permalink
Merge pull request #28 from OWASP/dev
Browse files Browse the repository at this point in the history
RELEASE: 0.12.2
  • Loading branch information
dmdhrumilmistry authored Nov 17, 2023
2 parents 0dd3235 + 9dd234e commit 7000f89
Show file tree
Hide file tree
Showing 14 changed files with 387 additions and 258 deletions.
38 changes: 24 additions & 14 deletions src/offat/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,32 @@ def start():
banner()

parser = ArgumentParser(prog='offat')
parser.add_argument('-f','--file', dest='fpath', type=str, help='path or url of openapi/swagger specification file', required=True)
parser.add_argument('-v','--version', action='version', version=f'%(prog)s {get_package_version()}')
parser.add_argument('-rl', '--rate-limit', dest='rate_limit', help='API requests rate limit. -dr should be passed in order to use this option', type=int, default=None, required=False)
parser.add_argument('-dr', '--delay-rate', dest='delay_rate', help='API requests delay rate in seconds. -rl should be passed in order to use this option', type=float, default=None, required=False)
parser.add_argument('-pr','--path-regex', dest='path_regex_pattern', type=str, help='run tests for paths matching given regex pattern', required=False, default=None)
parser.add_argument('-o', '--output', dest='output_file', type=str, help='path to store test results in specified format. Default format is html', required=False, default=None)
parser.add_argument('-of','--format', dest='output_format', type=str, choices=['json', 'yaml','html'], help='Data format to save (json, yaml, html). Default: json', required=False, default='json')
parser.add_argument('-H', '--headers', dest='headers', type=str, help='HTTP requests headers that should be sent during testing eg: User-Agent: offat', required=False, default=None, action='append', nargs='*')
parser.add_argument('-tdc','--test-data-config', dest='test_data_config',help='YAML file containing user test data for tests', required=False, type=str)
parser.add_argument('-p', '--proxy', dest='proxy', help='Proxy server URL to route HTTP requests through (e.g., "http://proxyserver:port")', required=False, type=str)
parser.add_argument('-ns', '--no-ssl', dest='no_ssl', help='Ignores SSL verification when enabled', action='store_true', required=False) # False -> ignore SSL, True -> enforce SSL check
parser.add_argument('-f', '--file', dest='fpath', type=str,
help='path or url of openapi/swagger specification file', required=True)
parser.add_argument('-v', '--version', action='version',
version=f'%(prog)s {get_package_version()}')
parser.add_argument('-rl', '--rate-limit', dest='rate_limit',
help='API requests rate limit. -dr should be passed in order to use this option', type=int, default=None, required=False)
parser.add_argument('-dr', '--delay-rate', dest='delay_rate',
help='API requests delay rate in seconds. -rl should be passed in order to use this option', type=float, default=None, required=False)
parser.add_argument('-pr', '--path-regex', dest='path_regex_pattern', type=str,
help='run tests for paths matching given regex pattern', required=False, default=None)
parser.add_argument('-o', '--output', dest='output_file', type=str,
help='path to store test results in specified format. Default format is html', required=False, default=None)
parser.add_argument('-of', '--format', dest='output_format', type=str, choices=[
'json', 'yaml', 'html', 'table'], help='Data format to save (json, yaml, html, table). Default: table', required=False, default='table')
parser.add_argument('-H', '--headers', dest='headers', type=str,
help='HTTP requests headers that should be sent during testing eg: User-Agent: offat', required=False, default=None, action='append', nargs='*')
parser.add_argument('-tdc', '--test-data-config', dest='test_data_config',
help='YAML file containing user test data for tests', required=False, type=str)
parser.add_argument('-p', '--proxy', dest='proxy',
help='Proxy server URL to route HTTP requests through (e.g., "http://proxyserver:port")', required=False, type=str)
parser.add_argument('-ns', '--no-ssl', dest='no_ssl', help='Ignores SSL verification when enabled',
action='store_true', required=False) # False -> ignore SSL, True -> enforce SSL check
args = parser.parse_args()


# convert req headers str to dict
headers_dict:dict = headers_list_to_dict(args.headers)
headers_dict: dict = headers_list_to_dict(args.headers)

# handle rate limiting options
rate_limit = args.rate_limit
Expand Down Expand Up @@ -82,4 +92,4 @@ def start():


if __name__ == '__main__':
start()
start()
3 changes: 1 addition & 2 deletions src/offat/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
from offat.api.config import app, task_queue, task_timeout, auth_secret_key
from offat.api.jobs import scan_api
from offat.api.models import CreateScanModel
from offat.logger import create_logger
from offat.logger import logger
from os import uname, environ


logger = create_logger(__name__)
logger.info(f'Secret Key: {auth_secret_key}')


Expand Down
5 changes: 1 addition & 4 deletions src/offat/api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
from offat.api.models import CreateScanModel
from offat.tester.tester_utils import generate_and_run_tests
from offat.openapi import OpenAPIParser
from offat.logger import create_logger


logger = create_logger(__name__)
from offat.logger import logger


def scan_api(body_data: CreateScanModel):
Expand Down
41 changes: 20 additions & 21 deletions src/offat/config_data_handler.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,35 @@
from copy import deepcopy
from pprint import pprint
from .logger import create_logger
from .logger import logger


logger = create_logger(__name__)


def validate_config_file_data(test_config_data:dict):
def validate_config_file_data(test_config_data: dict):
if not isinstance(test_config_data, dict):
logger.warning('Invalid data format')
return False

if test_config_data.get('error', False):
logger.warning(f'Error Occurred While reading file: {test_config_data}')
logger.warning(
f'Error Occurred While reading file: {test_config_data}')
return False

if not test_config_data.get('actors', ):
logger.warning('actors are required')
return False
if not test_config_data.get('actors', [])[0].get('actor1',None):

if not test_config_data.get('actors', [])[0].get('actor1', None):
logger.warning('actor1 is required')
return False

logger.info('User provided data will be used for generating test cases')
return test_config_data


def populate_user_data(actor_data:dict, actor_name:str,tests:list[dict]):
def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]):
tests = deepcopy(tests)
headers = actor_data.get('request_headers',[])
body_params = actor_data.get('body',[])
query_params = actor_data.get('query',[])
path_params = actor_data.get('path',[])
headers = actor_data.get('request_headers', [])
body_params = actor_data.get('body', [])
query_params = actor_data.get('query', [])
path_params = actor_data.get('path', [])

# create HTTP request headers
request_headers = {}
Expand All @@ -44,10 +41,12 @@ def populate_user_data(actor_data:dict, actor_name:str,tests:list[dict]):
test['body_params'] += body_params
test['query_params'] += query_params
test['path_params'] += path_params
test['test_actor_name'] = actor_name # for post test processing tests such as broken authentication
if test.get('kwargs',{}).get('headers',{}).items():
test['kwargs']['headers'] = dict(test['kwargs']['headers'], **request_headers)
# for post test processing tests such as broken authentication
test['test_actor_name'] = actor_name
if test.get('kwargs', {}).get('headers', {}).items():
test['kwargs']['headers'] = dict(
test['kwargs']['headers'], **request_headers)
else:
test['kwargs']['headers'] = request_headers

return tests
return tests
53 changes: 13 additions & 40 deletions src/offat/logger.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,17 @@
from colorama import Fore, Style, init
import logging


init(autoreset=True)


class ColoredLogger(logging.Formatter):
grey = Fore.WHITE
yellow = Fore.YELLOW + Style.BRIGHT
red = Fore.RED
bold_red = Fore.RED + Style.BRIGHT
reset = "\x1b[0m"
format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
from rich.console import Console
from rich.logging import RichHandler

FORMATS = {
logging.DEBUG: grey + format,
logging.INFO: grey + format,
logging.WARNING: yellow + format,
logging.ERROR: red + format,
logging.CRITICAL: bold_red + format
}

def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)


def create_logger(logger_name:str, logging_level=logging.DEBUG):
# create logger
logger = logging.getLogger(logger_name)
logger.setLevel(logging_level)

# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
import logging

ch.setFormatter(ColoredLogger())

logger.addHandler(ch)
console = Console()

return logger

# create logger
logging.basicConfig(
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler(
console=console, rich_tracebacks=True, tracebacks_show_locals=True)],
)
logger = logging.getLogger("OWASP-OFFAT")
logger.setLevel(logging.DEBUG)
71 changes: 36 additions & 35 deletions src/offat/openapi.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,48 @@
from prance import ResolvingParser
from .logger import create_logger


logger = create_logger(__name__)
from .logger import logger


class OpenAPIParser:
''''''
def __init__(self, fpath_or_url:str, spec:dict=None) -> None:
self._parser = ResolvingParser(fpath_or_url, backend = 'openapi-spec-validator', spec_string=spec)

def __init__(self, fpath_or_url: str, spec: dict = None) -> None:
self._parser = ResolvingParser(
fpath_or_url, backend='openapi-spec-validator', spec_string=spec)

if self._parser.valid:
logger.info('Specification file is valid')
else:
logger.error('Specification file is invalid!')

self._spec = self._parser.specification

self.hosts = []
self._populate_hosts()
self.host = self.hosts[0]

self.http_scheme = 'https' if 'https' in self._spec.get('schemes',[]) else 'http'
self.http_scheme = 'https' if 'https' in self._spec.get(
'schemes', []) else 'http'
self.base_url = f"{self.http_scheme}://{self.host}{self._spec.get('basePath','')}"
self.request_response_params = self._get_request_response_params()

def _populate_hosts(self):
if self._spec.get('openapi'): # for openapi v3
servers = self._spec.get('servers',[])
if self._spec.get('openapi'): # for openapi v3
servers = self._spec.get('servers', [])
hosts = []
for server in servers:
host = server.get('url','').removeprefix('http://').removeprefix('http://').removesuffix('/')
host = server.get('url', '').removeprefix(
'http://').removeprefix('http://').removesuffix('/')
host = None if host == '' else host
hosts.append(host)
else:
host = self._spec.get('host') # for swagger files
host = self._spec.get('host') # for swagger files
if not host:
logger.error('Invalid Host: Host is missing')
raise ValueError('Host Not Found in spec file')
hosts = [host]

self.hosts = hosts


def _get_endpoints(self):
'''Returns list of endpoint paths along with HTTP methods allowed'''
endpoints = []
Expand All @@ -57,25 +57,25 @@ def _get_endpoints(self):

def _get_endpoint_details_for_fuzz_test(self):
return self._spec.get('paths')
def _get_param_definition_schema(self, param:dict):

def _get_param_definition_schema(self, param: dict):
'''Returns Model defined schema for the passed param'''
param_schema = param.get('schema')

# replace schema $ref with model params
if param_schema:
param_schema_ref = param_schema.get('$ref')

if param_schema_ref:
model_slug = param_schema_ref.split('/')[-1]
param_schema = self._spec.get('definitions',{}).get(model_slug)
param_schema = self._spec.get(
'definitions', {}).get(model_slug)

return param_schema


def _get_response_definition_schema(self, responses:dict):
def _get_response_definition_schema(self, responses: dict):
'''returns schema of API response
Args:
responses (dict): responses from path http method json data
Expand All @@ -87,13 +87,13 @@ def _get_response_definition_schema(self, responses:dict):
if 'parameters' in status_code_response:
responses[status_code]['schema'] = responses[status_code]['parameters']
elif 'schema' in status_code_response:
responses[status_code]['schema'] = self._get_param_definition_schema(responses[status_code])
else:
responses[status_code]['schema'] = self._get_param_definition_schema(
responses[status_code])
else:
continue

return responses


def _get_request_response_params(self):
'''Returns Schema of requests and response params
Expand All @@ -104,31 +104,32 @@ def _get_request_response_params(self):
list:
'''
requests = []
paths = self._spec.get('paths',{})
paths = self._spec.get('paths', {})

# extract endpoints and supported params
for path in paths.keys():
path_params = paths[path].get('parameters',[])
path_params = paths[path].get('parameters', [])

for http_method in paths.get(path,{}).keys():
for http_method in paths.get(path, {}).keys():
# consider only http methods
if http_method not in ['get', 'put', 'post', 'delete', 'options']:
continue


body_parameters = paths[path][http_method].get('parameters',[])
response_params = self._get_response_definition_schema(paths[path][http_method].get('responses',{}))
body_parameters = paths[path][http_method].get(
'parameters', [])
response_params = self._get_response_definition_schema(
paths[path][http_method].get('responses', {}))

# create list of parameters
for param in body_parameters:
param['schema'] = self._get_param_definition_schema(param)

requests.append({
'http_method':http_method,
'path':path,
'request_params':body_parameters,
'response_params':response_params,
'path_params':path_params,
'http_method': http_method,
'path': path,
'request_params': body_parameters,
'response_params': response_params,
'path_params': path_params,
})

return requests
Loading

0 comments on commit 7000f89

Please sign in to comment.