From 3cc89f2eb1fad4fac8194dfb94d0b372239b9d2e Mon Sep 17 00:00:00 2001 From: jeFF0Falltrades <8444166+jeFF0Falltrades@users.noreply.github.com> Date: Sun, 1 Sep 2024 10:20:31 -0400 Subject: [PATCH 1/3] Updates RAT King Parser to commit b85abe5 --- lib/parsers_aux/ratking/__init__.py | 50 ++++--- lib/parsers_aux/ratking/utils/config_item.py | 7 +- .../decryptors/config_decryptor_aes_ecb.py | 1 + modules/processing/parsers/CAPE/AsyncRat.py | 88 +---------- modules/processing/parsers/CAPE/XWorm.py | 137 +----------------- 5 files changed, 44 insertions(+), 239 deletions(-) diff --git a/lib/parsers_aux/ratking/__init__.py b/lib/parsers_aux/ratking/__init__.py index 821b0b402d6..93359cf676f 100644 --- a/lib/parsers_aux/ratking/__init__.py +++ b/lib/parsers_aux/ratking/__init__.py @@ -5,7 +5,7 @@ # Author: jeFF0Falltrades # # Provides the primary functionality for parsing configurations from the -# AsyncRAT, DcRAT, QuasarRAT, VenomRAT, etc. RAT families +# AsyncRAT, DcRAT, QuasarRAT, VenomRAT, XWorm, XenoRAT, etc. RAT families # # MIT License # @@ -49,7 +49,9 @@ class RATConfigParser: config_item.SpecialFolderConfigItem(), config_item.EncryptedStringConfigItem(), ] - MIN_CONFIG_LEN = 7 + # Min and max number of items in a potential config section + MIN_CONFIG_LEN_FLOOR = 5 + MIN_CONFIG_LEN_CEILING = 7 PATTERN_VERIFY_HASH = rb"(?:\x7e.{3}\x04(?:\x6f.{3}\x0a){2}\x74.{3}\x01.+?\x2a.+?\x00{6,})" def __init__(self, file_data=False): @@ -73,7 +75,7 @@ def __init__(self, file_data=False): self.report["config"] = f"Exception encountered: {e}" # Decrypts/decodes values from an encrypted config - def decrypt_and_decode_config(self, encrypted_config): + def decrypt_and_decode_config(self, encrypted_config, min_config_len): decoded_config = {} selected_decryptor = 0 for item in self.CONFIG_ITEM_TYPES: @@ -101,8 +103,8 @@ def decrypt_and_decode_config(self, encrypted_config): arr_size, arr_rva = item_data[k] item_data[k] = self.dnpp.byte_array_from_size_and_rva(arr_size, arr_rva).hex() decoded_config.update(item_data) - if len(decoded_config) < self.MIN_CONFIG_LEN: - raise ConfigParserException("Minimum threshold of config items not met") + if len(decoded_config) < min_config_len: + raise ConfigParserException(f"Minimum threshold of config items not met for threshold: {len(decoded_config)}/{min_config_len}") return decoded_config # Searches for the RAT configuration in the Settings module @@ -130,17 +132,20 @@ def get_config_cctor_brute_force(self): # Get each .cctor method RVA and bytes content up to a RET op candidate_data = {rva: self.dnpp.string_from_offset(self.dnpp.offset_from_rva(rva), OPCODE_RET) for rva in candidates} config_start, decrypted_config = None, None - for method_rva, method_ins in candidate_data.items(): - logger.debug(f"Attempting brute force at .cctor method at {hex(method_rva)}") - try: - config_start, decrypted_config = ( - method_rva, - self.decrypt_and_decode_config(method_ins), - ) - break - except Exception as e: - logger.debug(e) - continue + min_config_len = self.MIN_CONFIG_LEN_CEILING + while decrypted_config is None and min_config_len >= self.MIN_CONFIG_LEN_FLOOR: + for method_rva, method_ins in candidate_data.items(): + logger.debug(f"Attempting brute force at .cctor method at {hex(method_rva)}") + try: + config_start, decrypted_config = ( + method_rva, + self.decrypt_and_decode_config(method_ins, min_config_len), + ) + break + except Exception as e: + logger.debug(e) + continue + min_config_len -= 1 if decrypted_config is None: raise ConfigParserException("No valid configuration could be parsed from any .cctor methods") return config_start, decrypted_config @@ -159,8 +164,17 @@ def get_config_verify_hash_method(self): config_start = self.dnpp.next_method_from_instruction_offset(hit.start()) # Configuration ends with ret operation, so use that as our terminator encrypted_config = self.dnpp.string_from_offset(config_start, OPCODE_RET) - decrypted_config = self.decrypt_and_decode_config(encrypted_config) - return config_start, decrypted_config + min_config_len = self.MIN_CONFIG_LEN_CEILING + while True: + try: + decrypted_config = self.decrypt_and_decode_config( + encrypted_config, min_config_len + ) + return config_start, decrypted_config + except Exception as e: + if min_config_len < self.MIN_CONFIG_LEN_FLOOR: + raise e + min_config_len -= 1 # Sorts the config by field name RVA prior to replacing RVAs with field # name strings (this is done last to preserve config ordering) diff --git a/lib/parsers_aux/ratking/utils/config_item.py b/lib/parsers_aux/ratking/utils/config_item.py index 2192f30917e..5bfbf78d73c 100644 --- a/lib/parsers_aux/ratking/utils/config_item.py +++ b/lib/parsers_aux/ratking/utils/config_item.py @@ -61,8 +61,11 @@ def parse_from(self, data): except Exception: logger.debug(f"Could not parse value from {obj} at {string_rva}") continue - fields[field_rva] = field_value - found_items += 1 + if field_rva not in fields: + fields[field_rva] = field_value + found_items += 1 + else: + logger.warning(f"Overlapping Field RVAs detected in config at {field_rva}") logger.debug(f"Parsed {found_items} {self.label} values") return fields diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py index cb0578fce86..943fb58ddfa 100644 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py +++ b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py @@ -100,6 +100,7 @@ def decrypt_encrypted_strings(self): last_exc = e if result is None: logger.debug(f"Decryption failed for item {v}: {last_exc}") + result = v logger.debug(f"Key: {k}, Value: {result}") decrypted_config_strings[k] = result logger.debug("Successfully decrypted strings") diff --git a/modules/processing/parsers/CAPE/AsyncRat.py b/modules/processing/parsers/CAPE/AsyncRat.py index 040c41c084f..29c59a04fa2 100644 --- a/modules/processing/parsers/CAPE/AsyncRat.py +++ b/modules/processing/parsers/CAPE/AsyncRat.py @@ -1,87 +1,5 @@ -# based on https://github.com/c3rb3ru5d3d53c/mwcfg-modules/blob/master/asyncrat/asyncrat.py +from lib.parsers_aux.ratking import RATConfigParser -import base64 -import binascii -import re -import string -import struct -from contextlib import suppress -from Cryptodome.Cipher import AES -from Cryptodome.Protocol.KDF import PBKDF2 - - -def get_string(data, index, offset): - return data[index][offset:].decode("utf-8", "ignore") - - -def get_wide_string(data, index, offset): - return (data[index][offset:] + b"\x00").decode("utf-16") - - -def get_salt(): - return bytes.fromhex("BFEB1E56FBCD973BB219022430A57843003D5644D21E62B9D4F180E7E6C33941") - - -def decrypt(key, ciphertext): - aes_key = PBKDF2(key, get_salt(), 32, 50000) - cipher = AES.new(aes_key, AES.MODE_CBC, ciphertext[32 : 32 + 16]) - plaintext = cipher.decrypt(ciphertext[48:]).decode("ascii", "ignore").strip() - return plaintext - - -def decrypt_config_string(key, data, index): - return "".join(filter(lambda x: x in string.printable, decrypt(key, base64.b64decode(data[index][2:])))) - - -def decrypt_config_list(key, data, index): - result = decrypt_config_string(key, data, index) - if result == "null": - return [] - return result.split(",") - - -def extract_config(filebuf): - config = {} - addr = re.search(b"BSJB", filebuf).start() - if not addr: - return - - strings_offset = struct.unpack(" 1: - return filtered_bytes - return "".join(filtered_bytes) - - -def extract_config(data): - config_dict = {} - with suppress(Exception): - if data[:2] == b"MZ": - dn = dnfile.dnPE(data=data) - extracted = [] - conf = [] - - ## Mutex is used to derive AES key, so if it's not found, the extractor is useless - ## The main problem is Mutex is not found in fixed location, so this trick is used to find the Mutex - for pattern in mutexPatterns: - mutexMatched = pattern.findall(data) - if mutexMatched: - mutex = dn.net.user_strings.get(int.from_bytes(mutexMatched[0], "little")).value - AESKey = deriveAESKey(mutex) - break - else: - return - - for match in confPattern.findall(data): - er_string = dn.net.user_strings.get(int.from_bytes(match, "little")).value - extracted.append(er_string) - - for i in range(5): - with suppress(Exception): - conf.append(decryptAES(AESKey, extracted[i], AES.MODE_ECB)) - - config_dict["C2"] = conf[0] - - ## Sometimes the port is not found in configs and 'AES Key (connections)' is shifted with SPL' - if 1 <= int(conf[1]) <= 65535: - config_dict["Port"] = conf[1] - config_dict["AES Key (connections)"] = conf[2] - config_dict["SPL"] = conf[3] - else: - config_dict["Port"] = "" - config_dict["AES Key (connections)"] = conf[1] - config_dict["SPL"] = conf[2] - config_dict["AES Key (configs)"] = AESKey - config_dict["Mutex"] = mutex - - installBinMatch = installBinNamePattern.findall(data) - installDirMatch = installDirPattern.findall(data) - - if installDirMatch: - installDir = dn.net.user_strings.get(int.from_bytes(installDirMatch[0], "little")).value - config_dict["InstallDir"] = decryptAES(AESKey, installDir, AES.MODE_ECB) - if installBinMatch: - installBinName = dn.net.user_strings.get(int.from_bytes(installBinMatch[0], "little")).value - config_dict["InstallBinName"] = decryptAES(AESKey, installBinName, AES.MODE_ECB) - else: - lines = data.decode().split("\n") - if "," in lines[0]: - c2_list = lines[0].split(",") - config_dict["C2s"] = c2_list - else: - config_dict["C2"] = lines[0] - config_dict["Port"] = lines[1] - config_dict["AES Key (connections)"] = lines[2] - config_dict["SPL"] = lines[3] - config_dict["USBNM"] = lines[4] - - return config_dict +def extract_config(data: bytes): + return RATConfigParser(data).report.get("config", {}) From bdac3e0acbf524af38c57cb79f5595af8d7906f3 Mon Sep 17 00:00:00 2001 From: jeFF0Falltrades <8444166+jeFF0Falltrades@users.noreply.github.com> Date: Thu, 19 Sep 2024 23:44:34 -0400 Subject: [PATCH 2/3] Brings RAT King Parser to parity with v3.0.0 --- lib/parsers_aux/ratking/__init__.py | 243 +++++++++++------- .../{utils => }/config_parser_exception.py | 3 +- lib/parsers_aux/ratking/utils/__init__.py | 25 ++ lib/parsers_aux/ratking/utils/config_item.py | 75 +++--- lib/parsers_aux/ratking/utils/data_utils.py | 23 +- .../ratking/utils/decryptors/__init__.py | 18 +- .../utils/decryptors/config_decryptor.py | 29 ++- .../decryptors/config_decryptor_aes_cbc.py | 209 +++++++++------ .../decryptors/config_decryptor_aes_ecb.py | 83 ++++-- .../config_decryptor_decrypt_xor.py | 128 +++++++++ .../decryptors/config_decryptor_plaintext.py | 97 ++++++- .../config_decryptor_random_hardcoded.py | 108 ++++++++ .../ratking/utils/dotnet_constants.py | 9 +- .../ratking/utils/dotnetpe_payload.py | 202 +++++++++------ 14 files changed, 917 insertions(+), 335 deletions(-) rename lib/parsers_aux/ratking/{utils => }/config_parser_exception.py (93%) create mode 100644 lib/parsers_aux/ratking/utils/decryptors/config_decryptor_decrypt_xor.py create mode 100644 lib/parsers_aux/ratking/utils/decryptors/config_decryptor_random_hardcoded.py diff --git a/lib/parsers_aux/ratking/__init__.py b/lib/parsers_aux/ratking/__init__.py index 93359cf676f..6b724e46bbf 100644 --- a/lib/parsers_aux/ratking/__init__.py +++ b/lib/parsers_aux/ratking/__init__.py @@ -29,159 +29,220 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. from logging import getLogger -from re import DOTALL, search +# from os.path import isfile +from re import DOTALL, compile, search +from typing import Any, Tuple + +# from yara import Rules + +from .config_parser_exception import ConfigParserException from .utils import config_item -from .utils.config_parser_exception import ConfigParserException -from .utils.decryptors import SUPPORTED_DECRYPTORS -from .utils.dotnet_constants import OPCODE_RET +from .utils.decryptors import ( + SUPPORTED_DECRYPTORS, + ConfigDecryptor, + IncompatibleDecryptorException, +) from .utils.dotnetpe_payload import DotNetPEPayload logger = getLogger(__name__) class RATConfigParser: - CONFIG_ITEM_TYPES = [ - config_item.BoolConfigItem(), - config_item.ByteArrayConfigItem(), - config_item.IntConfigItem(), - config_item.NullConfigItem(), - config_item.SpecialFolderConfigItem(), - config_item.EncryptedStringConfigItem(), - ] # Min and max number of items in a potential config section - MIN_CONFIG_LEN_FLOOR = 5 - MIN_CONFIG_LEN_CEILING = 7 - PATTERN_VERIFY_HASH = rb"(?:\x7e.{3}\x04(?:\x6f.{3}\x0a){2}\x74.{3}\x01.+?\x2a.+?\x00{6,})" + _MIN_CONFIG_LEN_FLOOR = 5 + _MIN_CONFIG_LEN_CEILING = 9 + + # Pattern to find the VerifyHash() method + _PATTERN_VERIFY_HASH = compile( + rb"\x7e.{3}\x04(?:\x6f.{3}\x0a){2}\x74.{3}\x01", DOTALL + ) - def __init__(self, file_data=False): - self.report = {"config": {}} + # def __init__(self, file_path: str, yara_rule: Rules = None) -> None: + def __init__(self, file_data: bytes = None) -> None: + self.report = { + "config": {}, + } try: + # Filled in _decrypt_and_decode_config() + self._incompatible_decryptors: list[int] = [] + try: + self._dnpp = DotNetPEPayload(file_data) + except Exception as e: + raise e + # self.report["sha256"] = self._dnpp.sha256 + # self.report["yara_possible_family"] = self._dnpp.yara_match - self.dnpp = DotNetPEPayload(file_data) - # self.report["sha256"] = self.dnpp.sha256 - # self.report["possible_yara_family"] = self.dnpp.yara_match - if self.dnpp.dotnetpe is None: - raise ConfigParserException("Failed to load file as .NET executable") - self.decryptor = None # Created in decrypt_and_decode_config() - self.report["config"] = self.get_config() - self.report["config"]["aes_key"] = ( - self.decryptor.key.hex() if self.decryptor is not None and self.decryptor.key is not None else "None" + # Assigned in _decrypt_and_decode_config() + self._decryptor: ConfigDecryptor = None + self.report["config"] = self._get_config() + self.report["key"] = ( + self._decryptor.key.hex() + if self._decryptor is not None and self._decryptor.key is not None + else "None" ) - self.report["config"]["aes_salt"] = ( - self.decryptor.salt.hex() if self.decryptor is not None and self.decryptor.salt is not None else "None" + self.report["salt"] = ( + self._decryptor.salt.hex() + if self._decryptor is not None and self._decryptor.salt is not None + else "None" ) except Exception as e: + # self.report["config"] = f"Exception encountered for {file_path}: {e}" self.report["config"] = f"Exception encountered: {e}" - # Decrypts/decodes values from an encrypted config - def decrypt_and_decode_config(self, encrypted_config, min_config_len): + # Decrypts/decodes values from an encrypted config and returns the + # decrypted/decoded config + def _decrypt_and_decode_config( + self, encrypted_config: bytes, min_config_len: int + ) -> dict[str, Any]: decoded_config = {} - selected_decryptor = 0 - for item in self.CONFIG_ITEM_TYPES: - item_data = item.parse_from(encrypted_config) + + for item_class in config_item.SUPPORTED_CONFIG_ITEMS: + item = item_class() + # Translate config Field RVAs to Field names + item_data = { + self._dnpp.field_name_from_rva(k): v + for k, v in item.parse_from(encrypted_config).items() + } + if len(item_data) > 0: if type(item) is config_item.EncryptedStringConfigItem: - # Translate encrypted string RVAs to encrypted values + # Translate config value RVAs to string values for k in item_data: - item_data[k] = self.dnpp.user_string_from_rva(item_data[k]) - # Decrypt the values - while selected_decryptor < len(SUPPORTED_DECRYPTORS): + item_data[k] = self._dnpp.user_string_from_rva(item_data[k]) + + # Attempt to decrypt encrypted values + for decryptor in SUPPORTED_DECRYPTORS: + if decryptor in self._incompatible_decryptors: + continue + + if self._decryptor is None: + # Try to instantiate the selected decryptor + # Add to incompatible list and move on upon failure + try: + self._decryptor = decryptor(self._dnpp) + except IncompatibleDecryptorException as ide: + logger.debug( + f"Decryptor incompatible {decryptor} : {ide}" + ) + self._incompatible_decryptors.append(decryptor) + continue try: - if self.decryptor is None: - self.decryptor = SUPPORTED_DECRYPTORS[selected_decryptor](self.dnpp, item_data) - item_data = self.decryptor.decrypt_encrypted_strings() + # Try to decrypt the encrypted strings + # Continue to next compatible decryptor on failure + item_data = self._decryptor.decrypt_encrypted_strings( + item_data + ) break except Exception as e: logger.debug( - f"Decryption failed with decryptor {SUPPORTED_DECRYPTORS[selected_decryptor]} : {e}, trying next decryptor..." + f"Decryption failed with decryptor {decryptor} : {e}" ) - self.decryptor = None - selected_decryptor += 1 + self._decryptor = None + + if self._decryptor is None: + raise ConfigParserException("All decryptors failed") + elif type(item) is config_item.ByteArrayConfigItem: for k in item_data: arr_size, arr_rva = item_data[k] - item_data[k] = self.dnpp.byte_array_from_size_and_rva(arr_size, arr_rva).hex() + item_data[k] = self._dnpp.byte_array_from_size_and_rva( + arr_size, arr_rva + ).hex() + decoded_config.update(item_data) + if len(decoded_config) < min_config_len: - raise ConfigParserException(f"Minimum threshold of config items not met for threshold: {len(decoded_config)}/{min_config_len}") + raise ConfigParserException( + f"Minimum threshold of config items not met: {len(decoded_config)}/{min_config_len}" + ) return decoded_config - # Searches for the RAT configuration in the Settings module - def get_config(self): + # Searches for the RAT configuration section, using the VerifyHash() marker + # or brute-force, returning the decrypted config on success + def _get_config(self) -> dict[str, Any]: logger.debug("Extracting config...") try: - config_start, decrypted_config = self.get_config_verify_hash_method() + config_start, decrypted_config = self._get_config_verify_hash_method() except Exception: logger.debug("VerifyHash() method failed; Attempting .cctor brute force...") - # If the typical patterns are not found, start brute-forcing + # If the VerifyHash() method does not work, move to brute-forcing + # static constructors try: - config_start, decrypted_config = self.get_config_cctor_brute_force() + config_start, decrypted_config = self._get_config_cctor_brute_force() except Exception as e: - raise ConfigParserException("Could not identify config") from e - logger.debug(f"Config found at offset {hex(config_start)}...") - return self.translate_config_field_names(decrypted_config) + raise ConfigParserException(f"Could not identify config: {e}") + logger.debug(f"Config found at RVA {hex(config_start)}...") + return decrypted_config # Attempts to retrieve the config via brute-force, looking through every # static constructor (.cctor) and attempting to decode/decrypt a valid - # config from that constructor - def get_config_cctor_brute_force(self): - candidates = self.dnpp.method_rvas_from_name(".cctor") + # config from that constructor, returning the config RVA and decrypted + # config on success + def _get_config_cctor_brute_force(self) -> Tuple[int, dict[str, Any]]: + candidates = self._dnpp.methods_from_name(".cctor") if len(candidates) == 0: raise ConfigParserException("No .cctor method could be found") - # Get each .cctor method RVA and bytes content up to a RET op - candidate_data = {rva: self.dnpp.string_from_offset(self.dnpp.offset_from_rva(rva), OPCODE_RET) for rva in candidates} + + # For each .cctor method, map its RVA and body (in raw bytes) + candidate_cctor_data = { + method.rva: self._dnpp.method_body_from_method(method) + for method in candidates + } + config_start, decrypted_config = None, None - min_config_len = self.MIN_CONFIG_LEN_CEILING - while decrypted_config is None and min_config_len >= self.MIN_CONFIG_LEN_FLOOR: - for method_rva, method_ins in candidate_data.items(): - logger.debug(f"Attempting brute force at .cctor method at {hex(method_rva)}") + # Start at our ceiling value for number of config items + min_config_len = self._MIN_CONFIG_LEN_CEILING + + while decrypted_config is None and min_config_len >= self._MIN_CONFIG_LEN_FLOOR: + for method_rva, method_body in candidate_cctor_data.items(): + logger.debug( + f"Attempting brute force at .cctor method at {hex(method_rva)}" + ) try: config_start, decrypted_config = ( method_rva, - self.decrypt_and_decode_config(method_ins, min_config_len), + self._decrypt_and_decode_config(method_body, min_config_len), ) break except Exception as e: - logger.debug(e) + logger.debug( + f"Brute force failed for method at {hex(method_rva)}: {e}" + ) continue + # Reduce the minimum config length until we reach our floor min_config_len -= 1 + if decrypted_config is None: - raise ConfigParserException("No valid configuration could be parsed from any .cctor methods") + raise ConfigParserException( + "No valid configuration could be parsed from any .cctor methods" + ) return config_start, decrypted_config # Attempts to retrieve the config via looking for a config section preceded - # by the "VerifyHash()" function that is typically found in the Settings - # module - def get_config_verify_hash_method(self): + # by the VerifyHash() method typically found in a Settings module, + # returning the config RVA and decrypted config on success + def _get_config_verify_hash_method(self) -> Tuple[int, dict[str, Any]]: # Identify the VerifyHash() Method code - hit = search(self.PATTERN_VERIFY_HASH, self.dnpp.data, DOTALL) - if hit is None: - raise ConfigParserException("Could not identify VerifyHash() marker method") - # Reverse the VerifyHash() instruction offset, look up VerifyHash() in - # the MethodDef metadata table, and then get the offset to the - # subsequent function, which should be our config constructor - config_start = self.dnpp.next_method_from_instruction_offset(hit.start()) - # Configuration ends with ret operation, so use that as our terminator - encrypted_config = self.dnpp.string_from_offset(config_start, OPCODE_RET) - min_config_len = self.MIN_CONFIG_LEN_CEILING + verify_hash_hit = search(self._PATTERN_VERIFY_HASH, self._dnpp.data) + if verify_hash_hit is None: + raise ConfigParserException("Could not identify VerifyHash() marker") + + # Reverse the hit to find the VerifyHash() method, then grab the + # subsequent function + config_method = self._dnpp.method_from_instruction_offset( + verify_hash_hit.start(), 1 + ) + encrypted_config = self._dnpp.method_body_from_method(config_method) + min_config_len = self._MIN_CONFIG_LEN_CEILING while True: try: - decrypted_config = self.decrypt_and_decode_config( + decrypted_config = self._decrypt_and_decode_config( encrypted_config, min_config_len ) - return config_start, decrypted_config + return config_method.rva, decrypted_config except Exception as e: - if min_config_len < self.MIN_CONFIG_LEN_FLOOR: + # Reduce the minimum config length until we reach our floor + if min_config_len < self._MIN_CONFIG_LEN_FLOOR: raise e min_config_len -= 1 - - # Sorts the config by field name RVA prior to replacing RVAs with field - # name strings (this is done last to preserve config ordering) - def translate_config_field_names(self, decrypted_config): - translated_config = {} - for field_rva, field_value in sorted(decrypted_config.items()): - key = self.dnpp.field_name_from_rva(field_rva) - translated_config[key] = field_value - logger.debug(f"Config item parsed {key}: {field_value}") - return translated_config diff --git a/lib/parsers_aux/ratking/utils/config_parser_exception.py b/lib/parsers_aux/ratking/config_parser_exception.py similarity index 93% rename from lib/parsers_aux/ratking/utils/config_parser_exception.py rename to lib/parsers_aux/ratking/config_parser_exception.py index c1d84e341b1..2b8c1b06282 100644 --- a/lib/parsers_aux/ratking/utils/config_parser_exception.py +++ b/lib/parsers_aux/ratking/config_parser_exception.py @@ -4,8 +4,7 @@ # # Author: jeFF0Falltrades # -# Provides a simple custom Exception class for use with configuration parsing -# actions +# A simple custom Exception class for use with configuration parsing actions # # MIT License # diff --git a/lib/parsers_aux/ratking/utils/__init__.py b/lib/parsers_aux/ratking/utils/__init__.py index e69de29bb2d..716cb99880a 100644 --- a/lib/parsers_aux/ratking/utils/__init__.py +++ b/lib/parsers_aux/ratking/utils/__init__.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +# +# __init__.py +# +# Author: jeFF0Falltrades +# +# Copyright (c) 2024 Jeff Archer +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. diff --git a/lib/parsers_aux/ratking/utils/config_item.py b/lib/parsers_aux/ratking/utils/config_item.py index 5bfbf78d73c..cba36ed272c 100644 --- a/lib/parsers_aux/ratking/utils/config_item.py +++ b/lib/parsers_aux/ratking/utils/config_item.py @@ -28,8 +28,10 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +from abc import ABC, abstractmethod from logging import getLogger -from re import DOTALL, findall +from re import DOTALL, compile, findall +from typing import Any, Tuple from .data_utils import bytes_to_int from .dotnet_constants import OPCODE_LDC_I4_0, SpecialFolder @@ -38,68 +40,71 @@ # Provides an abstract class for config items -class ConfigItem: - def __init__(self, label, pattern): - self.label = label - self.pattern = pattern +class ConfigItem(ABC): + def __init__(self, label: str, pattern: bytes) -> None: + self._label = label + self._pattern = compile(pattern, flags=DOTALL) # Should be overridden by children to provide a meaningful value - def derive_item_value(self): - return None + @abstractmethod + def _derive_item_value(self) -> Any: + pass - # Derives config field RVAs and values from data using the specified + # Derives config Field RVAs and values from data using the specified # ConfigItem's pattern - def parse_from(self, data): - logger.debug(f"Parsing {self.label} values from data...") + def parse_from(self, data: bytes) -> dict[int, Any]: + logger.debug(f"Parsing {self._label} values from data...") fields = {} - raw_data = findall(self.pattern, data, DOTALL) + raw_data = findall(self._pattern, data) found_items = 0 - for obj, string_rva in raw_data: + for obj, bytes_rva in raw_data: try: - field_value = self.derive_item_value(obj) - field_rva = bytes_to_int(string_rva) + field_value = self._derive_item_value(obj) + field_rva = bytes_to_int(bytes_rva) except Exception: - logger.debug(f"Could not parse value from {obj} at {string_rva}") + logger.debug(f"Could not parse value from {obj} at {hex(bytes_rva)}") continue if field_rva not in fields: fields[field_rva] = field_value found_items += 1 else: - logger.warning(f"Overlapping Field RVAs detected in config at {field_rva}") - logger.debug(f"Parsed {found_items} {self.label} values") + logger.debug( + f"Overlapping Field RVAs detected in config at {hex(field_rva)}" + ) + logger.debug(f"Parsed {found_items} {self._label} values") return fields class BoolConfigItem(ConfigItem): - def __init__(self): + def __init__(self) -> None: super().__init__("boolean", b"(\x16|\x17)\x80(.{3}\x04)") # Boolean values are derived by examing if the opcode is "ldc.i4.0" (False) # or "ldc.i4.1" (True) - def derive_item_value(self, opcode): + def _derive_item_value(self, opcode: bytes) -> bool: return bool(bytes_to_int(opcode) - bytes_to_int(OPCODE_LDC_I4_0)) class ByteArrayConfigItem(ConfigItem): - def __init__(self): + def __init__(self) -> None: super().__init__( "byte array", rb"\x1f(.\x8d.{3}\x01\x25\xd0.{3}\x04)\x28.{3}\x0a\x80(.{3}\x04)", ) - # Byte array size and RVA is returned, as these are needed to + # Byte array size and RVA are returned, as these are needed to # extract the value of the bytes from the payload - def derive_item_value(self, byte_data): + def _derive_item_value(self, byte_data: bytes) -> Tuple[int, int]: arr_size = byte_data[0] arr_rva = bytes_to_int(byte_data[-4:]) return (arr_size, arr_rva) class IntConfigItem(ConfigItem): - def __init__(self): + def __init__(self) -> None: super().__init__("int", b"(\x20.{4}|[\x18-\x1e])\x80(.{3}\x04)") - def derive_item_value(self, int_bytes): + def _derive_item_value(self, int_bytes: bytes) -> int: # If single byte, must be value 2-8, represented by opcodes 0x18-0x1e # Subtract 0x16 to get the int value, e.g.: # ldc.i4.8 == 0x1e - 0x16 == 8 @@ -110,27 +115,37 @@ def derive_item_value(self, int_bytes): class NullConfigItem(ConfigItem): - def __init__(self): + def __init__(self) -> None: super().__init__("null", b"(\x14\x80)(.{3}\x04)") # If "ldnull" is being used, simply return "null" - def derive_item_value(self, _): + def _derive_item_value(self, _: bytes) -> str: return "null" class SpecialFolderConfigItem(ConfigItem): - def __init__(self): + def __init__(self) -> None: super().__init__("special folder", b"\x1f(.)\x80(.{3}\x04)") # Translates SpecialFolder ID to name - def derive_item_value(self, folder_id): + def _derive_item_value(self, folder_id: bytes) -> str: return SpecialFolder(bytes_to_int(folder_id)).name class EncryptedStringConfigItem(ConfigItem): - def __init__(self): + def __init__(self) -> None: super().__init__("encrypted string", b"\x72(.{3}\x70)\x80(.{3}\x04)") # Returns the encrypted string's RVA - def derive_item_value(self, enc_str_rva): + def _derive_item_value(self, enc_str_rva: bytes) -> int: return bytes_to_int(enc_str_rva) + + +SUPPORTED_CONFIG_ITEMS = [ + BoolConfigItem, + ByteArrayConfigItem, + IntConfigItem, + NullConfigItem, + SpecialFolderConfigItem, + EncryptedStringConfigItem, +] diff --git a/lib/parsers_aux/ratking/utils/data_utils.py b/lib/parsers_aux/ratking/utils/data_utils.py index 6e0ea6c8723..34d96ce3964 100644 --- a/lib/parsers_aux/ratking/utils/data_utils.py +++ b/lib/parsers_aux/ratking/utils/data_utils.py @@ -27,20 +27,20 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from .config_parser_exception import ConfigParserException +from ..config_parser_exception import ConfigParserException # Converts a bytes object to an int object using the specified byte order -def bytes_to_int(bytes, order="little"): +def bytes_to_int(bytes: bytes, order: str = "little") -> int: try: return int.from_bytes(bytes, byteorder=order) - except Exception as e: - raise ConfigParserException(f"Error parsing int from value: {bytes}") from e + except Exception: + raise ConfigParserException(f"Error parsing int from value: {bytes}") # Decodes a bytes object to a Unicode string, using UTF-16LE for byte values # with null bytes still embedded in them, and UTF-8 for all other values -def decode_bytes(byte_str): +def decode_bytes(byte_str: bytes | str) -> str: if isinstance(byte_str, str): return byte_str.strip() result = None @@ -49,13 +49,16 @@ def decode_bytes(byte_str): result = byte_str.decode("utf-16le") else: result = byte_str.decode("utf-8") - except Exception as e: - raise ConfigParserException(f"Error decoding bytes object to Unicode: {byte_str}") from e + except Exception: + raise ConfigParserException( + f"Error decoding bytes object to Unicode: {byte_str}" + ) return result -def int_to_bytes(int, length=4, order="little"): +# Converts an int to a bytes object, with the specified length and order +def int_to_bytes(int: int, length: int = 4, order: str = "little") -> bytes: try: return int.to_bytes(length, order) - except Exception as e: - raise ConfigParserException(f"Error parsing bytes from value: {int}") from e + except Exception: + raise ConfigParserException(f"Error parsing bytes from value: {int}") diff --git a/lib/parsers_aux/ratking/utils/decryptors/__init__.py b/lib/parsers_aux/ratking/utils/decryptors/__init__.py index 9a9176ae343..a340a598f31 100644 --- a/lib/parsers_aux/ratking/utils/decryptors/__init__.py +++ b/lib/parsers_aux/ratking/utils/decryptors/__init__.py @@ -4,8 +4,6 @@ # # Author: jeFF0Falltrades # -# MIT License -# # Copyright (c) 2024 Jeff Archer # # Permission is hereby granted, free of charge, to any person obtaining a copy @@ -25,12 +23,28 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +from .config_decryptor import ConfigDecryptor, IncompatibleDecryptorException from .config_decryptor_aes_cbc import ConfigDecryptorAESCBC from .config_decryptor_aes_ecb import ConfigDecryptorAESECB +from .config_decryptor_decrypt_xor import ConfigDecryptorDecryptXOR from .config_decryptor_plaintext import ConfigDecryptorPlaintext +from .config_decryptor_random_hardcoded import ConfigDecryptorRandomHardcoded + +__all__ = [ + ConfigDecryptor, + IncompatibleDecryptorException, + ConfigDecryptorAESCBC, + ConfigDecryptorAESECB, + ConfigDecryptorDecryptXOR, + ConfigDecryptorRandomHardcoded, + ConfigDecryptorPlaintext, +] +# ConfigDecryptorPlaintext should always be the last fallthrough case SUPPORTED_DECRYPTORS = [ ConfigDecryptorAESCBC, ConfigDecryptorAESECB, + ConfigDecryptorDecryptXOR, + ConfigDecryptorRandomHardcoded, ConfigDecryptorPlaintext, ] diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor.py index 9df3620f373..16084742d56 100644 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor.py +++ b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor.py @@ -28,19 +28,28 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. from abc import ABC, abstractmethod +from logging import getLogger +from ..dotnetpe_payload import DotNetPEPayload -class ConfigDecryptor(ABC): - def __init__(self, payload, config_strings): - self.payload = payload - self.config_strings = config_strings - self.key = None - self.salt = None +logger = getLogger(__name__) - @abstractmethod - def decrypt(self, ciphertext): - pass +# Custom Exception to denote that a decryptor is incompatible with a payload +class IncompatibleDecryptorException(Exception): + pass + + +class ConfigDecryptor(ABC): + def __init__(self, payload: DotNetPEPayload) -> None: + self.key: bytes | str = None + self._payload = payload + self.salt: bytes = None + + # Abstract method to take in a map representing a configuration of config + # Field names and values and return a decoded/decrypted configuration @abstractmethod - def decrypt_encrypted_strings(self): + def decrypt_encrypted_strings( + self, encrypted_strings: dict[str, str] + ) -> dict[str, list[str] | str]: pass diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py index fdc2a1bf5a7..8389886bcfc 100644 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py +++ b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 # -# config_aes_decryptor.py +# config_decryptor_aes_cbc.py # # Author: jeFF0Falltrades # -# Provides a custom AES decryptor for RAT payloads utilizing the known -# encryption patterns of AsyncRAT, DcRAT, QuasarRAT, VenomRAT, etc. +# Provides a custom AES decryptor for RAT payloads utilizing CBC mode +# +# Example Hash: 6b99acfa5961591c39b3f889cf29970c1dd48ddb0e274f14317940cf279a4412 # # MIT License # @@ -30,7 +31,8 @@ # SOFTWARE. from base64 import b64decode from logging import getLogger -from re import DOTALL, search +from re import DOTALL, compile, search +from typing import Tuple from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher @@ -40,51 +42,68 @@ from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.padding import PKCS7 -from ..config_parser_exception import ConfigParserException +from ...config_parser_exception import ConfigParserException from ..data_utils import bytes_to_int, decode_bytes, int_to_bytes from ..dotnet_constants import OPCODE_LDSTR, OPCODE_LDTOKEN -from .config_decryptor import ConfigDecryptor +from ..dotnetpe_payload import DotNetPEPayload +from .config_decryptor import ConfigDecryptor, IncompatibleDecryptorException logger = getLogger(__name__) -MIN_CIPHERTEXT_LEN = 48 - class ConfigDecryptorAESCBC(ConfigDecryptor): - PATTERN_AES_KEY_AND_BLOCK_SIZE = b"[\x06-\x09]\x20(.{4})\x6f.{4}[\x06-\x09]\x20(.{4})" - PATTERN_AES_KEY_BASE = b"(.{3}\x04).%b" - PATTERN_AES_SALT_ITER = b"[\x02-\x05]\x7e(.{4})\x20(.{4})\x73" - PATTERN_AES_SALT_INIT = b"\x80%b\x2a" + # Minimum length of valid ciphertext + _MIN_CIPHERTEXT_LEN = 48 + + # Patterns for identifying AES metadata + _PATTERN_AES_KEY_AND_BLOCK_SIZE = compile( + b"[\x06-\x09]\x20(.{4})\x6f.{4}[\x06-\x09]\x20(.{4})", DOTALL + ) + # Do not compile in-line replacement patterns + _PATTERN_AES_KEY_BASE = b"(.{3}\x04).%b" + _PATTERN_AES_SALT_INIT = b"\x80%b\x2a" + _PATTERN_AES_SALT_ITER = compile(b"[\x02-\x05]\x7e(.{4})\x20(.{4})\x73", DOTALL) - def __init__(self, payload, config_strings): - super().__init__(payload, config_strings) - self.key_size = self.block_size = self.iterations = self.key_candidates = None - self.aes_metadata = self.get_aes_metadata() + def __init__(self, payload: DotNetPEPayload) -> None: + super().__init__(payload) + self._block_size: int = None + self._iterations: int = None + self._key_candidates: list[bytes] = None + self._key_size: int = None + self._key_rva: int = None + try: + self._get_aes_metadata() + except Exception as e: + raise IncompatibleDecryptorException(e) # Given an initialization vector and ciphertext, creates a Cipher # object with the AES key and specified IV and decrypts the ciphertext - def decrypt(self, iv, ciphertext): - logger.debug(f"Decrypting {ciphertext} with key {self.key.hex()} and IV {iv.hex()}...") + def _decrypt(self, iv: bytes, ciphertext: bytes) -> bytes: + logger.debug( + f"Decrypting {ciphertext} with key {self.key.hex()} and IV {iv.hex()}..." + ) aes_cipher = Cipher(AES(self.key), CBC(iv), backend=default_backend()) decryptor = aes_cipher.decryptor() # Use a PKCS7 unpadder to remove padding from decrypted value # https://cryptography.io/en/latest/hazmat/primitives/padding/ - unpadder = PKCS7(self.block_size).unpadder() + unpadder = PKCS7(self._block_size).unpadder() + try: padded_text = decryptor.update(ciphertext) + decryptor.finalize() unpadded_text = unpadder.update(padded_text) + unpadder.finalize() except Exception as e: raise ConfigParserException( - f"Error decrypting ciphertext {ciphertext} with IV {iv.hex()} and key {self.key.hex()}" - ) from e + f"Error decrypting ciphertext {ciphertext} with IV {iv.hex()} and key {self.key.hex()} : {e}" + ) + logger.debug(f"Decryption result: {unpadded_text}") return unpadded_text # Derives AES passphrase candidates from a config + # # If a passphrase is base64-encoded, both its raw value and decoded value # will be added as candidates - def derive_aes_passphrase_candidates(self, aes_key_rva): - key_val = self.config_strings[aes_key_rva] + def _derive_aes_passphrase_candidates(self, key_val: str) -> list[bytes]: passphrase_candidates = [key_val.encode()] try: passphrase_candidates.append(b64decode(key_val)) @@ -94,15 +113,21 @@ def derive_aes_passphrase_candidates(self, aes_key_rva): return passphrase_candidates # Decrypts encrypted config values with the provided cipher data - def decrypt_encrypted_strings(self): + def decrypt_encrypted_strings( + self, encrypted_strings: dict[str, str] + ) -> dict[str, str]: logger.debug("Decrypting encrypted strings...") + if self._key_candidates is None: + self._key_candidates = self._get_aes_key_candidates(encrypted_strings) + decrypted_config_strings = {} - for k, v in self.config_strings.items(): + for k, v in encrypted_strings.items(): # Leave empty strings as they are if len(v) == 0: logger.debug(f"Key: {k}, Value: {v}") decrypted_config_strings[k] = v continue + # Check if base64-encoded string b64_exception = False try: @@ -111,10 +136,11 @@ def decrypt_encrypted_strings(self): b64_exception = True # If it was not base64-encoded, or if it is less than our min length # for ciphertext, leave the value as it is - if b64_exception or len(decoded_val) < MIN_CIPHERTEXT_LEN: + if b64_exception or len(decoded_val) < self._MIN_CIPHERTEXT_LEN: logger.debug(f"Key: {k}, Value: {v}") decrypted_config_strings[k] = v continue + # Otherwise, extract the IV from the 16 bytes after the HMAC # (first 32 bytes) and the ciphertext from the rest of the data # after the IV, and run the decryption @@ -122,44 +148,36 @@ def decrypt_encrypted_strings(self): result, last_exc = None, None key_idx = 0 # Run through key candidates until suitable one found or failure - while result is None and key_idx < len(self.key_candidates): + while result is None and key_idx < len(self._key_candidates): try: - self.key = self.key_candidates[key_idx] + self.key = self._key_candidates[key_idx] key_idx += 1 - result = decode_bytes(self.decrypt(iv, ciphertext)) + result = decode_bytes(self._decrypt(iv, ciphertext)) except ConfigParserException as e: last_exc = e + if result is None: - logger.debug(f"Decryption failed for item {v}: {last_exc}; Leaving as original value...") + logger.debug( + f"Decryption failed for item {v}: {last_exc}; Leaving as original value..." + ) result = v + logger.debug(f"Key: {k}, Value: {result}") decrypted_config_strings[k] = result + logger.debug("Successfully decrypted strings") return decrypted_config_strings # Extracts AES key candidates from the payload - def get_aes_key_candidates(self, metadata_ins_offset): - logger.debug("Extracting possible AES key values...") + def _get_aes_key_candidates(self, encrypted_strings: dict[str, str]) -> list[bytes]: + logger.debug("Extracting AES key candidates...") keys = [] - # Get the RVA of the method that sets up AES256 metadata - metadata_method_rva = self.payload.next_method_from_instruction_offset(metadata_ins_offset, step_back=1, by_token=True) - - # Insert this RVA into the KEY_BASE pattern to find where the AES key - # is initialized - key_hit = search( - self.PATTERN_AES_KEY_BASE % int_to_bytes(metadata_method_rva), - self.payload.data, - DOTALL, - ) - if key_hit is None: - raise ConfigParserException("Could not find AES key pattern") - key_rva = bytes_to_int(key_hit.groups()[0]) - logger.debug(f"AES key RVA: {hex(key_rva)}") - - # Since we already have a map of all field names, use the key field - # name to index into our existing config dict - passphrase_candidates = self.derive_aes_passphrase_candidates(key_rva) + # Use the key Field name to index into our existing config + key_raw_value = encrypted_strings[ + self._payload.field_name_from_rva(self._key_rva) + ] + passphrase_candidates = self._derive_aes_passphrase_candidates(key_raw_value) for candidate in passphrase_candidates: try: @@ -167,83 +185,113 @@ def get_aes_key_candidates(self, metadata_ins_offset): # cryptography library, but we keep it here for compatibility kdf = PBKDF2HMAC( SHA1(), - length=self.key_size, + length=self._key_size, salt=self.salt, - iterations=self.iterations, + iterations=self._iterations, backend=default_backend(), ) keys.append(kdf.derive(candidate)) logger.debug(f"AES key derived: {keys[-1]}") except Exception: continue + if len(keys) == 0: - raise ConfigParserException(f"Could not derive key from passphrase candidates: {passphrase_candidates}") + raise ConfigParserException( + f"Could not derive key from passphrase candidates: {passphrase_candidates}" + ) return keys # Extracts the AES key and block size from the payload - def get_aes_key_and_block_size(self): + def _get_aes_key_and_block_size(self) -> Tuple[int, int]: logger.debug("Extracting AES key and block size...") - hit = search(self.PATTERN_AES_KEY_AND_BLOCK_SIZE, self.payload.data, DOTALL) + hit = search(self._PATTERN_AES_KEY_AND_BLOCK_SIZE, self._payload.data) if hit is None: raise ConfigParserException("Could not extract AES key or block size") + # Convert key size from bits to bytes by dividing by 8 # Note use of // instead of / to ensure integer output, not float key_size = bytes_to_int(hit.groups()[0]) // 8 block_size = bytes_to_int(hit.groups()[1]) + logger.debug(f"Found key size {key_size} and block size {block_size}") return key_size, block_size - # Identifies the initialization of the AES256 object in the payload - def get_aes_metadata(self): + # Given an offset to an instruction within the Method that sets up the + # Cipher, extracts the AES key RVA from the payload + def _get_aes_key_rva(self, metadata_ins_offset: int) -> int: + logger.debug("Extracting AES key RVA...") + + # Get the RVA of the method that sets up AES256 metadata + metadata_method_token = self._payload.method_from_instruction_offset( + metadata_ins_offset, by_token=True + ).token + + # Insert this RVA into the KEY_BASE pattern to find where the AES key + # is initialized + key_hit = search( + self._PATTERN_AES_KEY_BASE % int_to_bytes(metadata_method_token), + self._payload.data, + DOTALL, + ) + if key_hit is None: + raise ConfigParserException("Could not find AES key pattern") + + key_rva = bytes_to_int(key_hit.groups()[0]) + logger.debug(f"AES key RVA: {hex(key_rva)}") + return key_rva + + # Identifies the initialization of the AES256 object in the payload and + # sets the necessary values needed for decryption + def _get_aes_metadata(self) -> None: logger.debug("Extracting AES metadata...") - # Important to use DOTALL here (and with all regex ops to be safe) - # as we are working with bytes, and if we do not set this, and the - # byte sequence contains a byte that equates to a newline (\n or 0x0A), - # the search will fail - metadata = search(self.PATTERN_AES_SALT_ITER, self.payload.data, DOTALL) + metadata = search(self._PATTERN_AES_SALT_ITER, self._payload.data) if metadata is None: raise ConfigParserException("Could not identify AES metadata") logger.debug(f"AES metadata found at offset {hex(metadata.start())}") - self.key_size, self.block_size = self.get_aes_key_and_block_size() + self._key_size, self._block_size = self._get_aes_key_and_block_size() logger.debug("Extracting AES iterations...") - self.iterations = bytes_to_int(metadata.groups()[1]) - logger.debug(f"Found AES iteration number of {self.iterations}") + self._iterations = bytes_to_int(metadata.groups()[1]) + logger.debug(f"Found AES iteration number of {self._iterations}") - self.salt = self.get_aes_salt(metadata.groups()[0]) - self.key_candidates = self.get_aes_key_candidates(metadata.start()) - return metadata + self.salt = self._get_aes_salt(metadata.groups()[0]) + self._key_rva = self._get_aes_key_rva(metadata.start()) # Extracts the AES salt from the payload, accounting for both hardcoded # salt byte arrays, and salts derived from hardcoded strings - def get_aes_salt(self, salt_rva): + def _get_aes_salt(self, salt_rva: int) -> bytes: logger.debug("Extracting AES salt value...") + # Use % to insert our salt RVA into our match pattern # This pattern will then find the salt initialization ops, # specifically: # # stsfld uint8[] Client.Algorithm.Aes256::Salt # ret - aes_salt_initialization = self.payload.data.find(self.PATTERN_AES_SALT_INIT % salt_rva) + aes_salt_initialization = self._payload.data.find( + self._PATTERN_AES_SALT_INIT % salt_rva + ) if aes_salt_initialization == -1: raise ConfigParserException("Could not identify AES salt initialization") - # Look at opcode used to initialize the salt to decide how to - # proceed on extracting the salt value (start of pattern - 10 bytes) + # Look at the opcode used to initialize the salt to decide how to + # proceed with extracting the salt value (start of pattern - 10 bytes) salt_op_offset = aes_salt_initialization - 10 # Need to use bytes([int]) here to properly convert from int to byte # string for our comparison below - salt_op = bytes([self.payload.data[salt_op_offset]]) + salt_op = bytes([self._payload.data[salt_op_offset]]) # Get the salt RVA from the 4 bytes following the initialization op - salt_strings_rva_packed = self.payload.data[salt_op_offset + 1 : salt_op_offset + 5] + salt_strings_rva_packed = self._payload.data[ + salt_op_offset + 1 : salt_op_offset + 5 + ] salt_strings_rva = bytes_to_int(salt_strings_rva_packed) - # If the op is a ldstr op (0x72), just get the bytes value of the - # string being used to initialize the salt + # If the op is a ldstr op, just get the bytes value of the string being + # used to initialize the salt if salt_op == OPCODE_LDSTR: - salt_encoded = self.payload.user_string_from_rva(salt_strings_rva) + salt_encoded = self._payload.user_string_from_rva(salt_strings_rva) # We use decode_bytes() here to get the salt string without any # null bytes (because it's stored as UTF-16LE), then convert it # back to bytes @@ -251,9 +299,12 @@ def get_aes_salt(self, salt_rva): # If the op is a ldtoken (0xd0) operation, we need to get the salt # byte array value from the FieldRVA table elif salt_op == OPCODE_LDTOKEN: - salt_size = self.payload.data[salt_op_offset - 7] - salt = self.payload.byte_array_from_size_and_rva(salt_size, salt_strings_rva) + salt_size = self._payload.data[salt_op_offset - 7] + salt = self._payload.byte_array_from_size_and_rva( + salt_size, salt_strings_rva + ) else: raise ConfigParserException(f"Unknown salt opcode found: {salt_op.hex()}") + logger.debug(f"Found salt value: {salt.hex()}") return salt diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py index 943fb58ddfa..75e54f3f4c6 100644 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py +++ b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py @@ -6,6 +6,8 @@ # # Provides a custom AES decryptor for RAT payloads utilizing ECB mode # +# Example Hash: d5028e10a756f2df677f32ebde105d7de8df37e253c431837c8f810260f4428e +# # MIT License # # Copyright (c) 2024 Jeff Archer @@ -30,7 +32,7 @@ from base64 import b64decode from hashlib import md5 from logging import getLogger -from re import DOTALL, search +from re import DOTALL, compile, search from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher @@ -38,24 +40,28 @@ from cryptography.hazmat.primitives.ciphers.modes import ECB from cryptography.hazmat.primitives.padding import PKCS7 -from ..config_parser_exception import ConfigParserException +from ...config_parser_exception import ConfigParserException from ..data_utils import bytes_to_int, decode_bytes -from .config_decryptor import ConfigDecryptor +from ..dotnetpe_payload import DotNetPEPayload +from .config_decryptor import ConfigDecryptor, IncompatibleDecryptorException logger = getLogger(__name__) class ConfigDecryptorAESECB(ConfigDecryptor): - PATTERN_MD5_HASH = rb"\x7e(.{3}\x04)\x28.{3}\x06\x6f" + # MD5 hash pattern used to detect AES key + _PATTERN_MD5_HASH = compile(rb"\x7e(.{3}\x04)\x28.{3}\x06\x6f", DOTALL) - def __init__(self, payload, config_strings): - super().__init__(payload, config_strings) + def __init__(self, payload: DotNetPEPayload) -> None: + super().__init__(payload) + try: + self._aes_key_rva = self._get_aes_key_rva() + except Exception as e: + raise IncompatibleDecryptorException(e) # Given ciphertext, creates a Cipher object with the AES key and decrypts # the ciphertext - def decrypt(self, ciphertext): - if self.key is None: - self.get_aes_key() + def _decrypt(self, ciphertext: bytes) -> bytes: logger.debug(f"Decrypting {ciphertext} with key {self.key.hex()}...") aes_cipher = Cipher(AES(self.key), ECB(), backend=default_backend()) decryptor = aes_cipher.decryptor() @@ -63,24 +69,39 @@ def decrypt(self, ciphertext): # Use a PKCS7 unpadder to remove padding from decrypted value # https://cryptography.io/en/latest/hazmat/primitives/padding/ unpadder = PKCS7(AES.block_size).unpadder() + try: padded_text = decryptor.update(ciphertext) + decryptor.finalize() unpadded_text = unpadder.update(padded_text) + unpadder.finalize() except Exception as e: - raise ConfigParserException(f"Error decrypting ciphertext {ciphertext} with key {self.key.hex()}") from e + raise ConfigParserException( + f"Error decrypting ciphertext {ciphertext} with key {self.key.hex()}: {e}" + ) + logger.debug(f"Decryption result: {unpadded_text}") return unpadded_text # Decrypts encrypted config values with the provided cipher data - def decrypt_encrypted_strings(self): + def decrypt_encrypted_strings( + self, encrypted_strings: dict[str, str] + ) -> dict[str, str]: logger.debug("Decrypting encrypted strings...") + + if self.key is None: + try: + raw_key_field = self._payload.field_name_from_rva(self._aes_key_rva) + self.key = self._derive_aes_key(encrypted_strings[raw_key_field]) + except Exception as e: + raise ConfigParserException(f"Failed to derive AES key: {e}") + decrypted_config_strings = {} - for k, v in self.config_strings.items(): + for k, v in encrypted_strings.items(): # Leave empty strings as they are if len(v) == 0: logger.debug(f"Key: {k}, Value: {v}") decrypted_config_strings[k] = v continue + # Check if base64-encoded string b64_exception = False try: @@ -92,38 +113,44 @@ def decrypt_encrypted_strings(self): logger.debug(f"Key: {k}, Value: {v}") decrypted_config_strings[k] = v continue + ciphertext = decoded_val result, last_exc = None, None try: - result = decode_bytes(self.decrypt(ciphertext)) + result = decode_bytes(self._decrypt(ciphertext)) except ConfigParserException as e: last_exc = e + if result is None: logger.debug(f"Decryption failed for item {v}: {last_exc}") result = v + logger.debug(f"Key: {k}, Value: {result}") decrypted_config_strings[k] = result + logger.debug("Successfully decrypted strings") return decrypted_config_strings - # Extracts AES key candidates from the payload - def get_aes_key(self): - logger.debug("Extracting possible AES key value...") - key_hit = search( - self.PATTERN_MD5_HASH, - self.payload.data, - DOTALL, - ) - if key_hit is None: - raise ConfigParserException("Could not find AES key pattern") - key_rva = bytes_to_int(key_hit.groups()[0]) - logger.debug(f"AES key RVA: {hex(key_rva)}") - key_unhashed = self.config_strings[key_rva] + # Given the raw bytes that will become the key value, derives the AES key + def _derive_aes_key(self, key_unhashed: str) -> bytes: # Generate the MD5 hash md5_hash = md5() md5_hash.update(key_unhashed.encode("utf-8")) md5_digest = md5_hash.digest() + # Key is a 32-byte value made up of the MD5 hash overlaying itself, # tailed with one null byte - self.key = md5_digest[:15] + md5_digest[:16] + b"\x00" - logger.debug(f"AES key derived: {self.key}") + key = md5_digest[:15] + md5_digest[:16] + b"\x00" + logger.debug(f"AES key derived: {key}") + return key + + # Extracts the AES key RVA from the payload + def _get_aes_key_rva(self) -> int: + logger.debug("Extracting AES key value...") + key_hit = search(self._PATTERN_MD5_HASH, self._payload.data) + if key_hit is None: + raise ConfigParserException("Could not find AES key pattern") + + key_rva = bytes_to_int(key_hit.groups()[0]) + logger.debug(f"AES key RVA: {hex(key_rva)}") + return key_rva diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_decrypt_xor.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_decrypt_xor.py new file mode 100644 index 00000000000..74311e5e7a2 --- /dev/null +++ b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_decrypt_xor.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +# +# config_decryptor_decrypt_xor.py +# +# Author: jeFF0Falltrades +# +# Provides a custom decryptor for RAT payloads utilizing the DecryptXOR +# method of embeddeding config strings +# +# Example Hash: 6e5671dec52db7f64557ba8ef70caf53cf0c782795236b03655623640f9e6a83 +# +# MIT License +# +# Copyright (c) 2024 Jeff Archer +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +from logging import getLogger +from re import DOTALL, compile, findall, search + +from ...config_parser_exception import ConfigParserException +from ..data_utils import bytes_to_int, decode_bytes +from ..dotnet_constants import PATTERN_LDSTR_OP +from ..dotnetpe_payload import DotNetPEPayload +from .config_decryptor import ConfigDecryptor, IncompatibleDecryptorException +from .config_decryptor_plaintext import ConfigDecryptorPlaintext + +logger = getLogger(__name__) + + +class ConfigDecryptorDecryptXOR(ConfigDecryptor): + _KEY_XOR_DECODED_STRINGS = "xor_decoded_strings" + + # Pattern to detect usage of DecryptXOR Method + _PATTERN_DECRYPT_XOR_BLOCK = compile( + rb"(\x2d.\x72.{3}\x70\x28.{3}\x06\x2a(?:\x02[\x16-\x1f].?\x33.\x72.{3}\x70\x28.{3}\x06\x2a){7,}.+?\x72.{3}\x70)", + flags=DOTALL, + ) + + def __init__(self, payload: DotNetPEPayload) -> None: + super().__init__(payload) + # Filled in _get_xor_metadata() + self._xor_strings: list[str] = [] + try: + self._get_xor_metadata() + except Exception as e: + raise IncompatibleDecryptorException(e) + + # Returns a list of decoded XOR-encoded strings found in the payload + def _decode_encoded_strings(self) -> list[str]: + decoded_strings = [] + + for string in self._xor_strings: + decoded = [] + # Do not modify unencoded strings + if ":" not in string: + decoded_strings.append(string) + continue + + # Split encoded string by ':' and run XOR decoding + arr, arr2 = (bytes.fromhex(arr) for arr in string.split(":")) + for idx, byte in enumerate(arr2): + decoded.append(byte ^ self.key[idx % len(self.key)] ^ arr[idx]) + decoded_strings.append(decode_bytes(bytes(decoded))) + + logger.debug(f"Decoded {len(decoded_strings)} strings") + return decoded_strings + + # Parses the config, adds decoded XOR strings, and returns the decoded + # config + def decrypt_encrypted_strings( + self, encrypted_strings: dict[str, str] + ) -> dict[str, list[str] | str]: + config = {} + # Pass off plaintext config to a ConfigDecryptorPlaintext + ptcd = ConfigDecryptorPlaintext(self._payload) + config.update(ptcd.decrypt_encrypted_strings(encrypted_strings)) + config[self._KEY_XOR_DECODED_STRINGS] = self._decode_encoded_strings() + return config + + # Gathers XOR metadata from the payload + def _get_xor_metadata(self): + dxor_block = search(self._PATTERN_DECRYPT_XOR_BLOCK, self._payload.data) + if dxor_block is None: + raise ConfigParserException("Could not identify DecryptXOR block") + logger.debug(f"DecryptXOR block found at offset {hex(dxor_block.start())}") + + # Derive all XOR-encoded string references in the DecryptXOR block + xor_string_rvas = findall(PATTERN_LDSTR_OP, dxor_block.groups()[0]) + self._xor_strings = list( + filter( + None, + [ + self._payload.user_string_from_rva(bytes_to_int(rva)) + for rva in xor_string_rvas + ], + ) + ) + logger.debug(f"{len(self._xor_strings)} XOR strings found") + + # Get the static constructor containing the XOR key + xor_key_cctor = self._payload.method_from_instruction_offset( + dxor_block.start(), step=1, by_token=True + ) + xor_key_cctor_body = self._payload.method_body_from_method(xor_key_cctor) + + # Derive the XOR key RVA and value + xor_rva = search(PATTERN_LDSTR_OP, xor_key_cctor_body) + if xor_rva is None: + raise ConfigParserException("Could not identify XOR key RVA") + xor_rva = bytes_to_int(xor_rva.groups()[0]) + self.key = bytes(self._payload.user_string_from_rva(xor_rva), encoding="utf-8") + logger.debug(f"XOR key found at {hex(xor_rva)} : {self.key}") diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_plaintext.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_plaintext.py index c6e71f8a350..ebd87c49ee8 100644 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_plaintext.py +++ b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_plaintext.py @@ -5,7 +5,8 @@ # Author: jeFF0Falltrades # # Provides a fall-through decryptor that will attempt to return the plaintext -# values of a found config when all other decryptors fail +# values of a found config when all other decryptors fail by matching known +# config field names from supported RAT families # # MIT License # @@ -30,18 +31,98 @@ # SOFTWARE. from logging import getLogger +from ...config_parser_exception import ConfigParserException +from ..dotnetpe_payload import DotNetPEPayload from .config_decryptor import ConfigDecryptor logger = getLogger(__name__) +KNOWN_CONFIG_FIELD_NAMES = set( + [ + "AUTHKEY", + "An_ti", + "Anti", + "Anti_Process", + "BDOS", + "BS_OD", + "Certifi_cate", + "Certificate", + "DIRECTORY", + "De_lay", + "Delay", + "DoStartup", + "ENABLELOGGER", + "EncryptionKey", + "Groub", + "Group", + "HIDEFILE", + "HIDEINSTALLSUBDIRECTORY", + "HIDELOGDIRECTORY", + "HOSTS", + "Hos_ts", + "Hosts", + "Hw_id", + "Hwid", + "INSTALL", + "INSTALLNAME", + "In_stall", + "Install", + "InstallDir", + "InstallFile", + "InstallFolder", + "InstallStr", + "Install_File", + "Install_Folder", + "Install_path", + "KEY", + "Key", + "LOGDIRECTORYNAME", + "MTX", + "MUTEX", + "Mutex", + "Paste_bin", + "Pastebin", + "Por_ts", + "Port", + "Ports", + "RECONNECTDELAY", + "SPL", + "STARTUP", + "STARTUPKEY", + "SUBDIRECTORY", + "ServerIp", + "ServerPort", + "Server_signa_ture", + "Serversignature", + "Sleep", + "TAG", + "USBNM", + "VERSION", + "Ver_sion", + "Version", + "delay", + "mutex_string", + "startup_name", + ] +) + class ConfigDecryptorPlaintext(ConfigDecryptor): - def __init__(self, payload, config_strings): - super().__init__(payload, config_strings) + # Minimum threshold for matching Field names + MIN_THRESHOLD_MATCH = 3 - def decrypt(self, ciphertext): - return ciphertext + def __init__(self, payload: DotNetPEPayload) -> None: + super().__init__(payload) - def decrypt_encrypted_strings(self): - logger.debug("Could not find applicable decryptor, returning found config as plaintext...") - return self.config_strings + # Calculates whether the config meets the minimum threshold for known Field + # Names and returns it if it does + def decrypt_encrypted_strings( + self, encrypted_strings: dict[str, str] + ) -> dict[str, str]: + field_names = set(encrypted_strings.keys()) + num_overlapping_field_names = len(KNOWN_CONFIG_FIELD_NAMES & field_names) + if num_overlapping_field_names < self.MIN_THRESHOLD_MATCH: + raise ConfigParserException( + "Plaintext threshold of known config items not met" + ) + return encrypted_strings diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_random_hardcoded.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_random_hardcoded.py new file mode 100644 index 00000000000..747364b8a9d --- /dev/null +++ b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_random_hardcoded.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# +# config_decryptor_random_hardcoded.py +# +# Author: jeFF0Falltrades +# +# Provides a custom decryptor for RAT payloads utilizing the method of +# randomly selecting from an embedded list of C2 domains/supradomains +# +# Example hash: a2817702fecb280069f0723cd2d0bfdca63763b9cdc833941c4f33bbe383d93e +# +# MIT License +# +# Copyright (c) 2024 Jeff Archer +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +from logging import getLogger +from re import DOTALL, compile, findall, search + +from ...config_parser_exception import ConfigParserException +from ..data_utils import bytes_to_int +from ..dotnet_constants import PATTERN_LDSTR_OP +from ..dotnetpe_payload import DotNetPEMethod, DotNetPEPayload +from .config_decryptor import ConfigDecryptor, IncompatibleDecryptorException +from .config_decryptor_plaintext import ConfigDecryptorPlaintext + +logger = getLogger(__name__) + + +class ConfigDecryptorRandomHardcoded(ConfigDecryptor): + _KEY_HARDCODED_HOSTS = "hardcoded_hosts" + + # Pattern to find the Method that retrieves a random domain + _PATTERN_RANDOM_DOMAIN = compile( + rb"(?:\x73.{3}\x0a){2}\x25.+?\x0a\x06(?:\x6f.{3}\x0a){2}\x0b", flags=DOTALL + ) + + def __init__(self, payload: DotNetPEPayload) -> None: + super().__init__(payload) + try: + self._random_domain_method = self._get_random_domain_method() + except Exception as e: + raise IncompatibleDecryptorException(e) + + # Returns a combined config containing config fields + hardcoded hosts + def decrypt_encrypted_strings( + self, encrypted_strings: dict[str, str] + ) -> dict[str, list[str] | str]: + config = {} + # Pass off plaintext config to a ConfigDecryptorPlaintext + ptcd = ConfigDecryptorPlaintext(self._payload) + config.update(ptcd.decrypt_encrypted_strings(encrypted_strings)) + config[self._KEY_HARDCODED_HOSTS] = self._get_hardcoded_hosts() + return config + + # Retrieves and returns a list of hardcoded hosts + def _get_hardcoded_hosts(self) -> list[str]: + random_domain_method_body = self._payload.method_body_from_method( + self._random_domain_method + ) + hardcoded_host_rvas = findall(PATTERN_LDSTR_OP, random_domain_method_body) + + hardcoded_hosts = [] + for rva in hardcoded_host_rvas: + try: + harcoded_host = self._payload.user_string_from_rva(bytes_to_int(rva)) + if harcoded_host != ".": + hardcoded_hosts.append(harcoded_host) + except Exception as e: + logger.error(f"Error translating hardcoded host at {hex(rva)}: {e}") + continue + + logger.debug(f"Hardcoded hosts found: {hardcoded_hosts}") + return hardcoded_hosts + + # Retrieves the Method that randomly selects from a list of embedded hosts + def _get_random_domain_method(self) -> DotNetPEMethod: + logger.debug("Searching for random domain method") + random_domain_marker = search(self._PATTERN_RANDOM_DOMAIN, self._payload.data) + if random_domain_marker is None: + raise ConfigParserException( + "Could not identify random domain generator method" + ) + + random_domain_method = self._payload.method_from_instruction_offset( + random_domain_marker.start() + ) + + logger.debug( + f"Random domain generator found at offset {hex(random_domain_method.offset)}" + ) + return random_domain_method diff --git a/lib/parsers_aux/ratking/utils/dotnet_constants.py b/lib/parsers_aux/ratking/utils/dotnet_constants.py index 2de1a34027a..84f82e14619 100644 --- a/lib/parsers_aux/ratking/utils/dotnet_constants.py +++ b/lib/parsers_aux/ratking/utils/dotnet_constants.py @@ -28,16 +28,19 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. from enum import IntEnum +from re import DOTALL, compile -# Notable CIL Opcodes and Base RVAs +# Notable CIL Opcodes and Tokens OPCODE_LDC_I4_0 = b"\x16" -OPCODE_LDC_I4_1 = b"\x17" OPCODE_LDSTR = b"\x72" OPCODE_LDTOKEN = b"\xd0" -OPCODE_RET = b"\x2a" MDT_FIELD_DEF = 0x04000000 MDT_METHOD_DEF = 0x06000000 MDT_STRING = 0x70000000 +PATTERN_LDSTR_OP = compile( + rb"\x72(.{3}\x70)", + flags=DOTALL, +) # IntEnum derivative used for translating a SpecialFolder ID to its name diff --git a/lib/parsers_aux/ratking/utils/dotnetpe_payload.py b/lib/parsers_aux/ratking/utils/dotnetpe_payload.py index be66c0433f6..d704b3397d2 100644 --- a/lib/parsers_aux/ratking/utils/dotnetpe_payload.py +++ b/lib/parsers_aux/ratking/utils/dotnetpe_payload.py @@ -5,7 +5,7 @@ # Author: jeFF0Falltrades # # Provides a wrapper class for accessing metadata from a DotNetPE object and -# performing RVA to data offset conversions +# performing data conversions # # MIT License # @@ -28,124 +28,182 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +from dataclasses import dataclass from hashlib import sha256 from logging import getLogger from dnfile import dnPE +from yara import Rules -from .config_parser_exception import ConfigParserException +from ..config_parser_exception import ConfigParserException +from .data_utils import bytes_to_int from .dotnet_constants import MDT_FIELD_DEF, MDT_METHOD_DEF, MDT_STRING logger = getLogger(__name__) +# Helper class representing a single Method +@dataclass +class DotNetPEMethod: + name: str + offset: int + rva: int + size: int + token: int + + class DotNetPEPayload: - def __init__(self, file_data, yara_rule=None): + # def __init__(self, file_path: str, yara_rule: Rules = None) -> None: + def __init__(self, file_data: bytes, yara_rule: Rules = None) -> None: # self.file_path = file_path - self.data = file_data # self.get_file_data() - self.sha256 = self.calculate_sha256() - self.dotnetpe = None + self.data = file_data # self._get_file_data() + + # Calculate SHA256 + sha256_obj = sha256() + sha256_obj.update(self.data) + self.sha256 = sha256_obj.hexdigest() + + self.dotnetpe: dnPE = None try: + # self.dotnetpe = dnPE(self.file_path, clr_lazy_load=True) self.dotnetpe = dnPE(data=file_data, clr_lazy_load=True) - except Exception as e: - logger.exception(e) + except Exception: + raise ConfigParserException("Failed to load project as dotnet executable") + self.yara_match = "" if yara_rule is not None: - self.yara_match = self.match_yara(yara_rule) + self.yara_match = self._match_yara(yara_rule) + + # Pre-sort Method table for efficient lookups + self._methods = self._generate_method_list() + self._methods_by_offset = sorted(self._methods, key=lambda m: m.offset) + self._methods_by_token = sorted(self._methods, key=lambda m: m.token) # Given a byte array's size and RVA, translates the RVA to the offset of # the byte array and returns the bytes of the array as a byte string - def byte_array_from_size_and_rva(self, arr_size, arr_rva): + def byte_array_from_size_and_rva(self, arr_size: int, arr_rva: int) -> bytes: arr_field_rva = self.fieldrva_from_rva(arr_rva) arr_offset = self.offset_from_rva(arr_field_rva) - arr_value = self.data[arr_offset : arr_offset + arr_size] - return arr_value - - # Calculates the SHA256 hash of file data - def calculate_sha256(self): - sha256_hash = sha256() - sha256_hash.update(self.data) - return sha256_hash.hexdigest() - - # Given an RVA, derives the corresponding Field name from the RVA - def field_name_from_rva(self, rva): - return self.dotnetpe.net.mdtables.Field.rows[(rva ^ MDT_FIELD_DEF) - 1].Name.value + return self.data[arr_offset : arr_offset + arr_size] + + # Given an offset, and either a terminating offset or delimiter, extracts + # the byte string + def byte_string_from_offset( + self, offset_start: int, offstart_end: int = -1, delimiter: bytes = b"\0" + ) -> bytes: + if offstart_end != -1: + try: + return self.data[offset_start:offstart_end] + except Exception: + raise ConfigParserException( + f"Could not extract string value from offset range [{hex(offset_start)}:{offstart_end}]" + ) + try: + return self.data[offset_start:].partition(delimiter)[0] + except Exception: + raise ConfigParserException( + f"Could not extract string value from offset {hex(offset_start)} with delimiter {delimiter}" + ) - # Given an RVA, derives the corresponding FieldRVA value from the RVA - def fieldrva_from_rva(self, rva): + # Given an RVA, derives the corresponding Field name + def field_name_from_rva(self, rva: int) -> str: + try: + return self.dotnetpe.net.mdtables.Field.rows[ + (rva ^ MDT_FIELD_DEF) - 1 + ].Name.value + except Exception: + raise ConfigParserException(f"Could not find Field for RVA {rva}") + + # Given an RVA, derives the corresponding FieldRVA value + def fieldrva_from_rva(self, rva: int) -> int: field_id = rva ^ MDT_FIELD_DEF for row in self.dotnetpe.net.mdtables.FieldRva: if row.struct.Field_Index == field_id: return row.struct.Rva - raise ConfigParserException(f"Could not find FieldRVA for address {rva}") + raise ConfigParserException(f"Could not find FieldRVA for RVA {rva}") + + # Generates a list of DotNetPEMethod objects for efficient lookups of method + # metadata in other operations + def _generate_method_list( + self, + ) -> list[DotNetPEMethod]: + method_objs = [] + + for idx, method in enumerate(self.dotnetpe.net.mdtables.MethodDef.rows): + method_offset = self.offset_from_rva(method.Rva) + + # Parse size from flags + flags = self.data[method_offset] + method_size = 0 + if flags & 3 == 2: # Tiny format + method_size = flags >> 2 + elif flags & 3 == 3: # Fat format (add 12-byte header) + method_size = 12 + bytes_to_int( + self.data[method_offset + 4 : method_offset + 8] + ) + + method_objs.append( + DotNetPEMethod( + method.Name.value, + method_offset, + method.Rva, + method_size, + (MDT_METHOD_DEF ^ idx) + 1, + ) + ) + return method_objs - # Reads in payload binary content - def get_file_data(self): + # Returns payload binary content + def _get_file_data(self) -> bytes: logger.debug(f"Reading contents from: {self.file_path}") try: with open(self.file_path, "rb") as fp: data = fp.read() - except Exception as e: - raise ConfigParserException(f"Error reading from path: {self.file_path}") from e - logger.debug("Successfully read data") + except Exception: + raise ConfigParserException(f"Error reading from path: {self.file_path}") + logger.debug(f"Successfully read {len(data)} bytes") return data - # Tests a given YARA rule object against the file at file_path - def match_yara(self, rule): + # Tests a given YARA rule object against the file at self.file_path, + # returning the matching rule's name, or "No match" + def _match_yara(self, rule: Rules) -> str: try: - match = rule.match(data=self.file_data) + match = rule.match(self.file_path) return str(match[0]) if len(match) > 0 else "No match" except Exception as e: logger.exception(e) return f"Exception encountered: {e}" - # Given a method name, returns RVAs of methods matching that name - def method_rvas_from_name(self, name): - return [row.Rva for row in self.dotnetpe.net.mdtables.MethodDef if row.Name.value == name] + # Given a DotNetPEMethod, returns its body as raw bytes + def method_body_from_method(self, method: DotNetPEMethod) -> bytes: + return self.byte_string_from_offset(method.offset, method.offset + method.size) + + # Given a Method name, returns a list of DotNetPEMethods matching that name + def methods_from_name(self, name: str) -> list[DotNetPEMethod]: + return [method for method in self._methods if method.name == name] # Given the offset to an instruction, reverses the instruction to its - # parent Method, and then finds the subsequent Method in the MethodDef - # table and returns its offset or index - def next_method_from_instruction_offset(self, ins_offset, step_back=0, by_token=False): - # Translate the instruction offset to RVA - ins_rva = self.dotnetpe.get_rva_from_offset(ins_offset) - # Get both the regular MethodDef table and a sorted (by RVA) copy - # This is because the table is not guaranteed to be ordered by RVA - methods = self.dotnetpe.net.mdtables.MethodDef.rows - sorted_methods = sorted(methods, key=lambda m: m.Rva) - # Go through the sorted table and find the Method RVA that is greater - # than the instruction RVA (the subsequent function), and use step_back - # to get the function containing the instruction if necessary - for idx, method in enumerate(sorted_methods): - if method.Rva > ins_rva: + # parent Method, optionally returning an adjacent Method using step to + # signify the direction of adjacency, and using by_token to determine + # whether to calculate adjacency by token or offset + def method_from_instruction_offset( + self, ins_offset: int, step: int = 0, by_token: bool = False + ) -> DotNetPEMethod: + for idx, method in enumerate(self._methods_by_offset): + if method.offset <= ins_offset < method.offset + method.size: return ( - # Add 1 to token ID as table starts at index 1, not 0 - methods.index(sorted_methods[idx - step_back]) + 1 + MDT_METHOD_DEF + self._methods_by_token[self._methods_by_token.index(method) + step] if by_token - else self.offset_from_rva(methods[methods.index(sorted_methods[idx - step_back])].Rva) + else self._methods_by_offset[idx + step] ) - raise ConfigParserException(f"Could not find method from instruction offset {ins_offset}") + raise ConfigParserException( + f"Could not find method from instruction offset {hex(ins_offset)}" + ) # Given an RVA, returns a data/file offset - def offset_from_rva(self, rva): + def offset_from_rva(self, rva: int) -> int: return self.dotnetpe.get_offset_from_rva(rva) - # Given a string offset, and, optionally, a delimiter, extracts the string - def string_from_offset(self, str_offset, delimiter=b"\0"): - try: - result = self.data[str_offset:].partition(delimiter)[0] - except Exception as e: - raise ConfigParserException( - f"Could not extract string value from offset {hex(str_offset)} with delimiter {delimiter}" - ) from e - return result - - def string_from_range(self, start_offset, end_offset): - try: - return self.data[start_offset, end_offset] - except Exception as e: - raise ConfigParserException(f"Could not extract string value from range {hex(start_offset)}:{hex(end_offset)}") from e - # Given an RVA, derives the corresponding User String - def user_string_from_rva(self, rva): + def user_string_from_rva(self, rva: int) -> str: return self.dotnetpe.net.user_strings.get(rva ^ MDT_STRING).value From bcc76e21ae44a1b21abb31ac30093f4ebe53ec92 Mon Sep 17 00:00:00 2001 From: jeFF0Falltrades <8444166+jeFF0Falltrades@users.noreply.github.com> Date: Fri, 27 Sep 2024 09:42:37 -0400 Subject: [PATCH 3/3] Minor fix to escape RVAs which may contain regex escape sequences --- .../ratking/utils/decryptors/config_decryptor_aes_cbc.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py index 8389886bcfc..13bd9b4a359 100644 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py +++ b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py @@ -31,7 +31,7 @@ # SOFTWARE. from base64 import b64decode from logging import getLogger -from re import DOTALL, compile, search +from re import DOTALL, compile, escape, search from typing import Tuple from cryptography.hazmat.backends import default_backend @@ -229,7 +229,7 @@ def _get_aes_key_rva(self, metadata_ins_offset: int) -> int: # Insert this RVA into the KEY_BASE pattern to find where the AES key # is initialized key_hit = search( - self._PATTERN_AES_KEY_BASE % int_to_bytes(metadata_method_token), + self._PATTERN_AES_KEY_BASE % escape(int_to_bytes(metadata_method_token)), self._payload.data, DOTALL, ) @@ -270,7 +270,7 @@ def _get_aes_salt(self, salt_rva: int) -> bytes: # stsfld uint8[] Client.Algorithm.Aes256::Salt # ret aes_salt_initialization = self._payload.data.find( - self._PATTERN_AES_SALT_INIT % salt_rva + self._PATTERN_AES_SALT_INIT % escape(salt_rva) ) if aes_salt_initialization == -1: raise ConfigParserException("Could not identify AES salt initialization") @@ -308,3 +308,4 @@ def _get_aes_salt(self, salt_rva: int) -> bytes: logger.debug(f"Found salt value: {salt.hex()}") return salt +