diff --git a/empire/server/common/config.py b/empire/server/common/config.py index b04bb3fab..680c8f757 100644 --- a/empire/server/common/config.py +++ b/empire/server/common/config.py @@ -1,30 +1,68 @@ import sys -from typing import Dict +from typing import Dict, Union import yaml +from pydantic import BaseModel, Field from empire.server.common import helpers -class EmpireConfig(object): - def __init__(self): - self.yaml: Dict = {} - if "--config" in sys.argv: - location = sys.argv[sys.argv.index("--config") + 1] - print(f"Loading config from {location}") - self.set_yaml(location) - if len(self.yaml.items()) == 0: - print(helpers.color("[*] Loading default config")) - self.set_yaml("./empire/server/config.yaml") - - def set_yaml(self, location: str): - try: - with open(location, "r") as stream: - self.yaml = yaml.safe_load(stream) - except yaml.YAMLError as exc: - print(exc) - except FileNotFoundError as exc: - print(exc) - - -empire_config = EmpireConfig() +class DatabaseConfig(BaseModel): + type: str + defaults: Dict[str, Union[bool, int, str]] + + # sqlite + location: str = "empire/server/data/empire.db" + + # mysql + url: str = "localhost:3306" + username: str = "" + password: str = "" + + +class ModulesConfig(BaseModel): + # todo vr In 5.0 we should pick a single naming convention for config. + retain_last_value: bool = Field(alias="retain-last-value") + + +class DirectoriesConfig(BaseModel): + downloads: str + module_source: str + obfuscated_module_source: str + + +class EmpireConfig(BaseModel): + supress_self_cert_warning: bool = Field( + alias="supress-self-cert-warning", default=True + ) + database: DatabaseConfig + modules: ModulesConfig + plugins: Dict[str, Dict[str, str]] = {} + directories: DirectoriesConfig + + # For backwards compatibility + @property + def yaml(self): + return self.dict() + + +def set_yaml(location: str): + try: + with open(location, "r") as stream: + return yaml.safe_load(stream) + except yaml.YAMLError as exc: + print(exc) + except FileNotFoundError as exc: + print(exc) + + +config_dict = {} +if "--config" in sys.argv: + location = sys.argv[sys.argv.index("--config") + 1] + print(f"Loading config from {location}") + config_dict = set_yaml(location) +if len(config_dict.items()) == 0: + print(helpers.color("[*] Loading default config")) + config_dict = set_yaml("./empire/server/config.yaml") + +empire_config = EmpireConfig(**config_dict) diff --git a/empire/server/common/modules.py b/empire/server/common/modules.py index a8344f638..2a93a2fc8 100644 --- a/empire/server/common/modules.py +++ b/empire/server/common/modules.py @@ -147,7 +147,7 @@ def execute_module( msg = f"tasked agent {session_id} to run module {module.name}" self.main_menu.agents.save_agent_log(session_id, msg) - if empire_config.yaml.get("modules", {}).get("retain-last-value", True): + if empire_config.modules.retain_last_value: self._set_default_values(module, cleaned_options) return {"success": True, "taskID": task_id, "msg": msg}, None @@ -160,9 +160,9 @@ def get_module_source( """ try: if obfuscate: - obfuscated_module_source = empire_config.yaml.get("directories", {})[ - "obfuscated_module_source" - ] + obfuscated_module_source = ( + empire_config.directories.obfuscated_module_source + ) module_path = os.path.join(obfuscated_module_source, module_name) # If pre-obfuscated module exists then return code if os.path.exists(module_path): @@ -172,9 +172,7 @@ def get_module_source( # If pre-obfuscated module does not exist then generate obfuscated code and return it else: - module_source = empire_config.yaml.get("directories", {})[ - "module_source" - ] + module_source = empire_config.directories.module_source module_path = os.path.join(module_source, module_name) with open(module_path, "r") as f: module_code = f.read() @@ -187,9 +185,7 @@ def get_module_source( # Use regular/unobfuscated code else: - module_source = empire_config.yaml.get("directories", {})[ - "module_source" - ] + module_source = empire_config.directories.module_source module_path = os.path.join(module_source, module_name) with open(module_path, "r") as f: module_code = f.read() @@ -334,7 +330,7 @@ def _generate_script_python( ) -> Tuple[Optional[str], Optional[str]]: if module.script_path: script_path = os.path.join( - empire_config.yaml.get("directories", {})["module_source"], + empire_config.directories.module_source, module.script_path, ) with open(script_path, "r") as stream: @@ -530,7 +526,7 @@ def _load_module(self, yaml_module, root_path, file_path: str): elif my_model.script_path: if not path.exists( os.path.join( - empire_config.yaml.get("directories", {})["module_source"], + empire_config.directories.module_source, my_model.script_path, ) ): diff --git a/empire/server/database/base.py b/empire/server/database/base.py index b19409359..5ebbdd5f7 100644 --- a/empire/server/database/base.py +++ b/empire/server/database/base.py @@ -13,22 +13,26 @@ ) from empire.server.database.models import Base -database_config = empire_config.yaml.get('database', {}) - -if database_config.get('type') == 'mysql': - url = database_config.get('url') - username = database_config.get('username') or '' - password = database_config.get('password') or '' - engine = create_engine(f'mysql+pymysql://{username}:{password}@{url}/empire', echo=False) +database_config = empire_config.database + +if database_config.type == "mysql": + url = database_config.url + username = database_config.username + password = database_config.password + engine = create_engine( + f"mysql+pymysql://{username}:{password}@{url}/empire", echo=False + ) else: - location = database_config.get('location', 'data/empire.db') - engine = create_engine(f'sqlite:///{location}?check_same_thread=false', echo=False) + location = database_config.location + engine = create_engine(f"sqlite:///{location}?check_same_thread=false", echo=False) Session = scoped_session(sessionmaker(bind=engine)) args = arguments.args if args.reset: - choice = input("\x1b[1;33m[>] Would you like to reset your Empire instance? [y/N]: \x1b[0m") + choice = input( + "\x1b[1;33m[>] Would you like to reset your Empire instance? [y/N]: \x1b[0m" + ) if choice.lower() == "y": # The reset script will delete the default db file. This will drop tables if connected to MySQL or # a different SQLite .db file. @@ -48,32 +52,32 @@ def color(string, color=None): """ attr = [] # bold - attr.append('1') + attr.append("1") if color: if color.lower() == "red": - attr.append('31') + attr.append("31") elif color.lower() == "green": - attr.append('32') + attr.append("32") elif color.lower() == "yellow": - attr.append('33') + attr.append("33") elif color.lower() == "blue": - attr.append('34') - return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string) + attr.append("34") + return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string) else: if string.strip().startswith("[!]"): - attr.append('31') - return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string) + attr.append("31") + return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string) elif string.strip().startswith("[+]"): - attr.append('32') - return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string) + attr.append("32") + return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string) elif string.strip().startswith("[*]"): - attr.append('34') - return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string) + attr.append("34") + return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string) elif string.strip().startswith("[>]"): - attr.append('33') - return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string) + attr.append("33") + return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string) else: return string @@ -81,20 +85,20 @@ def color(string, color=None): # When Empire starts up for the first time, it will create the database and create # these default records. if len(Session().query(models.User).all()) == 0: - print(color('[*] Setting up database.')) - print(color('[*] Adding default user.')) + print(color("[*] Setting up database.")) + print(color("[*] Adding default user.")) Session().add(get_default_user()) Session().commit() Session.remove() if len(Session().query(models.Config).all()) == 0: - print(color('[*] Adding database config.')) + print(color("[*] Adding database config.")) Session().add(get_default_config()) Session().commit() Session.remove() if len(Session().query(models.Function).all()) == 0: - print(color('[*] Adding default keyword obfuscation functions.')) + print(color("[*] Adding default keyword obfuscation functions.")) functions = get_default_functions() for function in functions: diff --git a/empire/server/database/defaults.py b/empire/server/database/defaults.py index 4b31b14c2..e68d0b967 100644 --- a/empire/server/database/defaults.py +++ b/empire/server/database/defaults.py @@ -8,55 +8,73 @@ from empire.server.common.config import empire_config from empire.server.database import models -database_config = empire_config.yaml.get('database', {}).get('defaults', {}) +database_config = empire_config.database.defaults def get_default_hashed_password(): - password = database_config.get('password', 'password123') - password = bytes(password, 'UTF-8') + password = database_config.get("password", "password123") + password = bytes(password, "UTF-8") return bcrypt.hashpw(password, bcrypt.gensalt()) def get_default_user(): - return models.User(username=database_config.get('username', 'empireadmin'), - password=get_default_hashed_password(), - enabled=True, - admin=True) + return models.User( + username=database_config.get("username", "empireadmin"), + password=get_default_hashed_password(), + enabled=True, + admin=True, + ) def get_default_config(): # Calculate the install path. We know the project directory will always be two levels up of the current directory. # Any modifications of the folder structure will need to be applied here. install_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) - return models.Config(staging_key=get_staging_key(), - install_path=install_path, - ip_whitelist=database_config.get('ip-whitelist', ''), - ip_blacklist=database_config.get('ip-blacklist', ''), - autorun_command="", - autorun_data="", - rootuser=True, - obfuscate=database_config.get('obfuscate', False), - obfuscate_command=database_config.get('obfuscate-command', r'Token\All\1')) + return models.Config( + staging_key=get_staging_key(), + install_path=install_path, + ip_whitelist=database_config.get("ip-whitelist", ""), + ip_blacklist=database_config.get("ip-blacklist", ""), + autorun_command="", + autorun_data="", + rootuser=True, + obfuscate=database_config.get("obfuscate", False), + obfuscate_command=database_config.get("obfuscate-command", r"Token\All\1"), + ) def get_default_functions(): return [ - models.Function(keyword='Invoke_Empire', - replacement=''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(5))), - models.Function(keyword='Invoke_Mimikatz', - replacement=''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(5))) + models.Function( + keyword="Invoke_Empire", + replacement="".join( + random.choice(string.ascii_uppercase + string.digits) for _ in range(5) + ), + ), + models.Function( + keyword="Invoke_Mimikatz", + replacement="".join( + random.choice(string.ascii_uppercase + string.digits) for _ in range(5) + ), + ), ] def get_staging_key(): # Staging Key is set up via environmental variable or config.yaml. By setting RANDOM a randomly selected password # will automatically be selected. - staging_key = os.getenv('STAGING_KEY') or database_config.get('staging-key', 'BLANK') - punctuation = '!#%&()*+,-./:;<=>?@[]^_{|}~' + staging_key = os.getenv("STAGING_KEY") or database_config.get( + "staging-key", "BLANK" + ) + punctuation = "!#%&()*+,-./:;<=>?@[]^_{|}~" if staging_key == "BLANK": - choice = input("\n [>] Enter server negotiation password, enter for random generation: ") + choice = input( + "\n [>] Enter server negotiation password, enter for random generation: " + ) if choice != "" and choice != "RANDOM": - return hashlib.md5(choice.encode('utf-8')).hexdigest() + return hashlib.md5(choice.encode("utf-8")).hexdigest() - print('\x1b[1;34m[*] Generating random staging key\x1b[0m') - return ''.join(random.sample(string.ascii_letters + string.digits + punctuation, 32)) + print("\x1b[1;34m[*] Generating random staging key\x1b[0m") + return "".join( + random.sample(string.ascii_letters + string.digits + punctuation, 32) + ) diff --git a/empire/server/server.py b/empire/server/server.py index 4fd1825d6..264171742 100755 --- a/empire/server/server.py +++ b/empire/server/server.py @@ -52,7 +52,7 @@ cli.show_server_banner = lambda *x: None # Disable http warnings -if empire_config.yaml.get("suppress-self-cert-warning", True): +if empire_config.supress_self_cert_warning: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Set proxy IDs @@ -3269,7 +3269,7 @@ def autostart_plugins(): """ Autorun plugin commands at server startup. """ - plugins = empire_config.yaml.get("plugins") + plugins = empire_config.plugins if plugins: for plugin in plugins: use_plugin = main.loadedPlugins[plugin] diff --git a/empire/test/conftest.py b/empire/test/conftest.py index 6ace22bfd..0f3a778d2 100644 --- a/empire/test/conftest.py +++ b/empire/test/conftest.py @@ -1,8 +1,18 @@ +import os import sys +from pathlib import Path import pytest +DEFAULT_ARGV = ["", "server", "--config", "empire/test/test_config.yaml"] + @pytest.fixture(scope="session", autouse=True) def setup_args(): - sys.argv = ["", "server", "--config", "empire/test/test_config.yaml"] + os.chdir(Path(os.path.dirname(os.path.abspath(__file__))).parent.parent) + sys.argv = DEFAULT_ARGV + + +@pytest.fixture(scope="session") +def default_argv(): + return DEFAULT_ARGV diff --git a/empire/test/test_config.py b/empire/test/test_config.py new file mode 100644 index 000000000..ed3ab5e63 --- /dev/null +++ b/empire/test/test_config.py @@ -0,0 +1,31 @@ +import sys +from importlib import reload + + +def test_load_sqlite(): + import empire.server.common.config + + reload(empire.server.common.config) + from empire.server.common.config import EmpireConfig, empire_config + + config: EmpireConfig = empire_config + + assert config.database.type == "sqlite" + assert config.database.location == "empire/server/data/test_empire.db" + + +def test_load_mysql(default_argv): + sys.argv = ["", "server", "--config", "empire/test/test_config_mysql.yaml"] + import empire.server.common.config + + reload(empire.server.common.config) + from empire.server.common.config import EmpireConfig, empire_config + + config: EmpireConfig = empire_config + + assert config.database.type == "mysql" + assert config.database.url == "localhost:3306" + + # set back to sqlite for subsequent tests + sys.argv = default_argv + reload(empire.server.common.config) diff --git a/empire/test/test_config_mysql.yaml b/empire/test/test_config_mysql.yaml new file mode 100644 index 000000000..51388b42f --- /dev/null +++ b/empire/test/test_config_mysql.yaml @@ -0,0 +1,25 @@ +suppress-self-cert-warning: true +database: + type: mysql + url: localhost:3306 + defaults: + # staging key will first look at OS environment variables, then here. + # If empty, will be prompted (like Empire <3.7). + staging-key: RANDOM + username: empireadmin + password: password123 + obfuscate: false + # Note the escaped backslashes + obfuscate-command: "Token\\All\\1" + # an IP white list to ONLY accept clients from + # format is "192.168.1.1,192.168.1.10-192.168.1.100,10.0.0.0/8" + ip-whitelist: "" + # an IP black list to reject accept clients from + # format is "192.168.1.1,192.168.1.10-192.168.1.100,10.0.0.0/8" + ip-blacklist: "" +modules: + retain-last-value: false +directories: + downloads: empire/server/downloads/ + module_source: empire/server/data/module_source/ + obfuscated_module_source: empire/server/data/obfuscated_module_source/ \ No newline at end of file