Fix empty and multiple values headers
RMI78 committed Jun 16, 2023
1 parent 9c91017 commit a8f3010
Showing 2 changed files with 113 additions and 15 deletions.
83 changes: 82 additions & 1 deletion tests/attack/test_mod_wapp.py
@@ -141,6 +141,7 @@ async def test_html_detection():
'["Development"], "groups": ["Web development"]}'
)


@pytest.mark.asyncio
@respx.mock
async def test_dom_detection():
@@ -193,7 +194,6 @@ async def test_dom_detection():
assert arg[1]["info"] in expected_result



@pytest.mark.asyncio
@respx.mock
async def test_script_detection():
@@ -307,6 +307,87 @@ async def test_headers_detection():
)


@pytest.mark.asyncio
@respx.mock
async def test_headers_multiple_values_detection():
# Test if application is detected using its headers regex with multiple values
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
text="<html><head><title>Vous Etes Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong> \
</body></html>",
headers={"Server": "Cherokee/1.3.4",
"Server": "cloudflare"}
)
)

persister = AsyncMock()
home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
base_dir = os.path.join(home_dir, ".wapiti")
persister.CONFIG_DIR = os.path.join(base_dir, "config")

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2}

module = ModuleWapp(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

expected_output = [
'{"name": "Cherokee", "versions": ["1.3.4"], "categories": ["Web servers"], "groups": ["Servers"]}',
'{"name": "Cloudflare", "versions": [], "categories": ["CDN"], "groups": ["Servers"]}'
]

assert persister.add_payload.call_count
for arg in persister.add_payload.call_args_list:
assert arg[1]['info'] in expected_output
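
For context, a quick illustration (plain Python, no Wapiti code involved) of why the headers above are passed as a list of pairs rather than a dict literal:

d = {"Server": "Cherokee/1.3.4", "Server": "cloudflare"}
print(d)  # {'Server': 'cloudflare'} -- the first value is silently dropped
# A list of (name, value) pairs keeps both occurrences, and httpx accepts that form for headers.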


@pytest.mark.asyncio
@respx.mock
async def test_headers_empty_values_detection():
# Test if application is detected using its headers regex with empty values
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
text="<html><head><title>Vous Etes Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong> \
</body></html>",
headers={"X-CF1": "",
"X-CF2": ""}
)
)

persister = AsyncMock()
home_dir = os.getenv("HOME") or os.getenv("USERPROFILE") or "/home"
base_dir = os.path.join(home_dir, ".wapiti")
persister.CONFIG_DIR = os.path.join(base_dir, "config")

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2}

module = ModuleWapp(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "CacheFly", "versions": [], "categories": ["CDN"], "groups": ["Servers"]}'
)
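
For reference, a minimal sketch of the kind of fingerprint this test exercises (a hypothetical entry for illustration, not the actual CacheFly definition):

# Hypothetical fingerprint entry (illustration only):
cachefly_rule = {
    "headers": {"X-CF1": "", "X-CF2": ""},  # empty pattern: the header's mere presence is a match
}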


@pytest.mark.asyncio
@respx.mock
async def test_meta_detection():
45 changes: 31 additions & 14 deletions wapitiCore/wappalyzer/wappalyzer.py
@@ -3,6 +3,7 @@
import re
import warnings
from typing import Set
from collections import defaultdict
from soupsieve.util import SelectorSyntaxError

from wapitiCore.net.crawler import Response
@@ -145,11 +146,11 @@ def normalize_application_regex(self):
# Format each dom of applications
tmp_dom = []
for (key, patterns) in self.applications[application_name]["dom"].items():
tmp_dom.append(self.normalize_application_regex_dom({key:patterns}))
tmp_dom.append(self.normalize_application_regex_dom({key: patterns}))
self.applications[application_name]["dom"] = tmp_dom

@staticmethod
def normalize_application_regex_dom(dom : dict) -> dict:
def normalize_application_regex_dom(dom: dict) -> dict:
"""
Convert the result of wappalyzer to a generic format used by Wapiti.
The goal is to match on the CSS selector alone when no regex is provided.
@@ -167,7 +168,7 @@ def normalize_application_regex_dom(dom : dict) -> dict:
css_selector = list(dom.keys())[0]
value = dom[css_selector]
if value == "":
return {css_selector : {"exists": ""}}
return {css_selector: {"exists": ""}}
return dom
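
A quick illustration of this normalization (assuming the enclosing class is ApplicationData, as elsewhere in this module):

# Empty value: rewritten so the CSS selector alone is matched
ApplicationData.normalize_application_regex_dom({"meta[name=generator]": ""})
# -> {"meta[name=generator]": {"exists": ""}}

# Non-empty value: returned unchanged
ApplicationData.normalize_application_regex_dom({"#app": {"text": r"React v([\d.]+)"}})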

@staticmethod
@@ -212,6 +213,7 @@ def with_categories(func):
"""
Return a list of applications with their categories and the versions that can be detected on web content.
"""

def wrapper_func(self):
versioned_applications = func(self)
versioned_and_categorised_applications = versioned_applications
@@ -231,6 +233,7 @@ def with_groups(func):
Return a list of applications with their categories, their versions & their groups
that can be detected on web content.
"""

def wrapper_func(self):
versioned_and_categorised_applications = func(self)
applications = versioned_and_categorised_applications
@@ -306,10 +309,14 @@ def detect_versions_normalize_dict(rules: dict, contents) -> Set[str]:
if key in contents:
# regex_params is a list : [{"application_pattern": "..", "regex": "re.compile(..)"}, ...]
for i, _ in enumerate(regex_params):
if re.search(regex_params[i]['regex'], contents[key]):
# Use that special string to show we detected the app once but not necessarily a version
versions.add("__detected__")
versions.update(extract_version(regex_params[i], contents[key]))
for content_value in contents[key]:
# If the regex does not match, we may actually be looking for the key itself:
# the application pattern can be set to the key, so compare against the key too
if re.search(regex_params[i]['regex'], content_value) or \
regex_params[i]['application_pattern'] == key:
# Use that special string to show we detected the app once but not necessarily a version
versions.add("__detected__")
versions.update(extract_version(regex_params[i], content_value))

return versions
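
To make the new control flow concrete, here is a self-contained rehearsal of the loop above with hypothetical rule data (not the real fingerprints):

import re

rules = {"Server": [{"application_pattern": "Cherokee", "regex": re.compile(r"Cherokee/?([\d.]+)?", re.I)}]}
contents = {"Server": ["Cherokee/1.3.4", "cloudflare"]}  # one header name, two recorded values

versions = set()
for key, regex_params in rules.items():
    if key in contents:
        for param in regex_params:
            for content_value in contents[key]:
                # match each recorded value, or fall back to comparing the pattern with the key
                if re.search(param["regex"], content_value) or param["application_pattern"] == key:
                    versions.add("__detected__")
print(versions)  # {'__detected__'}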

@@ -330,13 +337,23 @@ def __init__(self, application_data: ApplicationData, web_content: Response, js:
self._url = web_content.url
self._html_code = web_content.content
# Copy some values to make sure they aren't processed more than once
# List-based attributes:
self.html = Html(self._html_code, self._url)
self._scripts = self.html.scripts[:]
self._cookies = dict(web_content.cookies)
self._headers = web_content.headers
self._metas = dict(self.html.metas)
self._js = js

# Dict-based attributes: to record multiple values for the same attribute,
# the dictionaries must map each key to a list of values.
# httpx.Headers joins repeated headers into a single comma-separated string
# when cast to a dictionary, hence the workaround below.
self._headers = defaultdict(list)
for attribute, value in web_content.headers.multi_items():
self._headers[attribute].append(value)
# Cookies can't have multiple values, so wrapping each one in a singleton list is fine
self._cookies = {key: [value] for key, value in web_content.cookies.items()}
# Same with meta tags
self._metas = {key: [value] for key, value in self.html.metas.items()}
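
As a sanity check of that workaround, a minimal sketch (assuming an installed httpx; the exact key casing in the output is up to httpx):

from collections import defaultdict
import httpx

headers = httpx.Headers([("Server", "Cherokee/1.3.4"), ("Server", "cloudflare")])
print(headers.get("server"))   # 'Cherokee/1.3.4, cloudflare' -- duplicates joined into one string
print(headers.multi_items())   # each occurrence kept as its own (name, value) pair

multi = defaultdict(list)
for name, value in headers.multi_items():
    multi[name].append(value)
print(dict(multi))             # e.g. {'Server': ['Cherokee/1.3.4', 'cloudflare']}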

def detect_application_versions(self, application: dict) -> Set[str]:
"""
Determine whether the web content matches the application regex.
@@ -372,12 +389,12 @@ def detect_versions_normalize_dom(self, application: dict) -> Set[str]:
self.check_dom_attribute(soup, versions, css_selector, value)
return versions

def check_dom_attribute(self, soup, versions : set, css_selector, value):
def check_dom_attribute(self, soup, versions: set, css_selector, value):
try:
match = soup.select(css_selector)
except SelectorSyntaxError as err:
warnings.warn(
f"Caught {err} while selecting css selector: {css_selector}")
f"Caught {err} while selecting css selector: {css_selector}")
return
for attribute, data in value.items():
if attribute == "exists":
@@ -389,7 +406,7 @@ def check_dom_attribute(self, soup, versions : set, css_selector, value):
self.check_dom_attribute_others(match, versions, data)

@staticmethod
def check_dom_attribute_exists(match, versions : set):
def check_dom_attribute_exists(match, versions: set):
# if attribute is "exists" we just want to match with the css selector
if match:
versions.add("__detected__")
@@ -412,7 +429,7 @@ def check_dom_attribute_text(match, versions: set, data):

@staticmethod
def check_dom_attribute_others(match, versions: set, data):
for attribute, value in data.items():
for attribute, value in data.items():
# if data is empty we just want to match the css selector and check that the attribute exists
if value == "":
for match_html in match:
