diff --git a/test/qa/lib/firewall.py b/test/qa/lib/firewall.py index b2b5f3b6..f4c58e5b 100644 --- a/test/qa/lib/firewall.py +++ b/test/qa/lib/firewall.py @@ -78,6 +78,19 @@ # -A OUTPUT -o {iface} -m mark --mark 0xe1f1 -m comment --comment nordvpn -j ACCEPT # -A OUTPUT -o {iface} -m comment --comment nordvpn -j DROP +inputLanDiscoveryRules = [ + "-A INPUT -s 169.254.0.0/16 -i eth0 -m comment --comment nordvpn -j ACCEPT", + "-A INPUT -s 192.168.0.0/16 -i eth0 -m comment --comment nordvpn -j ACCEPT", + "-A INPUT -s 172.16.0.0/12 -i eth0 -m comment --comment nordvpn -j ACCEPT", + "-A INPUT -s 10.0.0.0/8 -i eth0 -m comment --comment nordvpn -j ACCEPT", +] + +outputLanDiscoveryRules = [ + "-A OUTPUT -d 169.254.0.0/16 -o eth0 -m comment --comment nordvpn -j ACCEPT", + "-A OUTPUT -d 192.168.0.0/16 -o eth0 -m comment --comment nordvpn -j ACCEPT", + "-A OUTPUT -d 172.16.0.0/12 -o eth0 -m comment --comment nordvpn -j ACCEPT", + "-A OUTPUT -d 10.0.0.0/8 -o eth0 -m comment --comment nordvpn -j ACCEPT", +] # ToDo: Add missing IPv6 rules (icmp6 & dhcp6) def _get_firewall_rules(killswitch, server_ip, iface, port="", protocol="", subnet=""): diff --git a/test/qa/lib/meshnet.py b/test/qa/lib/meshnet.py index d2fe5025..82f2ad95 100644 --- a/test/qa/lib/meshnet.py +++ b/test/qa/lib/meshnet.py @@ -8,11 +8,19 @@ PEER_USERNAME = os.environ.get("QA_PEER_USERNAME") +LANS = [ + "169.254.0.0/16", + "192.168.0.0/16", + "172.16.0.0/12", + "10.0.0.0/8", +] + class PeerName(Enum): Hostname = 0 Ip = 1 Pubkey = 2 + def get_peer_name(output: str, name_type: PeerName) -> str: match name_type: case PeerName.Hostname: @@ -22,16 +30,37 @@ def get_peer_name(output: str, name_type: PeerName) -> str: case PeerName.Pubkey: return get_this_device_pubkey(output) -def add_peer(ssh_client: ssh.Ssh, tester_allow_fileshare: bool = True, peer_allow_fileshare: bool = True): + +def add_peer(ssh_client: ssh.Ssh, + tester_allow_fileshare: bool = True, + tester_allow_routing: bool = True, + tester_allow_local: bool = True, + tester_allow_incoming: bool = True, + peer_allow_fileshare: bool = True, + peer_allow_routing: bool = True, + peer_allow_local: bool = True, + peer_allow_incoming: bool = True): """ adds QA peer to meshnet try to minimize usage of this, because there's a weekly invite limit """ - tester_allow_fileshare_arg = f"-allow-peer-send-files={str(tester_allow_fileshare).lower()}" - peer_allow_fileshare_arg = f"-allow-peer-send-files={str(peer_allow_fileshare).lower()}" - sh.nordvpn.mesh.inv.send("--allow-incoming-traffic=true", "--allow-traffic-routing=true", tester_allow_fileshare_arg, PEER_USERNAME) + tester_allow_fileshare_arg = f"--allow-peer-send-files={str(tester_allow_fileshare).lower()}" + tester_allow_routing_arg = f"--allow-traffic-routing={str(tester_allow_routing).lower()}" + tester_allow_local_arg = f"--allow-local-network-access={str(tester_allow_local).lower()}" + tester_allow_incoming_arg = f"--allow-incoming-traffic={str(tester_allow_incoming).lower()}" + + + peer_allow_fileshare_arg = f"--allow-peer-send-files={str(peer_allow_fileshare).lower()}" + peer_allow_routing_arg = f"--allow-traffic-routing={str(peer_allow_routing).lower()}" + peer_allow_local_arg = f"--allow-local-network-access={str(peer_allow_local).lower()}" + peer_allow_incoming_arg = f"--allow-incoming-traffic={str(peer_allow_incoming).lower()}" + + sh.nordvpn.mesh.inv.send(tester_allow_incoming_arg, tester_allow_local_arg, tester_allow_routing_arg, tester_allow_fileshare_arg, PEER_USERNAME) local_user, _ = login.get_default_credentials() - ssh_client.exec_command(f"yes | nordvpn mesh inv accept --allow-incoming-traffic=true --allow-traffic-routing=true {peer_allow_fileshare_arg} {local_user}") + ssh_client.exec_command(f"yes | nordvpn mesh inv accept {peer_allow_local_arg} {peer_allow_incoming_arg} {peer_allow_routing_arg} {peer_allow_fileshare_arg} {local_user}") + + sh.nordvpn.mesh.peer.refresh() + def get_peers(output: str) -> list: """parses list of peer names from 'nordvpn meshnet peer list' output""" @@ -42,6 +71,7 @@ def get_peers(output: str) -> list: peers.append(line.split(" ")[1]) return peers + def get_this_device(output: str): """parses current device hostname from 'nordvpn meshnet peer list' output""" output_lines = output.split("\n") @@ -49,6 +79,7 @@ def get_this_device(output: str): if line.find("This device:") != -1: return output_lines[i+1].split(" ")[1] + def get_this_device_ipv4(output: str): """parses current device ip from 'nordvpn meshnet peer list' output""" output_lines = output.split("\n") @@ -56,6 +87,7 @@ def get_this_device_ipv4(output: str): if line.find("This device:") != -1: return output_lines[i+2].split(" ")[1] + def get_this_device_pubkey(output: str): """parses current device pubkey from 'nordvpn meshnet peer list' output""" output_lines = output.split("\n") @@ -64,18 +96,21 @@ def get_this_device_pubkey(output: str): # example: Public Key: uAexQo2yuiVBZocvuiFPQjAujkDmQVemKaircpxDaUc= return output_lines[i+3].split(" ")[2] + def remove_all_peers(): """removes all meshnet peers from local device""" output = f"{sh.nordvpn.mesh.peer.list(_tty_out=False)}" # convert to string, _tty_out false disables colors for p in get_peers(output): sh.nordvpn.mesh.peer.remove(p) + def remove_all_peers_in_peer(ssh_client: ssh.Ssh): """removes all meshnet peers from peer device""" output = ssh_client.exec_command("nordvpn mesh peer list") for p in get_peers(output): ssh_client.exec_command(f"nordvpn mesh peer remove {p}") + def is_peer_reachable(ssh_client: ssh.Ssh, retry: int = 5) -> bool: """returns True when ping to peer succeeds.""" output = ssh_client.exec_command("nordvpn mesh peer list") @@ -94,6 +129,7 @@ def is_peer_reachable(ssh_client: ssh.Ssh, retry: int = 5) -> bool: print(output) return False + def get_sent_invites(output: str) -> list: """parses list of sent invites from 'nordvpn meshnet inv list' output""" emails = [] @@ -104,12 +140,14 @@ def get_sent_invites(output: str) -> list: emails.append(line.split(" ")[1]) return emails + def revoke_all_invites(): """revokes all sent meshnet invites in local device""" output = f"{sh.nordvpn.mesh.inv.list(_tty_out=False)}" # convert to string, _tty_out false disables colors for i in get_sent_invites(output): sh.nordvpn.mesh.inv.revoke(i) + def revoke_all_invites_in_peer(ssh_client: ssh.Ssh): """revokes all sent meshnet invites in peer device""" output = ssh_client.exec_command("nordvpn mesh inv list") diff --git a/test/qa/test_firewall.py b/test/qa/test_firewall.py index 14c29ef4..23b5bb05 100644 --- a/test/qa/test_firewall.py +++ b/test/qa/test_firewall.py @@ -29,6 +29,7 @@ def setup_function(function): def teardown_function(function): logging.log(data=info.collect()) logging.log() + sh.nordvpn.set("lan-discovery", "off", _ok_code=(0,1)) @pytest.mark.flaky(reruns=2, reruns_delay=90) @@ -267,4 +268,58 @@ def test_firewall_exitnode(): assert lib.is_disconnect_successful(output) assert network.is_disconnected() - assert not firewall.is_active() \ No newline at end of file + assert not firewall.is_active() + + +@pytest.mark.parametrize("before_connect", [True, False]) +def test_firewall_lan_discovery(before_connect): + if before_connect: + sh.nordvpn.set("lan-discovery", "on") + + sh.nordvpn.connect() + + if not before_connect: + sh.nordvpn.set("lan-discovery", "on") + + rules = sh.sudo.iptables("-S", "INPUT") + for rule in firewall.inputLanDiscoveryRules: + assert rule in rules, f"{rule} input rule not found in iptables." + + rules = sh.sudo.iptables("-S", "OUTPUT") + for rule in firewall.outputLanDiscoveryRules: + assert rule in rules, f"{rule} output rule not found in iptables" + + sh.nordvpn.set("lan-discovery", "off") + + rules = sh.sudo.iptables("-S", "INPUT") + for rule in firewall.inputLanDiscoveryRules: + assert rule not in rules, f"{rule} input rule not found in iptables." + + rules = sh.sudo.iptables("-S", "OUTPUT") + for rule in firewall.outputLanDiscoveryRules: + assert rule not in rules, f"{rule} output rule not found in iptables" + + +def test_firewall_lan_allowlist_interaction(): + sh.nordvpn.connect() + + subnet = "192.168.0.0/18" + + sh.nordvpn.allowlist.add.subnet(subnet) + sh.nordvpn.set("lan-discovery", "on") + + rules = sh.sudo.iptables("-S", "INPUT") + assert f"-A INPUT -s {subnet} -i eth0 -m comment --comment nordvpn -j ACCEPT" not in rules, "Whitelist rule was not removed from the INPUT chain when LAN discovery was enabled." + + rules = sh.sudo.iptables("-S", "OUTPUT") + assert f"-A OUTPUT -s {subnet} -o eth0 -m comment --comment nordvpn -j ACCEPT" not in rules, "Whitelist rule was not removed from the OUTPUT chain when LAN discovery was enabled." + + sh.nordvpn.set("lan-discovery", "off") + + rules = sh.sudo.iptables("-S", "INPUT") + for rule in firewall.inputLanDiscoveryRules: + assert rule not in rules, f"{rule} input rule not found in iptables." + + rules = sh.sudo.iptables("-S", "OUTPUT") + for rule in firewall.outputLanDiscoveryRules: + assert rule not in rules, f"{rule} output rule not found in iptables" diff --git a/test/qa/test_meshnet.py b/test/qa/test_meshnet.py index dac1e65a..60ad2917 100644 --- a/test/qa/test_meshnet.py +++ b/test/qa/test_meshnet.py @@ -22,9 +22,12 @@ def setup_module(module): meshnet.remove_all_peers_in_peer(ssh_client) meshnet.revoke_all_invites() meshnet.revoke_all_invites_in_peer(ssh_client) + meshnet.add_peer(ssh_client) def teardown_module(module): + meshnet.revoke_all_invites() + meshnet.remove_all_peers() ssh_client.exec_command("nordvpn set mesh off") sh.nordvpn.set.meshnet.off() ssh_client.exec_command("nordvpn logout --persist-token") @@ -45,12 +48,9 @@ def teardown_function(function): def test_meshnet_connect(): - with lib.Defer(meshnet.remove_all_peers): - meshnet.add_peer(ssh_client) - # Ideally peer update should happen through Notification Center, but that doesn't work often - sh.nordvpn.meshnet.peer.refresh() - assert meshnet.is_peer_reachable(ssh_client) - + # Ideally peer update should happen through Notification Center, but that doesn't work often + sh.nordvpn.meshnet.peer.refresh() + assert meshnet.is_peer_reachable(ssh_client) def test_mesh_removed_machine_by_other(): @@ -88,31 +88,210 @@ def test_mesh_removed_machine_by_other(): assert "Meshnet is not enabled." in str(e) sh.nordvpn.set.meshnet.on() # enable back on for other tests + meshnet.add_peer(ssh_client) @pytest.mark.flaky(reruns=2, reruns_delay=90) @timeout_decorator.timeout(40) # This doesn't directly test meshnet, but it uses it def test_allowlist_incoming_connection(): - with lib.Defer(meshnet.remove_all_peers): - meshnet.add_peer(ssh_client) - # Ideally peer update should happen through Notification Center, but that doesn't work often - sh.nordvpn.meshnet.peer.refresh() - my_ip = ssh_client.exec_command("echo $SSH_CLIENT").split()[0] - - peer_hostname = meshnet.get_this_device(ssh_client.exec_command("nordvpn mesh peer list")) - # Initiate ssh connection via mesh because we are going to lose the main connection - ssh_client_mesh = ssh.Ssh(peer_hostname, "root", "root") - ssh_client_mesh.connect() - with lib.Defer(ssh_client_mesh.disconnect): - ssh_client_mesh.exec_command("nordvpn c") - with lib.Defer(lambda: ssh_client_mesh.exec_command("nordvpn d")): - # We should not have direct connection anymore after connecting to VPN - with pytest.raises(sh.ErrorReturnCode_1) as ex: - assert "icmp_seq=" not in sh.ping("-c", "1", "qa-peer") - - ssh_client_mesh.exec_command(f"nordvpn allowlist add subnet {my_ip}/32") - with lib.Defer(lambda: ssh_client_mesh.exec_command("nordvpn allowlist remove all")): - # Direct connection should work again after allowlisting - assert "icmp_seq=" in sh.ping("-c", "1", "qa-peer") + my_ip = ssh_client.exec_command("echo $SSH_CLIENT").split()[0] + + peer_hostname = meshnet.get_this_device(ssh_client.exec_command("nordvpn mesh peer list")) + # Initiate ssh connection via mesh because we are going to lose the main connection + ssh_client_mesh = ssh.Ssh(peer_hostname, "root", "root") + ssh_client_mesh.connect() + with lib.Defer(ssh_client_mesh.disconnect): + ssh_client_mesh.exec_command("nordvpn c") + with lib.Defer(lambda: ssh_client_mesh.exec_command("nordvpn d")): + # We should not have direct connection anymore after connecting to VPN + with pytest.raises(sh.ErrorReturnCode_1) as ex: + assert "icmp_seq=" not in sh.ping("-c", "1", "qa-peer") + + ssh_client_mesh.exec_command(f"nordvpn allowlist add subnet {my_ip}/32") + with lib.Defer(lambda: ssh_client_mesh.exec_command("nordvpn allowlist remove all")): + # Direct connection should work again after allowlisting + assert "icmp_seq=" in sh.ping("-c", "1", "qa-peer") + + +def validate_input_chain(peer_ip: str, routing: bool, local: bool, incoming: bool, fileshare: bool) -> (bool, str): + rules = sh.sudo.iptables("-S", "INPUT") + + fileshare_rule = f"-A INPUT -s {peer_ip}/32 -p tcp -m tcp --dport 49111 -m comment --comment nordvpn -j ACCEPT" + if (fileshare_rule in rules) != fileshare: + return (False, f"Fileshare permissions configured incorrectly, rule expected: {fileshare}\nrules:{rules}") + + incoming_rule = f"-A INPUT -s {peer_ip}/32 -m comment --comment nordvpn -j ACCEPT" + if (incoming_rule in rules) != incoming: + return (False, f"Incoming permissions configured incorrectly, rule expected: {incoming}\nrules:{rules}") + + # If incoming is not enabled, no rules other than fileshare(if enabled) for that peer should be added + if not incoming: + if fileshare: + rules = rules.replace(fileshare_rule, "") + if peer_ip not in rules: + return (True, "") + else: + return (False, f"Rules for peer({peer_ip}) found in the INCOMING chain but peer does not have the icoming permissions\nrules:\n{rules}") + + incomig_rule_idx = rules.find(incoming_rule) + + for lan in meshnet.LANS: + lan_rule = f"-A INPUT -s {peer_ip}/32 -d {lan} -m comment --comment nordvpn -j DROP" + lan_rule_idx = rules.find(lan_rule) + if (routing and local) and lan_rule_idx != -1: + return (False, f"LAN/Routing permissions configured incorrectly\nlocal enabled: {local}\nrouting enabled: {routing}\nrules:\n{rules}") + # verify that lan_rule is located above the local rule + if lan_rule_idx > incomig_rule_idx: + return (False, f"LAN/Routing rules ineffective(added after incoming traffic rule)\nlocal enabled: {local}\nrouting enabled: {routing}\nrules:\n{rules}") + + return (True, "") + + +def validate_forward_chain(peer_ip: str, routing: bool, local: bool, incoming: bool, fileshare: bool) -> (bool, str): + rules = sh.sudo.iptables("-S", "FORWARD") + + # This rule is added above the LAN denial rules if both local and routing is allowed to peer, or bellow LAN denial + # if only routing is allowed. + routing_enabled_rule = f"-A FORWARD -s {peer_ip}/32 -m comment --comment nordvpn-exitnode-transient -j ACCEPT" + routing_enabled_rule_index = rules.find(routing_enabled_rule) + + if routing and (routing_enabled_rule_index == -1): + return (False, f"Routing permission not found\nrules:{rules}") + if not routing and (routing_enabled_rule_index != -1): + return (False, f"Routing permission found\nrules:{rules}") + + for lan in meshnet.LANS: + lan_drop_rule = f"-A FORWARD -s 100.64.0.0/10 -d {lan} -m comment --comment nordvpn-exitnode-transient -j DROP" + lan_drop_rule_index = rules.find(lan_drop_rule) + + # If any peer has routing or local permission, lan block rules should be added, otherwise no rules should be added. + if (routing or local) and lan_drop_rule_index == -1: + return (False, f"LAN drop rule not added for subnet {lan}\nrules:\n{rules}") + elif (not routing) and (not lan) and lan_drop_rule_index != -1: + return (False, f"LAN drop rule added for subnet {lan}\nrules:\n{rules}") + + if routing: + # Local is allowed, routing rule should be above LAN block rules to allow peer to access any subnet. + if local and (lan_drop_rule_index < routing_enabled_rule_index): + return (False, f"LAN drop rule for subnet {lan} added before routing\nrules: {rules}") + # Local is not allowed, routing rule should be bellow LAN block rules to deny peer access to local subnets. + if (not local) and (lan_drop_rule_index > routing_enabled_rule_index): + return (False, f"LAN drop rule for subnet {lan} added after routing\nrules: {rules}") + continue + + # If routing is not enabled, but lan is enabled, there should be one rule for each local network for the peer. + # They should be located abouve the LAN block rules. + if not local: + continue + + lan_allow_rule = f"-A FORWARD -s {peer_ip}/32 -d {lan} -m comment --comment nordvpn-exitnode-transient -j ACCEPT" + lan_allow_rule_index = rules.find(lan_allow_rule) + + if lan_allow_rule not in rules: + return (False, f"LAN allow rule for subnet {lan} not found\nrules:\n{rules}") + + if lan_allow_rule_index > lan_drop_rule_index: + return (False, f"LAN allow rule is added after LAN drop rule\nrules:\n{rules}") + + + return (True, "") + + +def set_permissions(peer: str, routing: bool, local: bool, incoming: bool, fileshare: bool): + def bool_to_permission(permission: bool) -> str: + if permission: + return "allow" + return "deny" + + # ignore any failures that might occur when permissions are already configured to the desired value + sh.nordvpn.mesh.peer.routing(bool_to_permission(routing), peer, _ok_code=(0, 1)) + sh.nordvpn.mesh.peer.local(bool_to_permission(local), peer, _ok_code=(0, 1)) + sh.nordvpn.mesh.peer.incoming(bool_to_permission(incoming), peer, _ok_code=(0, 1)) + sh.nordvpn.mesh.peer.fileshare(bool_to_permission(fileshare), peer, _ok_code=(0, 1)) + + +@pytest.mark.parametrize("routing", [True, False]) +@pytest.mark.parametrize("local", [True, False]) +@pytest.mark.parametrize("incoming", [True, False]) +@pytest.mark.parametrize("fileshare", [True, False]) +def test_exitnode_permissions(routing: bool, + local: bool, + incoming: bool, + fileshare: bool): + peer_ip = meshnet.get_this_device_ipv4(ssh_client.exec_command("nordvpn mesh peer list")) + set_permissions(peer_ip, routing, local, incoming, fileshare) + + (result, message) = validate_input_chain(peer_ip, routing, local, incoming, fileshare) + assert result, message + + (result, message) = validate_forward_chain(peer_ip, routing, local, incoming, fileshare) + assert result, message + + rules = sh.sudo.iptables("-S", "POSTROUTING", "-t", "nat") + + if routing: + assert f"-A POSTROUTING -s {peer_ip}/32 -o eth0 -m comment --comment nordvpn -j MASQUERADE" in rules + else: + assert f"-A POSTROUTING -s {peer_ip}/32 -o eth0 -m comment --comment nordvpn -j MASQUERADE" not in rules + + +@pytest.mark.parametrize("lan_discovery", [True, False]) +@pytest.mark.parametrize("local", [True, False]) +def test_lan_discovery_exitnode(lan_discovery: bool, local: bool): + peer_ip = meshnet.get_this_device_ipv4(ssh_client.exec_command("nordvpn mesh peer list")) + set_permissions(peer_ip, True, local, True, True) + + lan_discovery_value = "on" if lan_discovery else "off" + sh.nordvpn.set("lan-discovery", lan_discovery_value, _ok_code=(0, 1)) + + # If either LAN discovery or local(or both) is disabled, routing rule should bellow LAN blocking rules. + def check_rules_routing() -> (bool, str): + rules = sh.sudo.iptables("-S", "FORWARD") + + routing_rule = f"-A FORWARD -s {peer_ip}/32 -m comment --comment nordvpn-exitnode-transient -j ACCEPT" + routing_rule_idx = rules.find(routing_rule) + if routing_rule_idx == -1: + return (False, f"Routing rule not found\nrules:\n{rules}") + + for lan in meshnet.LANS: + lan_drop_rule = f"-A FORWARD -s 100.64.0.0/10 -d {lan} -m comment --comment nordvpn-exitnode-transient -j DROP" + lan_drop_rule_idx = rules.find(lan_drop_rule) + if lan_drop_rule_idx == -1: + return (False, f"LAN drop rule not found for subnet {lan}\nrules:\n{rules}") + + if local and lan_discovery: + if lan_drop_rule_idx < routing_rule_idx: + return (False, f"Routing rule was added after LAN block rule for subnet {lan}\nrules:\n{rules}") + else: + if lan_drop_rule_idx > routing_rule_idx: + return (False, f"Routing rule was added before LAN block rule for subnet {lan}\nrules:\n{rules}") + + return (True, "") + + sh.nordvpn.connect() + with lib.Defer(sh.nordvpn.disconnect): + for (result, message) in lib.poll(check_rules_routing): + if result: + break + assert result, message + + +def test_remove_peer_firewall_update(): + peer_ip = meshnet.get_this_device_ipv4(ssh_client.exec_command("nordvpn mesh peer list")) + set_permissions(peer_ip, True, True, True, True) + + sh.nordvpn.mesh.peer.remove(peer_ip) + sh.nordvpn.mesh.peer.refresh() + + def all_peer_permissions_removed() -> (bool, str): + rules = sh.sudo.iptables("-S") + if peer_ip not in rules: + return (True, "") + return (False, f"Rules for peer were not removed from firewall\nPeer IP: {peer_ip}\nrules:\n{rules}") + + for (result, message) in lib.poll(all_peer_permissions_removed): + if result: + break + assert result, message \ No newline at end of file