diff --git a/.github/workflows/main_testing.yaml b/.github/workflows/main_testing.yaml index d4bf82817..b32dddda1 100644 --- a/.github/workflows/main_testing.yaml +++ b/.github/workflows/main_testing.yaml @@ -52,7 +52,7 @@ jobs: shell: bash strategy: matrix: - python-version: [ '3.8', '3.9', '3.10', '3.11', "3.12", "3.13.0-beta.2" ] + python-version: [ '3.9', '3.10', '3.11', "3.12", "3.13" ] platform: [ubuntu-24.04, windows-2022] runs-on: ${{ matrix.platform }} @@ -96,7 +96,7 @@ jobs: shell: bash strategy: matrix: - python-version: [ '3.8', '3.9', '3.10', '3.11' ] + python-version: [ '3.9', '3.10', '3.11' ] platform: [macos-13] runs-on: ${{ matrix.platform }} diff --git a/netmiko/base_connection.py b/netmiko/base_connection.py index f6fbff937..4ecf0fc25 100644 --- a/netmiko/base_connection.py +++ b/netmiko/base_connection.py @@ -472,15 +472,18 @@ def __init__( self.system_host_keys = system_host_keys self.alt_host_keys = alt_host_keys self.alt_key_file = alt_key_file + self.disabled_algorithms = disabled_algorithms - if disabled_algorithms: - self.disabled_algorithms = disabled_algorithms - else: - self.disabled_algorithms = ( - {"pubkeys": ["rsa-sha2-256", "rsa-sha2-512"]} - if disable_sha2_fix - else {} - ) + if disable_sha2_fix: + sha2_pubkeys = ["rsa-sha2-256", "rsa-sha2-512"] + if self.disabled_algorithms is None: + self.disabled_algorithms = {"pubkeys": sha2_pubkeys} + else: + # Merge sha2_pubkeys into pubkeys and prevent duplicates + current_pubkeys = self.disabled_algorithms.get("pubkeys", []) + self.disabled_algorithms["pubkeys"] = list( + set(current_pubkeys + sha2_pubkeys) + ) # For SSH proxy support self.ssh_config_file = ssh_config_file diff --git a/netmiko/fortinet/fortinet_ssh.py b/netmiko/fortinet/fortinet_ssh.py index f01779178..a0883cb99 100644 --- a/netmiko/fortinet/fortinet_ssh.py +++ b/netmiko/fortinet/fortinet_ssh.py @@ -1,6 +1,6 @@ import paramiko import re -from typing import Optional +from typing import Optional, Any from netmiko.no_config import NoConfig from netmiko.no_enable import NoEnable @@ -9,16 +9,24 @@ class FortinetSSH(NoConfig, NoEnable, CiscoSSHConnection): prompt_pattern = r"[#$]" - - def _modify_connection_params(self) -> None: - """Modify connection parameters prior to SSH connection.""" - paramiko_transport = getattr(paramiko, "Transport") - paramiko_transport._preferred_kex = ( - "diffie-hellman-group14-sha1", - "diffie-hellman-group-exchange-sha1", - "diffie-hellman-group-exchange-sha256", - "diffie-hellman-group1-sha1", - ) + preferred_kex = { + "diffie-hellman-group14-sha1", + "diffie-hellman-group-exchange-sha1", + "diffie-hellman-group-exchange-sha256", + "diffie-hellman-group1-sha1", + } + + def __init__(self, *args: Any, **kwargs: Any) -> None: + disabled_algorithms = kwargs.get("disabled_algorithms") + # Set this as long as no "kex" settings being passed via disabled_algorithms + if disabled_algorithms is None or not disabled_algorithms.get("kex"): + paramiko_transport = getattr(paramiko, "Transport") + paramiko_cur_kex = set(paramiko_transport._preferred_kex) + # Disable any kex not in allowed fortinet set + disabled_kex = list(paramiko_cur_kex - self.preferred_kex) + kwargs["disabled_algorithms"] = {"kex": disabled_kex} + + super().__init__(*args, **kwargs) def _try_session_preparation(self, force_data: bool = False) -> None: super()._try_session_preparation(force_data=force_data) diff --git a/tests/unit/test_base_connection.py b/tests/unit/test_base_connection.py index cd02f48d3..01ea8fa0f 100755 --- a/tests/unit/test_base_connection.py +++ b/tests/unit/test_base_connection.py @@ -4,7 +4,8 @@ from os.path import dirname, join from threading import Lock -from netmiko import NetmikoTimeoutException, log +import paramiko +from netmiko import NetmikoTimeoutException, log, ConnectHandler from netmiko.base_connection import BaseConnection RESOURCE_FOLDER = join(dirname(dirname(__file__)), "etc") @@ -493,3 +494,62 @@ def test_remove_SecretsFilter_after_disconnection(): connection.disconnect() assert not log.filters + + +def test_fortinet_kex_values(): + """Verify KEX override in Fortinet driver works properly""" + connection = ConnectHandler( + host="testhost", + device_type="fortinet", + auto_connect=False, # No need to connect for the test purposes + ) + paramiko_transport = getattr(paramiko, "Transport") + paramiko_default_kex = set(paramiko_transport._preferred_kex) + + allowed_fortinet_kex = set(connection.preferred_kex) + disabled_kex = list(paramiko_default_kex - allowed_fortinet_kex) + allowed_kex = paramiko_default_kex & allowed_fortinet_kex + + # Ensure disabled_kex matches expectations + assert disabled_kex == connection.disabled_algorithms.get("kex", []) + # Ensure allowed_kex is not an empty set + assert allowed_kex + + connection.disconnect() + + +def test_disable_sha2_fix(): + """ + Verify SHA2 fix works properly; test with fortinet device_type as it is more of an edge + case. + """ + connection = ConnectHandler( + host="testhost", + device_type="fortinet", + disable_sha2_fix=True, + auto_connect=False, # No need to connect for the test purposes + ) + paramiko_transport = getattr(paramiko, "Transport") + + # Verify fortinet kex fix and disable_sha2_fix work properly together + paramiko_default_kex = set(paramiko_transport._preferred_kex) + allowed_fortinet_kex = set(connection.preferred_kex) + disabled_kex = list(paramiko_default_kex - allowed_fortinet_kex) + allowed_kex = paramiko_default_kex & allowed_fortinet_kex + + # Ensure disabled_kex matches expectations + assert disabled_kex == connection.disabled_algorithms.get("kex", []) + # Ensure allowed_kex is not an empty set + assert allowed_kex + + # Verify 'sha2' algorithms have been disabled + paramiko_default_pubkeys = set(paramiko_transport._preferred_keys) + disabled_pubkey_algos = set(connection.disabled_algorithms.get("pubkeys", [])) + + allowed_pubkeys = paramiko_default_pubkeys - disabled_pubkey_algos + # Check allowed_pubkeys is not an empty set + assert allowed_pubkeys + # Check both 'sha2' pubkeys are not in allowed_pubkeys + assert {"rsa-sha2-512", "rsa-sha2-256"} & allowed_pubkeys == set() + + connection.disconnect()