Skip to content

Commit

Permalink
Typed config (#308)
Browse files Browse the repository at this point in the history
* make config a typed object to reduce the pain of far-nested dicts and do some validation on-load

* cleanup

* try to get tests to run without mysql

* switch from union type
  • Loading branch information
vinnybod authored Mar 19, 2022
1 parent fac9506 commit 5bfa9fa
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 92 deletions.
84 changes: 61 additions & 23 deletions empire/server/common/config.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,68 @@
import sys
from typing import Dict
from typing import Dict, Union

import yaml
from pydantic import BaseModel, Field

from empire.server.common import helpers


class EmpireConfig(object):
def __init__(self):
self.yaml: Dict = {}
if "--config" in sys.argv:
location = sys.argv[sys.argv.index("--config") + 1]
print(f"Loading config from {location}")
self.set_yaml(location)
if len(self.yaml.items()) == 0:
print(helpers.color("[*] Loading default config"))
self.set_yaml("./empire/server/config.yaml")

def set_yaml(self, location: str):
try:
with open(location, "r") as stream:
self.yaml = yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
except FileNotFoundError as exc:
print(exc)


empire_config = EmpireConfig()
class DatabaseConfig(BaseModel):
type: str
defaults: Dict[str, Union[bool, int, str]]

# sqlite
location: str = "empire/server/data/empire.db"

# mysql
url: str = "localhost:3306"
username: str = ""
password: str = ""


class ModulesConfig(BaseModel):
# todo vr In 5.0 we should pick a single naming convention for config.
retain_last_value: bool = Field(alias="retain-last-value")


class DirectoriesConfig(BaseModel):
downloads: str
module_source: str
obfuscated_module_source: str


class EmpireConfig(BaseModel):
supress_self_cert_warning: bool = Field(
alias="supress-self-cert-warning", default=True
)
database: DatabaseConfig
modules: ModulesConfig
plugins: Dict[str, Dict[str, str]] = {}
directories: DirectoriesConfig

# For backwards compatibility
@property
def yaml(self):
return self.dict()


def set_yaml(location: str):
try:
with open(location, "r") as stream:
return yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
except FileNotFoundError as exc:
print(exc)


config_dict = {}
if "--config" in sys.argv:
location = sys.argv[sys.argv.index("--config") + 1]
print(f"Loading config from {location}")
config_dict = set_yaml(location)
if len(config_dict.items()) == 0:
print(helpers.color("[*] Loading default config"))
config_dict = set_yaml("./empire/server/config.yaml")

empire_config = EmpireConfig(**config_dict)
20 changes: 8 additions & 12 deletions empire/server/common/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def execute_module(
msg = f"tasked agent {session_id} to run module {module.name}"
self.main_menu.agents.save_agent_log(session_id, msg)

if empire_config.yaml.get("modules", {}).get("retain-last-value", True):
if empire_config.modules.retain_last_value:
self._set_default_values(module, cleaned_options)

return {"success": True, "taskID": task_id, "msg": msg}, None
Expand All @@ -160,9 +160,9 @@ def get_module_source(
"""
try:
if obfuscate:
obfuscated_module_source = empire_config.yaml.get("directories", {})[
"obfuscated_module_source"
]
obfuscated_module_source = (
empire_config.directories.obfuscated_module_source
)
module_path = os.path.join(obfuscated_module_source, module_name)
# If pre-obfuscated module exists then return code
if os.path.exists(module_path):
Expand All @@ -172,9 +172,7 @@ def get_module_source(

# If pre-obfuscated module does not exist then generate obfuscated code and return it
else:
module_source = empire_config.yaml.get("directories", {})[
"module_source"
]
module_source = empire_config.directories.module_source
module_path = os.path.join(module_source, module_name)
with open(module_path, "r") as f:
module_code = f.read()
Expand All @@ -187,9 +185,7 @@ def get_module_source(

# Use regular/unobfuscated code
else:
module_source = empire_config.yaml.get("directories", {})[
"module_source"
]
module_source = empire_config.directories.module_source
module_path = os.path.join(module_source, module_name)
with open(module_path, "r") as f:
module_code = f.read()
Expand Down Expand Up @@ -334,7 +330,7 @@ def _generate_script_python(
) -> Tuple[Optional[str], Optional[str]]:
if module.script_path:
script_path = os.path.join(
empire_config.yaml.get("directories", {})["module_source"],
empire_config.directories.module_source,
module.script_path,
)
with open(script_path, "r") as stream:
Expand Down Expand Up @@ -530,7 +526,7 @@ def _load_module(self, yaml_module, root_path, file_path: str):
elif my_model.script_path:
if not path.exists(
os.path.join(
empire_config.yaml.get("directories", {})["module_source"],
empire_config.directories.module_source,
my_model.script_path,
)
):
Expand Down
60 changes: 32 additions & 28 deletions empire/server/database/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,26 @@
)
from empire.server.database.models import Base

database_config = empire_config.yaml.get('database', {})

if database_config.get('type') == 'mysql':
url = database_config.get('url')
username = database_config.get('username') or ''
password = database_config.get('password') or ''
engine = create_engine(f'mysql+pymysql://{username}:{password}@{url}/empire', echo=False)
database_config = empire_config.database

if database_config.type == "mysql":
url = database_config.url
username = database_config.username
password = database_config.password
engine = create_engine(
f"mysql+pymysql://{username}:{password}@{url}/empire", echo=False
)
else:
location = database_config.get('location', 'data/empire.db')
engine = create_engine(f'sqlite:///{location}?check_same_thread=false', echo=False)
location = database_config.location
engine = create_engine(f"sqlite:///{location}?check_same_thread=false", echo=False)

Session = scoped_session(sessionmaker(bind=engine))

args = arguments.args
if args.reset:
choice = input("\x1b[1;33m[>] Would you like to reset your Empire instance? [y/N]: \x1b[0m")
choice = input(
"\x1b[1;33m[>] Would you like to reset your Empire instance? [y/N]: \x1b[0m"
)
if choice.lower() == "y":
# The reset script will delete the default db file. This will drop tables if connected to MySQL or
# a different SQLite .db file.
Expand All @@ -48,53 +52,53 @@ def color(string, color=None):
"""
attr = []
# bold
attr.append('1')
attr.append("1")

if color:
if color.lower() == "red":
attr.append('31')
attr.append("31")
elif color.lower() == "green":
attr.append('32')
attr.append("32")
elif color.lower() == "yellow":
attr.append('33')
attr.append("33")
elif color.lower() == "blue":
attr.append('34')
return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
attr.append("34")
return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string)

else:
if string.strip().startswith("[!]"):
attr.append('31')
return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
attr.append("31")
return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string)
elif string.strip().startswith("[+]"):
attr.append('32')
return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
attr.append("32")
return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string)
elif string.strip().startswith("[*]"):
attr.append('34')
return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
attr.append("34")
return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string)
elif string.strip().startswith("[>]"):
attr.append('33')
return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
attr.append("33")
return "\x1b[%sm%s\x1b[0m" % (";".join(attr), string)
else:
return string


# When Empire starts up for the first time, it will create the database and create
# these default records.
if len(Session().query(models.User).all()) == 0:
print(color('[*] Setting up database.'))
print(color('[*] Adding default user.'))
print(color("[*] Setting up database."))
print(color("[*] Adding default user."))
Session().add(get_default_user())
Session().commit()
Session.remove()

if len(Session().query(models.Config).all()) == 0:
print(color('[*] Adding database config.'))
print(color("[*] Adding database config."))
Session().add(get_default_config())
Session().commit()
Session.remove()

if len(Session().query(models.Function).all()) == 0:
print(color('[*] Adding default keyword obfuscation functions.'))
print(color("[*] Adding default keyword obfuscation functions."))
functions = get_default_functions()

for function in functions:
Expand Down
70 changes: 44 additions & 26 deletions empire/server/database/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,73 @@
from empire.server.common.config import empire_config
from empire.server.database import models

database_config = empire_config.yaml.get('database', {}).get('defaults', {})
database_config = empire_config.database.defaults


def get_default_hashed_password():
password = database_config.get('password', 'password123')
password = bytes(password, 'UTF-8')
password = database_config.get("password", "password123")
password = bytes(password, "UTF-8")
return bcrypt.hashpw(password, bcrypt.gensalt())


def get_default_user():
return models.User(username=database_config.get('username', 'empireadmin'),
password=get_default_hashed_password(),
enabled=True,
admin=True)
return models.User(
username=database_config.get("username", "empireadmin"),
password=get_default_hashed_password(),
enabled=True,
admin=True,
)


def get_default_config():
# Calculate the install path. We know the project directory will always be two levels up of the current directory.
# Any modifications of the folder structure will need to be applied here.
install_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
return models.Config(staging_key=get_staging_key(),
install_path=install_path,
ip_whitelist=database_config.get('ip-whitelist', ''),
ip_blacklist=database_config.get('ip-blacklist', ''),
autorun_command="",
autorun_data="",
rootuser=True,
obfuscate=database_config.get('obfuscate', False),
obfuscate_command=database_config.get('obfuscate-command', r'Token\All\1'))
return models.Config(
staging_key=get_staging_key(),
install_path=install_path,
ip_whitelist=database_config.get("ip-whitelist", ""),
ip_blacklist=database_config.get("ip-blacklist", ""),
autorun_command="",
autorun_data="",
rootuser=True,
obfuscate=database_config.get("obfuscate", False),
obfuscate_command=database_config.get("obfuscate-command", r"Token\All\1"),
)


def get_default_functions():
return [
models.Function(keyword='Invoke_Empire',
replacement=''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(5))),
models.Function(keyword='Invoke_Mimikatz',
replacement=''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(5)))
models.Function(
keyword="Invoke_Empire",
replacement="".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(5)
),
),
models.Function(
keyword="Invoke_Mimikatz",
replacement="".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(5)
),
),
]


def get_staging_key():
# Staging Key is set up via environmental variable or config.yaml. By setting RANDOM a randomly selected password
# will automatically be selected.
staging_key = os.getenv('STAGING_KEY') or database_config.get('staging-key', 'BLANK')
punctuation = '!#%&()*+,-./:;<=>?@[]^_{|}~'
staging_key = os.getenv("STAGING_KEY") or database_config.get(
"staging-key", "BLANK"
)
punctuation = "!#%&()*+,-./:;<=>?@[]^_{|}~"
if staging_key == "BLANK":
choice = input("\n [>] Enter server negotiation password, enter for random generation: ")
choice = input(
"\n [>] Enter server negotiation password, enter for random generation: "
)
if choice != "" and choice != "RANDOM":
return hashlib.md5(choice.encode('utf-8')).hexdigest()
return hashlib.md5(choice.encode("utf-8")).hexdigest()

print('\x1b[1;34m[*] Generating random staging key\x1b[0m')
return ''.join(random.sample(string.ascii_letters + string.digits + punctuation, 32))
print("\x1b[1;34m[*] Generating random staging key\x1b[0m")
return "".join(
random.sample(string.ascii_letters + string.digits + punctuation, 32)
)
4 changes: 2 additions & 2 deletions empire/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
cli.show_server_banner = lambda *x: None

# Disable http warnings
if empire_config.yaml.get("suppress-self-cert-warning", True):
if empire_config.supress_self_cert_warning:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Set proxy IDs
Expand Down Expand Up @@ -3269,7 +3269,7 @@ def autostart_plugins():
"""
Autorun plugin commands at server startup.
"""
plugins = empire_config.yaml.get("plugins")
plugins = empire_config.plugins
if plugins:
for plugin in plugins:
use_plugin = main.loadedPlugins[plugin]
Expand Down
Loading

0 comments on commit 5bfa9fa

Please sign in to comment.