diff --git a/CHANGELOG.md b/CHANGELOG.md index f633836..e7a6186 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,4 +15,9 @@ # v2.0.0 - Upgraded [SSLyze](https://github.com/nabla-c0d3/sslyze) to 3.x - Added several TLS 1.2 ciphers to the "policy" scan type as "weak" -- Added scan type and port to result set \ No newline at end of file +- Added scan type and port to result set + +# v2.1.0 +- Upgraded dnspython to 2.0.x and fixed deprecated call to dns.resolver.query() +- Upgraded validators to 0.17 +- Various pylint and type checking fixes \ No newline at end of file diff --git a/SSLChecker/SSLChecker/main.py b/SSLChecker/SSLChecker/main.py index 8bbd178..6ddb77f 100644 --- a/SSLChecker/SSLChecker/main.py +++ b/SSLChecker/SSLChecker/main.py @@ -80,6 +80,7 @@ def pre_scan_check(req: func.HttpRequest) -> Tuple[str, str, str, int, str]: scan_type = req.route_params.get('scan') view = req.route_params.get('view') target = req.route_params.get('target') + # Default to TCP 443 if no port is passed port = req.route_params.get('port', '443') port = verify_port(port) @@ -105,6 +106,8 @@ def pre_scan_check(req: func.HttpRequest) -> Tuple[str, str, str, int, str]: ERROR_MSG_MISSING_DNS_SERVER) # See if a valid IP was passed, else check if it was a valid DNS name + ip = "" + try: if ipaddress.IPv4Address(target): ip = target @@ -116,7 +119,11 @@ def pre_scan_check(req: func.HttpRequest) -> Tuple[str, str, str, int, str]: """ Try to resolve the DNS name to an IP to ensure it exists. We use the IP in the scan so that we can record which one we tested which can be useful. """ - ip = shared_dns.resolve_dns(DNSVIEW.get(view), target) + + # Ignore type error on get(key) as it defaults to None + # https://docs.python.org/3/library/stdtypes.html#dict.get + # I check that the key exists + ip = shared_dns.resolve_dns(DNSVIEW.get(view), target) # type: ignore return scan_type, view, target, port, ip @@ -174,7 +181,7 @@ def query_scanner_precheck(url: str, target = params['target'] scan_type = 'full' - port = '443' + port = 443 nameserver = None for param in params: @@ -198,6 +205,8 @@ def query_scanner_precheck(url: str, nameserver = INTERNAL_DNS # See if a valid IP was passed, else check if it was a valid DNS name + ip = "" + try: if ipaddress.IPv4Address(target): ip = target diff --git a/SSLChecker/requirements.txt b/SSLChecker/requirements.txt index a7d3fcb..5dddcfe 100644 --- a/SSLChecker/requirements.txt +++ b/SSLChecker/requirements.txt @@ -1,7 +1,7 @@ azure-functions -dnspython==1.16.0 +dnspython==2.00.0 requests==2.22.0 sslyze==3.0.8 -validators==0.15.0 +validators==0.17.1 typing-extensions==3.7.4.2 pytest==5.4.3 diff --git a/SSLChecker/sharedcode/scanner.py b/SSLChecker/sharedcode/scanner.py index 71954c0..485c3e4 100644 --- a/SSLChecker/sharedcode/scanner.py +++ b/SSLChecker/sharedcode/scanner.py @@ -83,8 +83,11 @@ def scan(target, ip, port, view, suite): results.set_result(scan_output, "View", view) scanner = Scanner() + # Ignore type error on get(key) as it defaults to None + # https://docs.python.org/3/library/stdtypes.html#dict.get + # We supply the values in the dict server_scan_req = ServerScanRequest( - server_info=server_info, scan_commands=CIPHER_SUITES.get(suite) + server_info=server_info, scan_commands=CIPHER_SUITES.get(suite) # type: ignore ) scanner.queue_scan(server_scan_req) diff --git a/SSLChecker/sharedcode/shared_dns.py b/SSLChecker/sharedcode/shared_dns.py index c43491d..864dfef 100644 --- a/SSLChecker/sharedcode/shared_dns.py +++ b/SSLChecker/sharedcode/shared_dns.py @@ -45,18 +45,20 @@ def _init_resolver(dnsserver: ip, timeout: int, lifetime: int) -> resolver.Resol def resolve_dns( - dnsserver: ip, dnsname: fqdn, timeout: int = 3, lifetime: int = 3 -) -> ip: + dnsserver: ip, dnsname: fqdn, + timeout: int = 3, lifetime: int = 3 + ) -> ip: """ Resolve dns name """ _iplist = [] # results res = _init_resolver(dnsserver, timeout, lifetime) try: - answers = res.query(dnsname, "A") # explicit query for A record - for answer in answers: + answers = res.resolve(dnsname, search=False) # explicit query for A record + for answer in answers.rrset: _iplist.append(answer.address) return _iplist[0] # Return the first IP of the DNS Answer + except resolver.NoAnswer: raise DNSError( "DNS No Answer", f"No Answer for {dnsname} using nameserver {dnsserver}" @@ -98,7 +100,7 @@ def parse_name(name: str) -> fqdn: dns_name_candidate = parsed_name.path # The below ensures a valid domain was supplied - if domain(dns_name_candidate): - return dns_name_candidate - else: + if not domain(dns_name_candidate): raise InvalidFQDN("Invalid FQDN", f"{name} is not a valid FQDN") + + return dns_name_candidate diff --git a/tests/test_SSLChecker.py b/tests/test_SSLChecker.py index b76df48..aec0a85 100644 --- a/tests/test_SSLChecker.py +++ b/tests/test_SSLChecker.py @@ -1,3 +1,5 @@ +""" SSLChecker pytest tests """ + import json import azure.functions as func @@ -8,15 +10,16 @@ def test_policy_external_no_violations(): - # Construct a mock HTTP request + """ Test policy scan on an external host with no violations """ + req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': 'api.metlife.com'} - ) + method='GET', + body=b'', + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': 'api.metlife.com'} + ) # Call the function resp = main(req) @@ -29,15 +32,15 @@ def test_policy_external_no_violations(): def test_full_external(): - # Construct a mock HTTP request + """ Test full scan on an external host """ req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'full', - 'view': 'external', - 'target': 'github.com'} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'full', + 'view': 'external', + 'target': 'github.com'} + ) # Call the function resp = main(req) @@ -50,15 +53,16 @@ def test_full_external(): def test_policy_external_violations(): - # Construct a mock HTTP request + """ Test policy scan on an external host with violations """ + req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': 'espn.com'} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': 'espn.com'} + ) # Call the function. resp = main(req) @@ -71,15 +75,16 @@ def test_policy_external_violations(): def test_external_dns_name_not_resolved(): - # Construct a mock HTTP request + """ Test dns name not resolved """ + req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': 'joegatt.com'} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': 'joegatt.com'} + ) # Call the function. resp = main(req) @@ -92,15 +97,16 @@ def test_external_dns_name_not_resolved(): def test_external_dns_name_not_exist(): - # Construct a mock HTTP request + """ Test NXDOMAIN """ + req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': 'jeogatt.com'} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': 'jeogatt.com'} + ) # Call the function. resp = main(req) @@ -112,16 +118,17 @@ def test_external_dns_name_not_exist(): def test_external_sslyze_timeout(): - # Construct a mock HTTP request + """ Test sslyze timeout """ + name = 'bbbbbbbbbbbbbbb.com' req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': name} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': name} + ) # Call the function resp = main(req) @@ -133,16 +140,17 @@ def test_external_sslyze_timeout(): assert results["Message"] == f'TCP connection to {name}:443 timed-out' -def test_external_missing_dns_name(): - # Construct a mock HTTP request +def test_external_missing_target(): + """ Test a request with a missing hostname """ + req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': None} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': None} + ) # Call the function. resp = main(req) @@ -156,16 +164,17 @@ def test_external_missing_dns_name(): def test_bad_dns_view_input(): - # Construct a mock HTTP request + """ Test bad dns view input """ + view_name = 'badinput' req = func.HttpRequest( - method='GET', - body=None, - url=f'/api/', - route_params={'scan': 'policy', - 'view': view_name, - 'target': 'microsoft.com'} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': view_name, + 'target': 'microsoft.com'} + ) # Call the function. resp = main(req) @@ -178,17 +187,18 @@ def test_bad_dns_view_input(): assert results["Message"] == _main.ERROR_MSG_INVALID_VIEW -def test_bad_policy_input(): - # Construct a mock HTTP request - policy_type = 'pppppp' +def test_bad_scan_type_input(): + """ Test bad scan type input """ + + scan_type = 'pppppp' req = func.HttpRequest( - method='GET', - body=None, - url=f'/api/', - route_params={'scan': policy_type, - 'view': 'external', - 'target': 'microsoft.com'} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': scan_type, + 'view': 'external', + 'target': 'microsoft.com'} + ) # Call the function. resp = main(req) @@ -197,20 +207,21 @@ def test_bad_policy_input(): results = json.loads(resp) # Ensure error handling is working properly - assert results["Error Type"] == f"Invalid scanner type '{policy_type}'" + assert results["Error Type"] == f"Invalid scanner type '{scan_type}'" assert results["Message"] == _main.ERROR_MSG_INVALID_SCANNER_TYPE def test_missing_dns_view(): - # Construct a mock HTTP request + """ Test not dns view input """ + req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'policy', - 'view': None, - 'target': None} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': None, + 'target': None} + ) # Call the function. resp = main(req) @@ -223,17 +234,18 @@ def test_missing_dns_view(): assert results["Message"] == _main.ERROR_MSG_MISSING_PARAMETERS -def test_bad_dns_name(): - # Construct a mock HTTP request +def test_invalid_dns_name(): + """ Test invalid dns name input """ + dns_name = 'bbbbbbbbb' req = func.HttpRequest( - method='GET', - body=None, - url=f'/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': dns_name} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': dns_name} + ) # Call the function. resp = main(req) @@ -247,15 +259,16 @@ def test_bad_dns_name(): def test_missing_policy_view_dns_name(): - # Construct a mock HTTP request + """ Test missing scan, view, and target """ + req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': None, - 'view': None, - 'target': None} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': None, # type: ignore + 'view': None, + 'target': None} + ) # Call the function. resp = main(req) @@ -269,18 +282,19 @@ def test_missing_policy_view_dns_name(): def test_external_bad_port(): - # Construct a mock HTTP request + """ Test bad port input """ + dns_name = 'yahoo.com' port = 'a' req = func.HttpRequest( - method='GET', - body=None, - url=f'/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': dns_name, - 'port': port} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': dns_name, + 'port': port} + ) # Call the function resp = main(req) @@ -294,18 +308,19 @@ def test_external_bad_port(): def test_external_port_timeout(): - # Construct a mock HTTP request + """ Test timeout connecting to a port """ + dns_name = 'yahoo.com' port = '8443' req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': dns_name, - 'port': '8443'} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': dns_name, + 'port': '8443'} + ) # Call the function resp = main(req) @@ -319,17 +334,18 @@ def test_external_port_timeout(): def test_external_port_not_in_range(): - # Construct a mock HTTP request + """ Test port not in valid range """ + port = '123456' req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': 'espn.com', - 'port': port} - ) + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': 'espn.com', + 'port': port} + ) # Call the function resp = main(req) @@ -343,10 +359,12 @@ def test_external_port_not_in_range(): def test_query_api(): + """ Test query api """ + req = func.HttpRequest( method='GET', - body=None, - url=f'/api/tls', + body=b"", + url='/api/tls', params={'target': 'www.google.com', 'nameserver': '8.8.8.8'} ) resp = main(req) @@ -354,9 +372,11 @@ def test_query_api(): def test_query_api_by_ip(): + """ Test query api by ip """ + req = func.HttpRequest( method='GET', - body=None, + body=b"", url='/api/tls', params={'target': '140.82.113.4', 'nameserver': '8.8.8.8'} ) @@ -365,9 +385,11 @@ def test_query_api_by_ip(): def test_query_api_error_handling(): + """ Test missing target """ + req = func.HttpRequest( method='GET', - body=None, + body=b"", url='/api/tls', params={'nameserver': '8.8.8.8'} ) @@ -377,13 +399,15 @@ def test_query_api_error_handling(): def test_policy_external_by_ip_no_violations(): + """ Test policy scan on an external ip with no violations """ + req = func.HttpRequest( - method='GET', - body=None, - url='/api/', - route_params={'scan': 'policy', - 'view': 'external', - 'target': '216.163.251.205'} + method='GET', + body=b"", + url='/api/', + route_params={'scan': 'policy', + 'view': 'external', + 'target': '216.163.251.205'} ) # Call the function. resp = main(req)