From a8f3010040ce5b8dc14033d2ec151498fe2fbd97 Mon Sep 17 00:00:00 2001
From: RMI78
Date: Fri, 16 Jun 2023 12:45:26 +0000
Subject: [PATCH] Fix empty and multiple-value headers

---
 tests/attack/test_mod_wapp.py       | 83 ++++++++++++++++++++++++++++-
 wapitiCore/wappalyzer/wappalyzer.py | 45 +++++++++++-----
 2 files changed, 113 insertions(+), 15 deletions(-)

diff --git a/tests/attack/test_mod_wapp.py b/tests/attack/test_mod_wapp.py
index 8dfcc0eb4..31c88a48c 100644
--- a/tests/attack/test_mod_wapp.py
+++ b/tests/attack/test_mod_wapp.py
@@ -141,6 +141,7 @@ async def test_html_detection():
         '["Development"], "groups": ["Web development"]}'
     )
 
+
 @pytest.mark.asyncio
 @respx.mock
 async def test_dom_detection():
@@ -193,7 +194,6 @@ async def test_dom_detection():
         assert arg[1]["info"] in expected_result
 
 
-
 @pytest.mark.asyncio
 @respx.mock
 async def test_script_detection():
@@ -307,6 +307,87 @@ async def test_headers_detection():
     )
 
 
+@pytest.mark.asyncio
+@respx.mock
+async def test_headers_multiple_values_detection():
+    # Test that an application is detected using its headers regex when a header has multiple values
+    # Pass the headers as a list of tuples: a dict literal cannot hold a duplicated "Server" key
+    respx.get("http://perdu.com/").mock(
+        return_value=httpx.Response(
+            200,
+            text="<html><head><title>Vous Etes Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
+            <h2>Pas de panique, on va vous aider</h2> \
+            <strong><pre>    * <----- vous êtes ici</pre></strong> \
+            </body></html>",
+            headers=[("Server", "Cherokee/1.3.4"),
+                     ("Server", "cloudflare")]
+        )
+    )
+
+    persister = AsyncMock()
+    home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
+    base_dir = os.path.join(home_dir, ".wapiti")
+    persister.CONFIG_DIR = os.path.join(base_dir, "config")
+
+    request = Request("http://perdu.com/")
+    request.path_id = 1
+
+    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        options = {"timeout": 10, "level": 2}
+
+        module = ModuleWapp(crawler, persister, options, Event(), crawler_configuration)
+
+        await module.attack(request)
+
+        expected_output = [
+            '{"name": "Cherokee", "versions": ["1.3.4"], "categories": ["Web servers"], "groups": ["Servers"]}',
+            '{"name": "Cloudflare", "versions": [], "categories": ["CDN"], "groups": ["Servers"]}'
+        ]
+
+        assert persister.add_payload.call_count
+        for arg in persister.add_payload.call_args_list:
+            assert arg[1]["info"] in expected_output
+
+
+@pytest.mark.asyncio
+@respx.mock
+async def test_headers_empty_values_detection():
+    # Test that an application is detected using its headers regex when the header values are empty
+    respx.get("http://perdu.com/").mock(
+        return_value=httpx.Response(
+            200,
+            text="<html><head><title>Vous Etes Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
+            <h2>Pas de panique, on va vous aider</h2> \
+            <strong><pre>    * <----- vous êtes ici</pre></strong> \
+            </body></html>",
+            headers={"X-CF1": "",
+                     "X-CF2": ""}
+        )
+    )
+
+    persister = AsyncMock()
+    home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
+    base_dir = os.path.join(home_dir, ".wapiti")
+    persister.CONFIG_DIR = os.path.join(base_dir, "config")
+
+    request = Request("http://perdu.com/")
+    request.path_id = 1
+
+    crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
+    async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
+        options = {"timeout": 10, "level": 2}
+
+        module = ModuleWapp(crawler, persister, options, Event(), crawler_configuration)
+
+        await module.attack(request)
+
+        assert persister.add_payload.call_count
+        assert persister.add_payload.call_args_list[0][1]["info"] == (
+            '{"name": "CacheFly", "versions": [], "categories": ["CDN"], "groups": ["Servers"]}'
+        )
+
+
 @pytest.mark.asyncio
 @respx.mock
 async def test_meta_detection():
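Note (editorial, not part of the patch): the first test above passes the duplicated
Server header as a list of tuples because a Python dict literal keeps only the last
value for a repeated key, and httpx joins duplicates into a single string when the
headers are read through the plain mapping interface. A minimal sketch of that
behaviour, assuming only httpx is installed:

    import httpx

    headers = httpx.Headers([("Server", "Cherokee/1.3.4"), ("Server", "cloudflare")])

    # The mapping interface comma-joins repeated headers (names are lower-cased)...
    print(headers["server"])      # 'Cherokee/1.3.4, cloudflare'

    # ...while multi_items() keeps one (name, value) pair per occurrence.
    print(headers.multi_items())  # [('server', 'Cherokee/1.3.4'), ('server', 'cloudflare')]
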
diff --git a/wapitiCore/wappalyzer/wappalyzer.py b/wapitiCore/wappalyzer/wappalyzer.py
index d696c45ca..02593efd2 100644
--- a/wapitiCore/wappalyzer/wappalyzer.py
+++ b/wapitiCore/wappalyzer/wappalyzer.py
@@ -3,6 +3,7 @@
 import re
 import warnings
 from typing import Set
+from collections import defaultdict
 
 from soupsieve.util import SelectorSyntaxError
 from wapitiCore.net.crawler import Response
@@ -145,11 +146,11 @@ def normalize_application_regex(self):
             # Format each dom of applications
             tmp_dom = []
             for (key, patterns) in self.applications[application_name]["dom"].items():
-                tmp_dom.append(self.normalize_application_regex_dom({key:patterns}))
+                tmp_dom.append(self.normalize_application_regex_dom({key: patterns}))
             self.applications[application_name]["dom"] = tmp_dom
 
     @staticmethod
-    def normalize_application_regex_dom(dom : dict) -> dict:
+    def normalize_application_regex_dom(dom: dict) -> dict:
         """
         Convert the result of wappalyzer to a generic format used by Wapiti.
         The goal is to match with the css selector if we don't have a regex.
@@ -167,7 +168,7 @@ def normalize_application_regex_dom(dom: dict) -> dict:
         css_selector = list(dom.keys())[0]
         value = dom[css_selector]
         if value == "":
-            return {css_selector : {"exists": ""}}
+            return {css_selector: {"exists": ""}}
         return dom
 
     @staticmethod
@@ -212,6 +213,7 @@ def with_categories(func):
     """
     Return a list of applications with their categories and the versions that can be detected on web content.
     """
+
     def wrapper_func(self):
         versioned_applications = func(self)
         versioned_and_categorised_applications = versioned_applications
@@ -231,6 +233,7 @@ def with_groups(func):
     Return a list of applications with their categories, their versions & theirs groups that can be detected
     on web content.
     """
+
     def wrapper_func(self):
         versioned_and_categorised_applications = func(self)
         applications = versioned_and_categorised_applications
@@ -306,10 +309,14 @@ def detect_versions_normalize_dict(rules: dict, contents) -> Set[str]:
             if key in contents:
                 # regex_params is a list : [{"application_pattern": "..", "regex": "re.compile(..)"}, ...]
                 for i, _ in enumerate(regex_params):
-                    if re.search(regex_params[i]['regex'], contents[key]):
-                        # Use that special string to show we detected the app once but not necessarily a version
-                        versions.add("__detected__")
-                        versions.update(extract_version(regex_params[i], contents[key]))
+                    for content_value in contents[key]:
+                        # If the regex does not match, the rule may be targeting the key itself
+                        # (the pattern can be set to the key), so also compare the pattern with the key
+                        if re.search(regex_params[i]['regex'], content_value) or \
+                                regex_params[i]['application_pattern'] == key:
+                            # Use that special string to show we detected the app once but not necessarily a version
+                            versions.add("__detected__")
+                            versions.update(extract_version(regex_params[i], content_value))
         return versions
@@ -330,13 +337,23 @@ def __init__(self, application_data: ApplicationData, web_content: Response, js:
         self._url = web_content.url
         self._html_code = web_content.content
         # Copy some values to make sure they aren't processed more than once
+        # List-based attributes:
         self.html = Html(self._html_code, self._url)
         self._scripts = self.html.scripts[:]
-        self._cookies = dict(web_content.cookies)
-        self._headers = web_content.headers
-        self._metas = dict(self.html.metas)
         self._js = js
+        # Dict-based attributes: to record several values for the same attribute,
+        # each dictionary maps a key to a list of values.
+        # httpx.Headers joins repeated headers into a single comma-separated string
+        # when cast to a dictionary, hence this workaround based on multi_items()
+        self._headers = defaultdict(list)
+        for attribute, value in web_content.headers.multi_items():
+            self._headers[attribute].append(value)
+        # Cookies can't have multiple values, so wrapping each one in a singleton list is fine
+        self._cookies = {key: [value] for key, value in web_content.cookies.items()}
+        # Same with meta tags
+        self._metas = {key: [value] for key, value in self.html.metas.items()}
+
     def detect_application_versions(self, application: dict) -> Set[str]:
         """
         Determine whether the web content matches the application regex.
@@ -372,12 +389,12 @@ def detect_versions_normalize_dom(self, application: dict) -> Set[str]:
             self.check_dom_attribute(soup, versions, css_selector, value)
         return versions
 
-    def check_dom_attribute(self, soup, versions : set, css_selector, value):
+    def check_dom_attribute(self, soup, versions: set, css_selector, value):
         try:
             match = soup.select(css_selector)
         except SelectorSyntaxError as err:
             warnings.warn(
-            f"Caught {err} while selecting css selector: {css_selector}")
+                f"Caught {err} while selecting css selector: {css_selector}")
             return
         for attribute, data in value.items():
             if attribute == "exists":
@@ -389,7 +406,7 @@ def check_dom_attribute(self, soup, versions : set, css_selector, value):
             self.check_dom_attribute_others(match, versions, data)
 
     @staticmethod
-    def check_dom_attribute_exists(match, versions : set):
+    def check_dom_attribute_exists(match, versions: set):
         # if attribute is "exists" we just want to match with the css selector
         if match:
             versions.add("__detected__")
@@ -412,7 +429,7 @@ def check_dom_attribute_text(match, versions: set, data):
 
     @staticmethod
     def check_dom_attribute_others(match, versions: set, data):
-        for attribute, value in data.items(): 
+        for attribute, value in data.items():
             # if data is empty you just want to match with the css selector and check if the attribute exist
             if value == "":
                 for match_html in match:
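Note (editorial, not part of the patch): a simplified, hypothetical model of the
patched detection loop in detect_versions_normalize_dict, showing why iterating
over every recorded value matters. The rule data below is made up; only the shape
mirrors the patch:

    import re

    def detect(rules: dict, contents: dict) -> set:
        versions = set()
        for key, regex_params in rules.items():
            if key not in contents:
                continue
            for params in regex_params:
                # Test every value seen for the key; a rule whose pattern is the
                # key itself counts as a simple presence match.
                for content_value in contents[key]:
                    if re.search(params["regex"], content_value) or params["application_pattern"] == key:
                        versions.add("__detected__")
        return versions

    rules = {"server": [{"application_pattern": "Cherokee", "regex": re.compile("Cherokee", re.I)}]}
    contents = {"server": ["Cherokee/1.3.4", "cloudflare"]}

    # "cloudflare" alone would not match; the second value no longer masks the first.
    assert detect(rules, contents) == {"__detected__"}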
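Note (editorial, not part of the patch): the reworked __init__ normalizes headers
into a mapping of lists. A self-contained sketch of that workaround, assuming only
httpx and the standard library:

    from collections import defaultdict

    import httpx

    response = httpx.Response(
        200,
        headers=[("Server", "Cherokee/1.3.4"), ("Server", "cloudflare")],
    )

    # One list of values per header name, so repeated headers stay separate
    # instead of being comma-joined by the dict() cast the patch removes.
    normalized = defaultdict(list)
    for attribute, value in response.headers.multi_items():
        normalized[attribute].append(value)

    print(dict(normalized))  # {'server': ['Cherokee/1.3.4', 'cloudflare']}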