From 4bda76aec7b5eaf6f6cd240df473410b8478e11e Mon Sep 17 00:00:00 2001 From: mamullen13316 Date: Mon, 26 Aug 2024 11:11:56 -0400 Subject: [PATCH] feat: Updated firewall rule functionality (#78) --- pyproject.toml | 2 +- sophosfirewall_python/firewallapi.py | 29 ++++- sophosfirewall_python/firewallrule.py | 109 +++++++++++++++++- .../templates/createfwrule.j2 | 17 ++- .../templates/updatefwrule.j2 | 61 ++++++++++ sophosfirewall_python/tests/functional.py | 13 +++ 6 files changed, 224 insertions(+), 7 deletions(-) create mode 100644 sophosfirewall_python/templates/updatefwrule.j2 diff --git a/pyproject.toml b/pyproject.toml index 82ad60e..7657d2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "sophosfirewall-python" packages = [ { include = "sophosfirewall_python" }, ] -version = "0.1.47" +version = "0.1.48" description = "Python SDK for Sophos Firewall" authors = ["Matt Mullen "] readme = "README.md" diff --git a/sophosfirewall_python/firewallapi.py b/sophosfirewall_python/firewallapi.py index faa2205..62bbceb 100644 --- a/sophosfirewall_python/firewallapi.py +++ b/sophosfirewall_python/firewallapi.py @@ -450,7 +450,10 @@ def create_rule(self, rule_params: dict, debug: bool = False): Keyword Args: rulename(str): Name of the firewall rule - after_rulename(str): Name of the rule to insert this rule after + status(str): Enable/Disable + position(str): Where the rule should be positioned (top/bottom/after/before) + after_rulename(str, optional): Name of the rule to insert this rule after if position = after + before_rulename(str, optional): Name of the rule to insert this rule before if position = before action(str): Accept, Drop, Reject description(str): Rule description log(str): Enable, Disable @@ -892,6 +895,30 @@ def update_acl_rule( } return AclRule(self.client).update(**params) + def update_rule(self, name: str, rule_params: dict, debug: bool = False): + """Update a firewall rule + + Args: + name(str): Name of the firewall rule to be updated. + rule_params (dict): Configuration parmeters for the rule, see Keyword Args for supported parameters. + + Keyword Args: + position(str): Where the rule should be positioned (top/bottom/after/before) + after_rulename(str): Name of the rule to insert this rule after if position = after + before_rulename(str): Name of the rule to insert this rule before if position = before + action(str): Accept, Drop, Reject + description(str): Rule description + log(str): Enable, Disable + src_zones(list): Name(s) of the source zone(s) + dst_zones(list): Name(s) of the destination zone(s) + src_networks(list): Name(s) of the source network(s) + dst_networks(list): Name(s) of the destination network(s) + service_list(list): Name(s) of service(s) + Returns: + dict: XML response converted to Python dictionary + """ + return FirewallRule(self.client).update(name, rule_params, debug) + # Export the error classes for backward compatibility __all__ = [ "SophosFirewall", diff --git a/sophosfirewall_python/firewallrule.py b/sophosfirewall_python/firewallrule.py index ef1b544..9c99d44 100644 --- a/sophosfirewall_python/firewallrule.py +++ b/sophosfirewall_python/firewallrule.py @@ -6,7 +6,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ - +from sophosfirewall_python.utils import Utils class FirewallRule: """Class for working with firewall rule(s).""" @@ -14,7 +14,7 @@ class FirewallRule: def __init__(self, api_client): self.client = api_client - def get(self, name, operator): + def get(self, name, operator="="): """Get firewall rule(s) Args: @@ -35,7 +35,10 @@ def create(self, rule_params, debug): Keyword Args: rulename(str): Name of the firewall rule - after_rulename(str): Name of the rule to insert this rule after + status(str): Enable/Disable + position(str): Where the rule should be positioned (top/bottom/after/before) + after_rulename(str): Name of the rule to insert this rule after if position = after + before_rulename(str): Name of the rule to insert this rule before if position = before action(str): Accept, Drop, Reject description(str): Rule description log(str): Enable, Disable @@ -51,3 +54,103 @@ def create(self, rule_params, debug): "createfwrule.j2", template_vars=rule_params, debug=debug ) return resp + + def update(self, name, rule_params, debug): + """Update a firewall rule. + + Args: + name(str): Name of the firewall rule to be updated + rule_params (dict): Configuration parmeters for the rule, see Keyword Args for supported parameters. + + Keyword Args: + status(str): Enable/Disable + position(str): Where the rule should be positioned (top/bottom/after/before) + after_rulename(str, optional): Name of the rule to insert this rule after if position = after + before_rulename(str, optional): Name of the rule to insert this rule before if position = before + action(str): Accept, Drop, Reject + description(str): Rule description + log(str): Enable, Disable + src_zones(list): Name(s) of the source zone(s) + dst_zones(list): Name(s) of the destination zone(s) + src_networks(list): Name(s) of the source network(s) + dst_networks(list): Name(s) of the destination network(s) + service_list(list): Name(s) of service(s) + Returns: + dict: XML response converted to Python dictionary + """ + updated_rule_params = dict(rulename=name) + + # Get the existing rule + exist_rule = self.get(name=name)["Response"]["FirewallRule"] + + if rule_params.get("action"): + updated_rule_params["action"] = rule_params.get("action") + else: + updated_rule_params["action"] = exist_rule["NetworkPolicy"]["Action"] + + if rule_params.get("description"): + updated_rule_params["description"] = rule_params.get("description") + else: + updated_rule_params["description"] = exist_rule["Description"] + + if rule_params.get("status"): + updated_rule_params["status"] = rule_params.get("status") + + if rule_params.get("position"): + updated_rule_params["position"] = rule_params.get("position") + + if rule_params.get("after_rulename"): + updated_rule_params["after_rulename"] = rule_params.get("after_rulename") + + if rule_params.get("before_rulename"): + updated_rule_params["before_rulename"] = rule_params.get("before_rulename") + + if rule_params.get("log"): + updated_rule_params["log"] = rule_params.get("log") + else: + updated_rule_params["log"] = exist_rule["NetworkPolicy"]["LogTraffic"] + + if rule_params.get("src_zones"): + updated_rule_params["src_zones"] = rule_params.get("src_zones") + else: + if "SourceZones" in exist_rule["NetworkPolicy"]: + updated_rule_params["src_zones"] = Utils.ensure_list(exist_rule["NetworkPolicy"]["SourceZones"]["Zone"]) + else: + updated_rule_params["src_zones"] = None + + if rule_params.get("dst_zones"): + updated_rule_params["dst_zones"] = rule_params.get("dst_zones") + else: + if "DestinationZones" in exist_rule["NetworkPolicy"]: + updated_rule_params["dst_zones"] = Utils.ensure_list(exist_rule["NetworkPolicy"]["DestinationZones"]["Zone"]) + else: + updated_rule_params["dst_zones"] = None + + if rule_params.get("src_networks"): + updated_rule_params["src_networks"] = rule_params.get("src_networks") + else: + if "SourceNetworks" in exist_rule["NetworkPolicy"]: + updated_rule_params["src_networks"] = Utils.ensure_list(exist_rule["NetworkPolicy"]["SourceNetworks"]["Network"]) + else: + updated_rule_params["src_networks"] = None + + if rule_params.get("dst_networks"): + updated_rule_params["dst_networks"] = rule_params.get("dst_networks") + else: + if "DestinationNetworks" in exist_rule["NetworkPolicy"]: + updated_rule_params["dst_networks"] = Utils.ensure_list(exist_rule["NetworkPolicy"]["DestinationNetworks"]["Network"]) + else: + updated_rule_params["dst_networks"] = None + + if rule_params.get("service_list"): + updated_rule_params["service_list"] = rule_params.get("service_list") + else: + if "Services" in exist_rule["NetworkPolicy"]: + updated_rule_params["service_list"] = Utils.ensure_list(exist_rule["NetworkPolicy"]["Services"]["Service"]) + else: + updated_rule_params["service_list"] = None + + resp = self.client.submit_template( + "updatefwrule.j2", template_vars=updated_rule_params, debug=debug + ) + return resp \ No newline at end of file diff --git a/sophosfirewall_python/templates/createfwrule.j2 b/sophosfirewall_python/templates/createfwrule.j2 index 3c8016c..bfac85e 100644 --- a/sophosfirewall_python/templates/createfwrule.j2 +++ b/sophosfirewall_python/templates/createfwrule.j2 @@ -8,26 +8,39 @@ {{ rulename }} {{ description if description else '' }} IPv4 - Enable - After + {{ status }} + {{ position }} Network + {% if position == 'After' %} {{ after_rulename }} + {% endif %} + {% if position == 'Before' %} + + {{ before_rulename }} + + {% endif %} {{ action }} + {% if log %} {{ log }} + {% endif %} Disable + {% if src_zones %} {% for zone in src_zones %} {{ zone }} {% endfor %} + {% endif %} + {% if dst_zones %} {% for zone in dst_zones %} {{ zone }} {% endfor %} + {% endif %} All The Time {% for network in src_networks %} diff --git a/sophosfirewall_python/templates/updatefwrule.j2 b/sophosfirewall_python/templates/updatefwrule.j2 new file mode 100644 index 0000000..5c18891 --- /dev/null +++ b/sophosfirewall_python/templates/updatefwrule.j2 @@ -0,0 +1,61 @@ + + + {{username}} + {{password}} + + + + {{ rulename }} + {{ description if description else '' }} + IPv4 + {{ status }} + {{ position }} + Network + {% if position == 'After' %} + + {{ after_rulename }} + + {% endif %} + {% if position == 'Before' %} + + {{ before_rulename }} + + {% endif %} + + {{ action }} + {{ log }} + Disable + {% if src_zones %} + + {% for zone in src_zones %} + {{ zone }} + {% endfor %} + + {% endif %} + {% if dst_zones %} + + {% for zone in dst_zones %} + {{ zone }} + {% endfor %} + + {% endif %} + All The Time + + {% for network in src_networks %} + {{ network }} + {% endfor %} + + + {% for network in dst_networks %} + {{ network }} + {% endfor %} + + + {% for service in service_list %} + {{ service }} + {% endfor %} + + + + + \ No newline at end of file diff --git a/sophosfirewall_python/tests/functional.py b/sophosfirewall_python/tests/functional.py index 225c1dd..d7038dd 100644 --- a/sophosfirewall_python/tests/functional.py +++ b/sophosfirewall_python/tests/functional.py @@ -255,6 +255,19 @@ def test_create_rule(setup): response = setup.create_rule(rule_params=rule_params) assert response["Response"]["FirewallRule"]["Status"] == expected_result +def test_update_rule(setup): + """Test update_rule method.""" + + update_result = { + "@code": "200", + "#text": "Configuration applied successfully.", + } + + response = setup.update_rule(name="FUNC_TESTRULE1", rule_params={"action": "Drop"}) + assert response["Response"]["FirewallRule"]["Status"] == update_result + + response = setup.get_rule(name="FUNC_TESTRULE1") + assert response["Response"]["FirewallRule"]["NetworkPolicy"]["Action"] == "Drop" def test_create_urlgroup(setup): """Test create_urlgroup method."""