Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ssh module refactoring separate auth/shell checks #331

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 89 additions & 82 deletions nxc/protocols/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid
import logging
import time
from io import StringIO

from nxc.config import process_secret
from nxc.connection import connection, highlight
Expand All @@ -20,6 +21,7 @@ def __init__(self, args, db, host):
self.remote_version = "Unknown SSH Version"
self.server_os_platform = "Linux"
self.uac = ""
self.user_principal = ""
super().__init__(args, db, host)

def proto_flow(self):
Expand Down Expand Up @@ -74,7 +76,7 @@ def create_conn_obj(self):
except OSError:
return False

def check_if_admin(self):
def check_linux_priv(self):
self.admin_privs = False

if self.args.sudo_check:
Expand Down Expand Up @@ -184,99 +186,104 @@ def check_if_admin_sudo(self):
def plaintext_login(self, username, password, private_key=""):
self.username = username
self.password = password
stdout = None
try:
key_content = None
if self.args.key_file or private_key:
self.logger.debug(f"Logging {self.host} with username: {username}, keyfile: {self.args.key_file}")

self.conn.connect(
self.host,
port=self.port,
username=username,
passphrase=password if password != "" else None,
key_filename=private_key if private_key else self.args.key_file,
timeout=self.args.ssh_timeout,
look_for_keys=False,
allow_agent=False,
banner_timeout=self.args.ssh_timeout,
)

cred_id = self.db.add_credential(
"key",
username,
password if password != "" else "",
key=private_key,
)

else:
self.logger.debug(f"Logging {self.host} with username: {self.username}, password: {self.password}")
self.conn.connect(
self.host,
port=self.port,
username=username,
password=password,
timeout=self.args.ssh_timeout,
look_for_keys=False,
allow_agent=False,
banner_timeout=self.args.ssh_timeout,
)
cred_id = self.db.add_credential("plaintext", username, password)

# Some IOT devices will not raise exception in self.conn._transport.auth_password / self.conn._transport.auth_publickey
_, stdout, _ = self.conn.exec_command("id")
stdout = stdout.read().decode(self.args.codec, errors="ignore")
except AuthenticationException:
self.logger.fail(f"{username}:{process_secret(password)}")
except SSHException as e:
if "Invalid key" in str(e):
self.logger.fail(f"{username}:{process_secret(password)} Could not decrypt private key, error: {e}")
if "Error reading SSH protocol banner" in str(e):
self.logger.error(f"Internal Paramiko error for {username}:{process_secret(password)}, {e}")
Comment on lines -228 to -234
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please preserve the current Exception handling, as with this we can catch some weird paramiko issues

Copy link
Contributor Author

@nikaiw nikaiw Sep 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those exception are already handled in create_conn_obj() However, I could add the logger calls to keep the previous behavior.

Edit: actually, I didn't remembered well, there are 2 connections, a first one to check that the service is present with conn_obj() which as exception handling. A second one with plaintext_login() for which I added handle_connection_failure() to handle all kind of exception

key_content = self.read_key_file(private_key or self.args.key_file)
self.connect_with_key(username, password, key_content)
cred_type = "key"
else:
self.logger.exception(e)
self.connect_with_password(username, password)
cred_type = "plaintext"

cred_id = self.db.add_credential(cred_type, username, password, key=key_content)
shell_access = self.test_shell_access()
host_id = self.db.get_hosts(self.host)[0].id
if self.admin_privs:
self.logger.debug(f"User {username} logged in successfully and is root!")
self.db.add_admin_user(cred_type, username, password, host_id=host_id, cred_id=cred_id)
display_shell_access = f"({self.user_principal}) {self.server_os_platform} - {'Shell access!' if shell_access else 'Undetermined shell access.'}"
if self.args.key_file:
password = f"{process_secret(password)} (keyfile: {self.args.key_file})"
self.logger.success(f"{username}:{password} {self.mark_pwned()} {highlight(display_shell_access)}")
self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access)
return shell_access
except Exception as e:
self.logger.exception(e)
self.conn.close()
self.handle_connection_failure(e)
return False
else:
shell_access = False
host_id = self.db.get_hosts(self.host)[0].id
finally:
self.conn.close()

if not stdout:
_, stdout, _ = self.conn.exec_command("whoami /priv")
stdout = stdout.read().decode(self.args.codec, errors="ignore")
self.server_os_platform = "Windows"
if "SeDebugPrivilege" in stdout:
self.admin_privs = True
elif "SeUndockPrivilege" in stdout:
self.admin_privs = True
self.uac = "with UAC - "
def read_key_file(self, private_key):
if self.args.key_file:
with open(self.args.key_file) as f:
private_key = f.read()
return private_key

if not stdout:
self.logger.debug(f"User: {self.username} can't get a basic shell")
self.server_os_platform = "Network Devices"
shell_access = False
else:
shell_access = True
def connect_with_key(self, username, password, key_content):
self.logger.debug(f"Logging {self.host} with username: {self.username}, key: {self.args.key_file}")
pkey = paramiko.RSAKey.from_private_key(StringIO(key_content))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is with all the other key types? This should not be limited to RSA

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah good catch this is a last PR modification mistake, I'll fix that

self.conn.connect(self.host, port=self.port, username=username, passphrase=password, pkey=pkey, look_for_keys=False, allow_agent=False)

self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access)
def connect_with_password(self, username, password):
self.logger.debug(f"Logging {self.host} with username: {self.username}, password: {self.password}")
self.conn.connect(self.host, port=self.port, username=username, password=password, look_for_keys=False, allow_agent=False)

if shell_access and self.server_os_platform == "Linux":
self.check_if_admin()
if self.admin_privs:
self.logger.debug(f"User {username} logged in successfully and is root!")
if self.args.key_file:
self.db.add_admin_user("key", username, password, host_id=host_id, cred_id=cred_id)
def test_shell_access(self):
try:
user_stdout = self.execute_command("whoami")
if user_stdout:
self.user_principal = user_stdout.strip()
self.logger.debug(f"User: {self.user_principal}")
if "\\" in user_stdout: # Likely a Windows username in DOMAIN\user format
priv_stdout = self.execute_command("whoami /priv")
if priv_stdout:
self.server_os_platform = "Windows"
self.check_windows_priv(priv_stdout)
else:
self.db.add_admin_user("plaintext", username, password, host_id=host_id, cred_id=cred_id)

if self.args.key_file:
password = f"{process_secret(password)} (keyfile: {self.args.key_file})"
self.logger.error(f"User: {self.user_principal} unable to determine Windows privileges")
return True
else:
self.server_os_platform = "Linux"
self.check_linux_priv()
return True
else:
uname_stdout = self.execute_command("uname")
if uname_stdout.strip():
self.logger.debug("Basic shell access check passed.")
self.server_os_platform = uname_stdout
return True
else:
self.logger.debug(f"User: {self.username} Shell status undermined; key commands not found.")
self.server_os_platform = "Network Devices"
return False
except Exception as e:
self.logger.error(f"Authenticated but failed to execute command: {e}")
return False

def execute_command(self, command):
_, stdout, _ = self.conn.exec_command(command)
return stdout.read().decode(self.args.codec, errors="ignore")


display_shell_access = f"{self.uac}{self.server_os_platform}{' - Shell access!' if shell_access else ''}"
self.logger.success(f"{username}:{process_secret(password)} {self.mark_pwned()} {highlight(display_shell_access)}")
def check_windows_priv(self, output):
if "SeDebugPrivilege" in output:
self.admin_privs = True
self.user_principal = "admin"
elif "SeUndockPrivilege" in output:
self.admin_privs = True
self.user_principal = "admin (UAC)"
else:
self.user_principal = "user (low priv)"

return True
def handle_connection_failure(self, exception):
error_msg = str(exception)
if self.args.key_file:
self.password = f"{process_secret(self.password)} (keyfile: {self.args.key_file})"
if "OpenSSH private key file checkints do not match" in error_msg:
self.logger.fail(f"{self.username}:{self.password} - Could not decrypt key file, wrong password")
else:
self.logger.fail(f"{self.username}:{self.password} {exception}")

def execute(self, payload=None, get_output=False):
if not payload and self.args.execute:
Expand Down