Skip to content

Commit

Permalink
auto format fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sim0nx committed Dec 28, 2023
1 parent 36cf683 commit 5e1b9fa
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 88 deletions.
1 change: 0 additions & 1 deletion eml_parser/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

"""eml_parser serves as a python module for parsing eml files and returning various \
information found in the e-mail as well as computed information.
"""
Expand Down
134 changes: 73 additions & 61 deletions eml_parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,20 @@ class EmlParser:
"""eml-parser class."""

# pylint: disable=too-many-arguments
def __init__(self,
include_raw_body: bool = False,
include_attachment_data: bool = False,
pconf: typing.Optional[dict] = None,
policy: typing.Optional[email.policy.Policy] = None,
ignore_bad_start: bool = False,
email_force_tld: bool = False,
domain_force_tld: bool = False,
ip_force_routable: bool = False,
parse_attachments: bool = True,
include_www: bool = True,
include_href: bool = True,
) -> None:
def __init__(
self,
include_raw_body: bool = False,
include_attachment_data: bool = False,
pconf: typing.Optional[dict] = None,
policy: typing.Optional[email.policy.Policy] = None,
ignore_bad_start: bool = False,
email_force_tld: bool = False,
domain_force_tld: bool = False,
ip_force_routable: bool = False,
parse_attachments: bool = True,
include_www: bool = True,
include_href: bool = True,
) -> None:
"""Initialisation.
Args:
Expand Down Expand Up @@ -382,8 +383,7 @@ def parse_email(self) -> dict:
headers_struc['received'].append(parsed_routing)

# Parse IPs in "received headers"
ips_in_received_line = eml_parser.regexes.ipv6_regex.findall(received_line_flat) + \
eml_parser.regexes.ipv4_regex.findall(received_line_flat)
ips_in_received_line = eml_parser.regexes.ipv6_regex.findall(received_line_flat) + eml_parser.regexes.ipv4_regex.findall(received_line_flat)
for ip in ips_in_received_line:
if ip in self.pconf['whiteip']:
continue
Expand Down Expand Up @@ -472,7 +472,6 @@ def parse_email(self) -> dict:
# if more than 4K, let's cheat: we cut around the things we search for ("://", "@", ".")
# in order to reduce regex complexity.
for body_slice in self.string_sliding_window_loop(body):

for url_match in self.get_uri_ondata(body_slice):
if ':/' in url_match[:10]:
list_observed_urls.append(url_match)
Expand Down Expand Up @@ -684,29 +683,29 @@ def string_sliding_window_loop(body: str, slice_step: int = 500, max_distance: i
ptr_start = 0

for ptr_end in range(slice_step, body_length + slice_step, slice_step):
if ' ' in body[ptr_end - 1:ptr_end]:
while not (eml_parser.regexes.window_slice_regex.match(body[ptr_end - 1:ptr_end]) or ptr_end > body_length):
if ' ' in body[ptr_end - 1 : ptr_end]:
while not (eml_parser.regexes.window_slice_regex.match(body[ptr_end - 1 : ptr_end]) or ptr_end > body_length):
if ptr_end > body_length:
ptr_end = body_length
break

ptr_end += 1

# Found a :// near the start of the slice, rewind
if ptr_start > 16 and '://' in body[ptr_start - 8:ptr_start + 8]:
if ptr_start > 16 and '://' in body[ptr_start - 8 : ptr_start + 8]:
ptr_start -= 16

# Found a :// near the end of the slice, rewind from that location
if ptr_end < body_length and '://' in body[ptr_end - 8:ptr_end + 8]:
if ptr_end < body_length and '://' in body[ptr_end - 8 : ptr_end + 8]:
pos = body.rfind('://', ptr_end - 8, ptr_end + 8)
ptr_end = pos - 8

# Found a :// within the slice; try to expand the slice until we find an invalid
# URL character in order to avoid cutting off URLs
if '://' in body[ptr_start:ptr_end] and not body[ptr_end - 1:ptr_end] == ' ':
if '://' in body[ptr_start:ptr_end] and not body[ptr_end - 1 : ptr_end] == ' ':
distance = 1

while body[ptr_end - 1:ptr_end] not in (' ', '>') and distance < max_distance and ptr_end <= body_length:
while body[ptr_end - 1 : ptr_end] not in (' ', '>') and distance < max_distance and ptr_end <= body_length:
distance += 1
ptr_end += 1

Expand Down Expand Up @@ -758,7 +757,7 @@ def clean_found_uri(self, url: str) -> typing.Optional[str]:

try:
# Remove leading spaces and quote characters
url = url.lstrip(' \t\n\r\f\v\'\"«»“”‘’').replace('\r', '').replace('\n', '')
url = url.lstrip(' \t\n\r\f\v\'"«»“”‘’').replace('\r', '').replace('\n', '')
url = urllib.parse.urlparse(url).geturl()
scheme_url = url
if ':/' not in scheme_url:
Expand All @@ -778,7 +777,7 @@ def clean_found_uri(self, url: str) -> typing.Optional[str]:
return None

# let's try to be smart by stripping off noisy bogus parts
url = re.split(r'''[', ")}\\]''', url, 1)[0]
url = re.split(r"""[', ")}\\]""", url, 1)[0]

# filter bogus URLs
if url.endswith('://'):
Expand Down Expand Up @@ -887,15 +886,15 @@ def get_raw_body_text(self, msg: email.message.Message, boundary: typing.Optiona
try:
filename = msg.get_filename('').lower()
except (binascii.Error, AssertionError):
logger.exception(
'Exception occurred while trying to parse the content-disposition header. Collected data will not be complete.')
logger.exception('Exception occurred while trying to parse the content-disposition header. Collected data will not be complete.')
filename = ''

# pylint: disable=too-many-boolean-expressions
if ('content-disposition' not in msg and msg.get_content_maintype() == 'text') \
or (filename.endswith('.html') or filename.endswith('.htm')) \
or ('content-disposition' in msg and msg.get_content_disposition() == 'inline'
and msg.get_content_maintype() == 'text'):
if (
('content-disposition' not in msg and msg.get_content_maintype() == 'text')
or (filename.endswith('.html') or filename.endswith('.htm'))
or ('content-disposition' in msg and msg.get_content_disposition() == 'inline' and msg.get_content_maintype() == 'text')
):
encoding = msg.get('content-transfer-encoding', '').lower()

charset = msg.get_content_charset()
Expand Down Expand Up @@ -975,8 +974,7 @@ def traverse_multipart(self, msg: email.message.Message, counter: int = 0) -> ty
if 'content-type' in msg:
if msg.get_content_type() == 'message/rfc822':
# This is an e-mail message attachment, add it to the attachment list apart from parsing it
attachments.update(
self.prepare_multipart_part_attachment(msg, counter))
attachments.update(self.prepare_multipart_part_attachment(msg, counter))

for part in msg.get_payload():
attachments.update(self.traverse_multipart(part, counter))
Expand Down Expand Up @@ -1008,15 +1006,13 @@ def prepare_multipart_part_attachment(self, msg: email.message.Message, counter:
lower_keys = [k.lower() for k in msg.keys()]
msg.policy = former_policy

if ('content-disposition' in lower_keys and msg.get_content_disposition() != 'inline') \
or msg.get_content_maintype() != 'text':
if ('content-disposition' in lower_keys and msg.get_content_disposition() != 'inline') or msg.get_content_maintype() != 'text':
# if it's an attachment-type, pull out the filename
# and calculate the size in bytes
if msg.get_content_type() == 'message/rfc822':
payload = msg.get_payload()
if len(payload) > 1:
logger.warning(
'More than one payload for "message/rfc822" part detected. This is not supported, please report!')
logger.warning('More than one payload for "message/rfc822" part detected. This is not supported, please report!')

try:
custom_policy = email.policy.default.clone(max_line_length=0)
Expand Down Expand Up @@ -1097,9 +1093,16 @@ def get_mime_type(data: bytes) -> typing.Union[typing.Tuple[str, str], typing.Tu
return detected.name, detected.mime_type


def decode_email(eml_file: str, include_raw_body: bool = False, include_attachment_data: bool = False,
pconf: typing.Optional[dict] = None, policy: email.policy.Policy = email.policy.default,
ignore_bad_start: bool = False, email_force_tld: bool = False, parse_attachments: bool = True) -> dict:
def decode_email(
eml_file: str,
include_raw_body: bool = False,
include_attachment_data: bool = False,
pconf: typing.Optional[dict] = None,
policy: email.policy.Policy = email.policy.default,
ignore_bad_start: bool = False,
email_force_tld: bool = False,
parse_attachments: bool = True,
) -> dict:
"""Function for decoding an EML file into an easily parsable structure.
Some intelligence is applied while parsing the file in order to work around
Expand Down Expand Up @@ -1142,20 +1145,28 @@ def decode_email(eml_file: str, include_raw_body: bool = False, include_attachme
with open(eml_file, 'rb') as fp:
raw_email = fp.read()

return decode_email_b(eml_file=raw_email,
include_raw_body=include_raw_body,
include_attachment_data=include_attachment_data,
pconf=pconf,
policy=policy,
ignore_bad_start=ignore_bad_start,
email_force_tld=email_force_tld,
parse_attachments=parse_attachments)


def decode_email_b(eml_file: bytes, include_raw_body: bool = False, include_attachment_data: bool = False,
pconf: typing.Optional[dict] = None, policy: email.policy.Policy = email.policy.default,
ignore_bad_start: bool = False, email_force_tld: bool = False,
parse_attachments: bool = True) -> dict:
return decode_email_b(
eml_file=raw_email,
include_raw_body=include_raw_body,
include_attachment_data=include_attachment_data,
pconf=pconf,
policy=policy,
ignore_bad_start=ignore_bad_start,
email_force_tld=email_force_tld,
parse_attachments=parse_attachments,
)


def decode_email_b(
eml_file: bytes,
include_raw_body: bool = False,
include_attachment_data: bool = False,
pconf: typing.Optional[dict] = None,
policy: email.policy.Policy = email.policy.default,
ignore_bad_start: bool = False,
email_force_tld: bool = False,
parse_attachments: bool = True,
) -> dict:
"""Function for decoding an EML file into an easily parsable structure.
Some intelligence is applied while parsing the file in order to work around
Expand Down Expand Up @@ -1195,13 +1206,14 @@ def decode_email_b(eml_file: bytes, include_raw_body: bool = False, include_atta
"""
warnings.warn('You are using a deprecated method, please use the EmlParser class instead.', DeprecationWarning)

ep = EmlParser(include_raw_body=include_raw_body,
include_attachment_data=include_attachment_data,
pconf=pconf,
policy=policy,
ignore_bad_start=ignore_bad_start,
email_force_tld=email_force_tld,
parse_attachments=parse_attachments,
)
ep = EmlParser(
include_raw_body=include_raw_body,
include_attachment_data=include_attachment_data,
pconf=pconf,
policy=policy,
ignore_bad_start=ignore_bad_start,
email_force_tld=email_force_tld,
parse_attachments=parse_attachments,
)

return ep.decode_email_bytes(eml_file)
62 changes: 40 additions & 22 deletions eml_parser/regexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@

# regex compilation
# W3C HTML5 standard recommended regex for e-mail validation
email_regex = re.compile(r'''([a-zA-Z0-9.!#$%&'*+-/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*)''', re.MULTILINE)
email_force_tld_regex = re.compile(r'''([a-zA-Z0-9.!#$%&'*+-/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)+)''', re.MULTILINE)
email_regex = re.compile(r"""([a-zA-Z0-9.!#$%&'*+-/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*)""", re.MULTILINE)
email_force_tld_regex = re.compile(r"""([a-zA-Z0-9.!#$%&'*+-/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)+)""", re.MULTILINE)

# regex for detecting RFC2047 encodings - used from https://dmorgan.info/posts/encoded-word-syntax/
email_regex_rfc2047 = re.compile(r'''=\?{1}([\w\S]+)\?{1}([B|Q|b|q])\?{1}([\w\S]+)\?{1}=''')
email_regex_rfc2047 = re.compile(r"""=\?{1}([\w\S]+)\?{1}([B|Q|b|q])\?{1}([\w\S]+)\?{1}=""")

recv_dom_regex = re.compile(r'''(?:(?:from|by)\s+)([a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]{2,})+)''', re.MULTILINE)
recv_dom_regex = re.compile(r"""(?:(?:from|by)\s+)([a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]{2,})+)""", re.MULTILINE)

dom_regex = re.compile(r'''(?:^|[\s(/<>|@'=])([a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]{2,})+)(?=$|[\?\s#&/<>')])''', re.MULTILINE)
dom_regex = re.compile(r"""(?:^|[\s(/<>|@'=])([a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]{2,})+)(?=$|[\?\s#&/<>')])""", re.MULTILINE)

ipv4_regex = re.compile(r'''(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})''')
ipv4_regex = re.compile(r"""(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})""")

# From https://gist.github.com/mnordhoff/2213179 : IPv6 with zone ID (RFC 6874)
ipv6_regex = re.compile(r'''((?:[0-9a-f]{1,4}:){6}(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|::(?:[0-9a-f]{1,4}:){5}(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9a-f]{1,4})?::(?:[0-9a-f]{1,4}:){4}(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9a-f]{1,4}:[0-9a-f]{1,4})?::(?:[0-9a-f]{1,4}:){3}(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9a-f]{1,4}:){,2}[0-9a-f]{1,4})?::(?:[0-9a-f]{1,4}:){2}(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9a-f]{1,4}:){,3}[0-9a-f]{1,4})?::[0-9a-f]{1,4}:(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9a-f]{1,4}:){,4}[0-9a-f]{1,4})?::(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9a-f]{1,4}:){,5}[0-9a-f]{1,4})?::[0-9a-f]{1,4}|(?:(?:[0-9a-f]{1,4}:){,6}[0-9a-f]{1,4})?::)''', flags=re.IGNORECASE)
ipv6_regex = re.compile(
r"""((?:[0-9a-f]{1,4}:){6}(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|::(?:[0-9a-f]{1,4}:){5}(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9a-f]{1,4})?::(?:[0-9a-f]{1,4}:){4}(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9a-f]{1,4}:[0-9a-f]{1,4})?::(?:[0-9a-f]{1,4}:){3}(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9a-f]{1,4}:){,2}[0-9a-f]{1,4})?::(?:[0-9a-f]{1,4}:){2}(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9a-f]{1,4}:){,3}[0-9a-f]{1,4})?::[0-9a-f]{1,4}:(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9a-f]{1,4}:){,4}[0-9a-f]{1,4})?::(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9a-f]{1,4}:){,5}[0-9a-f]{1,4})?::[0-9a-f]{1,4}|(?:(?:[0-9a-f]{1,4}:){,6}[0-9a-f]{1,4})?::)""",
flags=re.IGNORECASE,
)

# simple version for searching for URLs
# character set based on http://tools.ietf.org/html/rfc3986
Expand All @@ -44,7 +47,8 @@
url_regex_www_comma = re.compile(r',(?=https?|ftps?|www\d{0,3})', flags=re.IGNORECASE)

if re.__name__ == 're2':
url_regex_simple = re.compile(r'''
url_regex_simple = re.compile(
r"""
\b
(?:https?|ftps?):
(?:/{1,3}|[a-z0-9%])
Expand All @@ -54,8 +58,11 @@
[^\x00-\x20\s`()<>{}\[\]\/'"«»“”‘’]+
)
(?:[\w\-._~%!$&'()*+,;=:/?#\[\]@\x{00001000}-\x{0010FFFF}]*[^\x00-\x20\s`!\[\]{};:'".,<>«»“”‘’])?
''', flags=re.IGNORECASE | re.VERBOSE)
url_regex_www = re.compile(r'''
""",
flags=re.IGNORECASE | re.VERBOSE,
)
url_regex_www = re.compile(
r"""
(?:
# http/ftp schemes
\b
Expand All @@ -74,9 +81,12 @@
(?::[0]*[1-9][0-9]{0,4})? # Port
[\/\\#?][\w\-._~%!$&'()*+,;=:/?#\[\]@\x{00001000}-\x{0010FFFF}]*[^\x00-\x20\s`!\[\]{};:'\".,<>«»“”‘’] # Path, etc.
)
''', flags=re.IGNORECASE | re.VERBOSE)
""",
flags=re.IGNORECASE | re.VERBOSE,
)
else:
url_regex_simple = re.compile(r'''
url_regex_simple = re.compile(
r"""
\b
(?:https?|ftps?):
(?:/{1,3}|[a-z0-9%])
Expand All @@ -86,8 +96,11 @@
[^\x00-\x20\s`()<>{}\[\]\/'"«»“”‘’]+
)
(?:[\w\-._~%!$&'()*+,;=:/?#\[\]@\U00001000-\U0010FFFF]*[^\x00-\x20\s`!\[\]{};:'".,<>«»“”‘’])?
''', flags=re.IGNORECASE | re.VERBOSE)
url_regex_www = re.compile(r'''
""",
flags=re.IGNORECASE | re.VERBOSE,
)
url_regex_www = re.compile(
r"""
(?:
# http/ftp schemes
\b
Expand All @@ -107,22 +120,27 @@
(?::[0]*[1-9][0-9]{0,4})? # Port
(?:[\/#?](?:[\w\-._~%!$&'()*+,;=:/?#\[\]@\U00001000-\U0010FFFF]*[^\x00-\x20\s`!\[\]{};:'\".,<>«»“”‘’])) # Path, etc.
)
''', flags=re.IGNORECASE | re.VERBOSE)
""",
flags=re.IGNORECASE | re.VERBOSE,
)


# Search for URLs in HTML IMG or A tags
# regex overlaps with url_regex_simple, so simple URL content that starts with "<a " or "<img " still matches.
url_regex_href = re.compile(r'''
url_regex_href = re.compile(
r"""
<(?:a[\s\/]+[^>]*?href
|img[\s\/]+[^>]*?src)
[\s\/]*=[\s\/]*
((?:[\"][^\"]+)|[\'][^\']+|[^\s>]+)
''', flags=re.IGNORECASE | re.VERBOSE)
""",
flags=re.IGNORECASE | re.VERBOSE,
)

date_regex = re.compile(r''';[ \w\s:,+\-()]+$''')
noparenthesis_regex = re.compile(r'''\([^()]*\)''')
cleanline_regex = re.compile(r'''(^[;\s]{0,}|[;\s]{0,}$)''')
date_regex = re.compile(r""";[ \w\s:,+\-()]+$""")
noparenthesis_regex = re.compile(r"""\([^()]*\)""")
cleanline_regex = re.compile(r"""(^[;\s]{0,}|[;\s]{0,}$)""")

escape_special_regex_chars = re.compile(r'''([\^$\[\]()+?.])''')
escape_special_regex_chars = re.compile(r"""([\^$\[\]()+?.])""")

window_slice_regex = re.compile(r'''\s''')
window_slice_regex = re.compile(r"""\s""")
Loading

0 comments on commit 5e1b9fa

Please sign in to comment.