Skip to content

Commit

Permalink
Merge pull request sophos#65 from sophos/feature/CPLAT-37648
Browse files Browse the repository at this point in the history
[Feature/CPLAT-37648] Validate the dhost value and convert to valid fqdn format when dhost not contain valid fqdn value
  • Loading branch information
ramksophos authored Dec 9, 2021
2 parents e1577a0 + 1de0aeb commit 48c17d2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
10 changes: 10 additions & 0 deletions siem.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@
logging.basicConfig(format="%(message)s")


def is_valid_fqdn(fqdn):
fqdn = fqdn.strip()
fqdn = fqdn[:-1] if fqdn.endswith(".") else fqdn # chomp trailing period
return fqdn and len(fqdn) < 256 and all(part and len(part) < 64 and re.match(r"^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$", part) for part in fqdn.split("."))

def convert_to_valid_fqdn(value):
return ".".join([re.sub("[^-a-z0-9]+", "-", x.strip()).strip("-") for x in value.lower().split(".") if x.strip()])

def write_json_format(results):
"""Write JSON format data.
Arguments:
Expand Down Expand Up @@ -220,6 +228,8 @@ def update_cef_keys(data):
new_key = CEF_MAPPING.get(key, key)
if new_key == key:
continue
if new_key == "dhost" and not is_valid_fqdn(value):
value = convert_to_valid_fqdn(value)
data[new_key] = value
del data[key]

Expand Down
53 changes: 52 additions & 1 deletion tests/unit/test_siem.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import os
import siem
import sys
import unittest

from mock import MagicMock
Expand Down Expand Up @@ -113,6 +112,9 @@ def test_update_cef_keys(self):
self.assertEqual(same_key_value_data, {"name": "test_name"})
siem.update_cef_keys(different_key_value_data)
self.assertEqual(different_key_value_data, {"type": "test_type"})
invalid_host_key_value_data = {"location": "John's MacBook"}
siem.update_cef_keys(invalid_host_key_value_data)
self.assertEqual(invalid_host_key_value_data, {"dhost": "john-s-macbook"})

def test_format_cef(self):
data = {
Expand Down Expand Up @@ -200,3 +202,52 @@ def test_validate_endpoint(self):
"Invalid endpoint in config.ini, endpoint can be event, alert or all"
in str(context.exception)
)

def test_is_valid_fqdn(self):
valid = [
("foo.com", "dot separated alpha"),
(" foo.com", "leading space is stripped"),
("foo.bar ", "trailing space is stripped"),
("FOO.bar", "case doesn't matter"),
("foo.tld.", "trailing dot allowed"),
("a", "min length"),
("a-b", "min with hyphen"),
("f11-bar.baz-w1BBle", "alphanum parts can be hyphenated"),
("f11-bar.baz-w1BBle-Quux", "multiple hyphens allowed in a part"),
("a" * 63 + ("." + ("a" * 63)) * 3, "max length 255; max part length 63"),
]

for fqdn, message in valid:
self.assertTrue(siem.is_valid_fqdn(fqdn), f"Check is valid fqdn {fqdn} ({message})")

invalid = [
("", "can't be empty"),
(" ", "can't be whitespace"),
("a" * 64, "max part length should be 63"),
("a" * 63 + ("." + ("a" * 63)) * 4 , "max length should be 255"),
("f22-.abc", "hyphen must be in middle of alphanum part"),
]
for fqdn, message in invalid:
self.assertFalse(siem.is_valid_fqdn(fqdn), f"Check invalid fqdn {fqdn} ({message})")

def test_convert_to_valid_fqdn(self):
needs_fixing = [
("Foo.com", "foo.com", "lowercase"),
("foo.bar.", "foo.bar", "trailing dot is removed"),
("foo..bar....baz", "foo.bar.baz", "multiple dots are replaced with one"),
("foo&!(12.baz", "foo-12.baz", "multiple non alphanum chars are replaced with a hyphen"),
("foo&!(.baz", "foo.baz", "no trailing hyphen in part after replacement"),
]
for fqdn, fixed, message in needs_fixing:
self.assertEqual(fixed, siem.convert_to_valid_fqdn(fqdn), f"Check conversion {fqdn} ({message})")
self.assertTrue(siem.is_valid_fqdn(fixed))

already_good = [
"a",
"foo",
"foo-bar",
"foo-bar.baz-wibble",
"foo-bar.baz-wibble-quux",
]
for fqdn in already_good:
self.assertEqual(fqdn, siem.convert_to_valid_fqdn(fqdn), f"Check doesn't need conversion {fqdn}")

0 comments on commit 48c17d2

Please sign in to comment.