Skip to content

Commit

Permalink
fortinet _preferred_kex settings interfering with other devices (ktby…
Browse files Browse the repository at this point in the history
…ers#3530)

Co-authored-by: Karel <[email protected]>
  • Loading branch information
ktbyers and Noppes authored Nov 8, 2024
1 parent fe00998 commit 3330854
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main_testing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
shell: bash
strategy:
matrix:
python-version: [ '3.8', '3.9', '3.10', '3.11', "3.12", "3.13.0-beta.2" ]
python-version: [ '3.9', '3.10', '3.11', "3.12", "3.13" ]
platform: [ubuntu-24.04, windows-2022]

runs-on: ${{ matrix.platform }}
Expand Down Expand Up @@ -96,7 +96,7 @@ jobs:
shell: bash
strategy:
matrix:
python-version: [ '3.8', '3.9', '3.10', '3.11' ]
python-version: [ '3.9', '3.10', '3.11' ]
platform: [macos-13]

runs-on: ${{ matrix.platform }}
Expand Down
19 changes: 11 additions & 8 deletions netmiko/base_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,15 +472,18 @@ def __init__(
self.system_host_keys = system_host_keys
self.alt_host_keys = alt_host_keys
self.alt_key_file = alt_key_file
self.disabled_algorithms = disabled_algorithms

if disabled_algorithms:
self.disabled_algorithms = disabled_algorithms
else:
self.disabled_algorithms = (
{"pubkeys": ["rsa-sha2-256", "rsa-sha2-512"]}
if disable_sha2_fix
else {}
)
if disable_sha2_fix:
sha2_pubkeys = ["rsa-sha2-256", "rsa-sha2-512"]
if self.disabled_algorithms is None:
self.disabled_algorithms = {"pubkeys": sha2_pubkeys}
else:
# Merge sha2_pubkeys into pubkeys and prevent duplicates
current_pubkeys = self.disabled_algorithms.get("pubkeys", [])
self.disabled_algorithms["pubkeys"] = list(
set(current_pubkeys + sha2_pubkeys)
)

# For SSH proxy support
self.ssh_config_file = ssh_config_file
Expand Down
30 changes: 19 additions & 11 deletions netmiko/fortinet/fortinet_ssh.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import paramiko
import re
from typing import Optional
from typing import Optional, Any

from netmiko.no_config import NoConfig
from netmiko.no_enable import NoEnable
Expand All @@ -9,16 +9,24 @@

class FortinetSSH(NoConfig, NoEnable, CiscoSSHConnection):
prompt_pattern = r"[#$]"

def _modify_connection_params(self) -> None:
"""Modify connection parameters prior to SSH connection."""
paramiko_transport = getattr(paramiko, "Transport")
paramiko_transport._preferred_kex = (
"diffie-hellman-group14-sha1",
"diffie-hellman-group-exchange-sha1",
"diffie-hellman-group-exchange-sha256",
"diffie-hellman-group1-sha1",
)
preferred_kex = {
"diffie-hellman-group14-sha1",
"diffie-hellman-group-exchange-sha1",
"diffie-hellman-group-exchange-sha256",
"diffie-hellman-group1-sha1",
}

def __init__(self, *args: Any, **kwargs: Any) -> None:
disabled_algorithms = kwargs.get("disabled_algorithms")
# Set this as long as no "kex" settings being passed via disabled_algorithms
if disabled_algorithms is None or not disabled_algorithms.get("kex"):
paramiko_transport = getattr(paramiko, "Transport")
paramiko_cur_kex = set(paramiko_transport._preferred_kex)
# Disable any kex not in allowed fortinet set
disabled_kex = list(paramiko_cur_kex - self.preferred_kex)
kwargs["disabled_algorithms"] = {"kex": disabled_kex}

super().__init__(*args, **kwargs)

def _try_session_preparation(self, force_data: bool = False) -> None:
super()._try_session_preparation(force_data=force_data)
Expand Down
62 changes: 61 additions & 1 deletion tests/unit/test_base_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from os.path import dirname, join
from threading import Lock

from netmiko import NetmikoTimeoutException, log
import paramiko
from netmiko import NetmikoTimeoutException, log, ConnectHandler
from netmiko.base_connection import BaseConnection

RESOURCE_FOLDER = join(dirname(dirname(__file__)), "etc")
Expand Down Expand Up @@ -493,3 +494,62 @@ def test_remove_SecretsFilter_after_disconnection():
connection.disconnect()

assert not log.filters


def test_fortinet_kex_values():
"""Verify KEX override in Fortinet driver works properly"""
connection = ConnectHandler(
host="testhost",
device_type="fortinet",
auto_connect=False, # No need to connect for the test purposes
)
paramiko_transport = getattr(paramiko, "Transport")
paramiko_default_kex = set(paramiko_transport._preferred_kex)

allowed_fortinet_kex = set(connection.preferred_kex)
disabled_kex = list(paramiko_default_kex - allowed_fortinet_kex)
allowed_kex = paramiko_default_kex & allowed_fortinet_kex

# Ensure disabled_kex matches expectations
assert disabled_kex == connection.disabled_algorithms.get("kex", [])
# Ensure allowed_kex is not an empty set
assert allowed_kex

connection.disconnect()


def test_disable_sha2_fix():
"""
Verify SHA2 fix works properly; test with fortinet device_type as it is more of an edge
case.
"""
connection = ConnectHandler(
host="testhost",
device_type="fortinet",
disable_sha2_fix=True,
auto_connect=False, # No need to connect for the test purposes
)
paramiko_transport = getattr(paramiko, "Transport")

# Verify fortinet kex fix and disable_sha2_fix work properly together
paramiko_default_kex = set(paramiko_transport._preferred_kex)
allowed_fortinet_kex = set(connection.preferred_kex)
disabled_kex = list(paramiko_default_kex - allowed_fortinet_kex)
allowed_kex = paramiko_default_kex & allowed_fortinet_kex

# Ensure disabled_kex matches expectations
assert disabled_kex == connection.disabled_algorithms.get("kex", [])
# Ensure allowed_kex is not an empty set
assert allowed_kex

# Verify 'sha2' algorithms have been disabled
paramiko_default_pubkeys = set(paramiko_transport._preferred_keys)
disabled_pubkey_algos = set(connection.disabled_algorithms.get("pubkeys", []))

allowed_pubkeys = paramiko_default_pubkeys - disabled_pubkey_algos
# Check allowed_pubkeys is not an empty set
assert allowed_pubkeys
# Check both 'sha2' pubkeys are not in allowed_pubkeys
assert {"rsa-sha2-512", "rsa-sha2-256"} & allowed_pubkeys == set()

connection.disconnect()

0 comments on commit 3330854

Please sign in to comment.