Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rafactoring ACL attributes #464

Open
wants to merge 31 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 145 additions & 75 deletions netutils/acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import itertools
import copy
import types
import typing as t
from netutils.protocol_mapper import PROTO_NAME_TO_NUM, TCP_NAME_TO_NUM, UDP_NAME_TO_NUM
from netutils.ip import is_ip_within
Expand All @@ -16,8 +17,8 @@
INPUT_SCHEMA = {
"type": "object",
"properties": {
"name": {"type": "string"},
"src_zone": {"type": ["string", "array"]},
"name": {"type": ["string", "null"]},
"src_zone": {"type": ["string", "array", "null"]},
"src_ip": {"$ref": "#/definitions/arrayOrIP"},
"dst_ip": {"$ref": "#/definitions/arrayOrIP"},
"dst_port": {
Expand All @@ -35,8 +36,9 @@
},
],
},
"dst_zone": {"type": ["string", "array"]},
"dst_zone": {"type": ["string", "array", "null"]},
"action": {"type": "string"},
"protocol": {"type": ["string", "null"]},
},
"definitions": {
"ipv4": {"type": "string", "pattern": "^(?:\\d{1,3}\\.){3}\\d{1,3}$"},
Expand Down Expand Up @@ -130,6 +132,8 @@ def _cartesian_product(data: t.Dict[str, str]) -> t.List[t.Dict[str, t.Any]]:
keys.append(key)
if isinstance(value, (str, int)):
values.append([value])
elif value is None:
values.append([None])
else:
values.append(value)
product = list(itertools.product(*values))
Expand All @@ -147,30 +151,54 @@ def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None:
raise ValueError()


def get_attributes(obj):
"""Function that describes class attributes."""
result = {
# name for name in dir(cls)
attr: getattr(obj, attr)
for attr in dir(obj)
if not attr.startswith("_")
and not callable(getattr(obj, attr))
and not isinstance(getattr(obj, attr), types.FunctionType)
}
return result


class ACLRule:
"""A class that helps you imagine an acl rule via methodologies."""

attrs: t.List[str] = ["name", "src_ip", "src_zone", "dst_ip", "dst_port", "dst_zone", "action"]
permit: str = "permit"
deny: str = "deny"
name: t.Any = None
src_ip: t.Any = None
src_zone: t.Any = None
dst_ip: t.Any = None
dst_port: t.Any = None
dst_zone: t.Any = None
protocol: t.Any = None
action: t.Any = None

input_data_verify: bool = False
input_data_schema: t.Any = INPUT_SCHEMA
class Meta: # pylint: disable=too-few-public-methods
"""Default meta class."""

result_data_verify: bool = False
result_data_schema: t.Any = RESULT_SCHEMA
permit: str = "permit"
deny: str = "deny"

matrix: t.Any = {}
matrix_enforced: bool = False
matrix_definition: t.Any = {}
input_data_verify: bool = False
input_data_schema: t.Any = INPUT_SCHEMA

dst_port_process: bool = True
result_data_verify: bool = False
result_data_schema: t.Any = RESULT_SCHEMA

order_validate: t.List[str] = []
order_enforce: t.List[str] = []
filter_same_ip: bool = True
matrix: t.Any = {}
matrix_enforced: bool = False
matrix_definition: t.Any = {}

def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument
dst_port_process: bool = False

order_validate: t.List[str] = []
order_enforce: t.List[str] = []
filter_same_ip: bool = True

def __init__(self, **kwargs): # pylint: disable=unused-argument
"""Initialize and load data.

Args:
Expand All @@ -179,55 +207,80 @@ def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disab
Examples:
>>> from netutils.acl import ACLRule
>>>
>>> acl_data = dict(
>>> rule = ACLRule(
... name="Check no match",
... src_ip=["10.1.1.1"],
... dst_ip="172.16.0.10",
... dst_port="tcp/www-http",
... action="permit",
... )
>>>
>>> rule = ACLRule(acl_data)
>>>
>>> rule.expanded_rules
>>> rule._expanded_rules
[{'name': 'Check no match', 'src_ip': '10.1.1.1', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'action': 'permit'}]
>>>
"""
self.processed: t.Dict[str, str] = {}
self.data = data
self.load_data()
self.__load_data(kwargs=kwargs)

def load_data(self) -> None:
def __load_data(self, kwargs) -> None:
"""Load the data into the rule while verifying input data, result data, and processing data."""
# Remaining kwargs stored under ACLRule.Meta
pop_kwargs = []
for key, val in kwargs.items():
if key not in get_attributes(self):
setattr(self.Meta, key, val)
pop_kwargs.append(key)

# Pop unneeded keys
for key in pop_kwargs:
kwargs.pop(key)

# Ensure each class attr is in init kwargs.
for attr in get_attributes(self):
if attr not in kwargs:
kwargs[attr] = getattr(self, attr)

# Store the init input
self._preprocessed_data = copy.deepcopy(kwargs)
self._processed_data = copy.deepcopy(self._preprocessed_data)

# Input check
self.input_data_check()
for attr in self.attrs:
if not self.data.get(attr):
continue
if hasattr(self, f"process_{attr}"):
proccessor = getattr(self, f"process_{attr}")
_attr_data = proccessor(self.data[attr])

for attr in get_attributes(self):
processor_func = getattr(self, f"process_{attr}", None)
if processor_func:
_attr_data = processor_func(self._processed_data[attr])
else:
_attr_data = self.data[attr]
self.processed[attr] = _attr_data
_attr_data = self._processed_data[attr]

self._processed_data[attr] = _attr_data
setattr(self, attr, _attr_data)

self.result_data_check()
self.validate()
self.expanded_rules = _cartesian_product(self.processed)
if self.filter_same_ip:
self.expanded_rules = [item for item in self.expanded_rules if item["dst_ip"] != item["src_ip"]]

@property
def _expanded_rules(self):
"""Expanded rule property."""
_expanded_rules = _cartesian_product(self._processed_data)
if self.Meta.filter_same_ip:
_expanded_rules = [item for item in _expanded_rules if item["dst_ip"] != item["src_ip"]]

return _expanded_rules

def input_data_check(self) -> None:
"""Verify the input data against the specified JSONSchema or using a simple dictionary check."""
return _check_schema(self.data, self.input_data_schema, self.input_data_verify)
return _check_schema(self._preprocessed_data, self.Meta.input_data_schema, self.Meta.input_data_verify)

def result_data_check(self) -> None:
"""Verify the result data against the specified JSONSchema or using a simple dictionary check."""
return _check_schema(self.processed, self.result_data_schema, self.result_data_verify)
return _check_schema(self._processed_data, self.Meta.result_data_schema, self.Meta.result_data_verify)

def validate(self) -> t.Any:
"""Run through any method that startswith('validate_') and run that method."""
if self.order_validate:
method_order = self.order_validate
if self.Meta.order_validate:
method_order = self.Meta.order_validate
else:
method_order = dir(self)
results = []
Expand Down Expand Up @@ -255,8 +308,8 @@ def process_dst_port(
https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv.
"""
output = []
if not self.dst_port_process:
return None
if not self.Meta.dst_port_process:
return self.dst_port
if not isinstance(dst_port, list):
dst_port = [dst_port]
for item in dst_port:
Expand Down Expand Up @@ -286,8 +339,8 @@ def enforce(self) -> t.List[t.Dict[str, t.Any]]:
Returns:
A list of dictionaries that explains the results of the enforcement.
"""
if self.order_enforce:
method_order = self.order_enforce
if self.Meta.order_enforce:
method_order = self.Meta.order_enforce
else:
method_order = dir(self)
results = []
Expand All @@ -308,28 +361,28 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]:
Returns:
A list of dictionaries that explains the results of the matrix being enforced.
"""
if not self.matrix_enforced:
if not self.Meta.matrix_enforced:
return None
if not self.matrix:
if not self.Meta.matrix:
raise ValueError("You must set a matrix dictionary to use the matrix feature.")
if not self.matrix_definition:
if not self.Meta.matrix_definition:
raise ValueError("You must set a matrix definition dictionary to use the matrix feature.")
actions = []
for rule in self.expanded_rules:
for rule in self._expanded_rules:
source = rule["src_ip"]
destination = rule["dst_ip"]
port = rule["dst_port"]
src_zone = ""
dst_zone = ""
as_tuple = (source, destination, port)
for zone, ips in self.matrix_definition.items():
for zone, ips in self.Meta.matrix_definition.items():
if is_ip_within(source, ips):
src_zone = zone
if is_ip_within(destination, ips):
dst_zone = zone
if port in self.matrix.get(src_zone, {}).get(dst_zone, {}).get("allow", []):
if port in self.Meta.matrix.get(src_zone, {}).get(dst_zone, {}).get("allow", []):
actions.append({"obj": as_tuple, "action": "allow"})
elif port in self.matrix.get(src_zone, {}).get(dst_zone, {}).get("notify", []):
elif port in self.Meta.matrix.get(src_zone, {}).get(dst_zone, {}).get("notify", []):
actions.append({"obj": as_tuple, "action": "notify"})
else:
actions.append({"obj": as_tuple, "action": "deny"})
Expand Down Expand Up @@ -357,7 +410,10 @@ def match_src_ip(self, existing_ip: str, check_ip: str) -> bool:
Returns:
True if `check_ip` is within the range of `existing_ip`, False otherwise.
"""
return is_ip_within(check_ip, existing_ip)
if existing_ip == check_ip: # None cases
return True
else:
is_ip_within(check_ip, existing_ip)

def match_src_zone(self, existing_src_zone: str, check_src_zone: str) -> bool:
"""Match the source zone for equality.
Expand All @@ -381,7 +437,10 @@ def match_dst_ip(self, existing_ip: str, check_ip: str) -> bool:
Returns:
True if `check_ip` is within the range of `existing_ip`, False otherwise.
"""
return is_ip_within(check_ip, existing_ip)
if existing_ip == check_ip: # None cases
return True
else:
return is_ip_within(check_ip, existing_ip)

def match_dst_zone(self, existing_dst_zone: str, check_dst_zone: str) -> bool:
"""Match the destination zone for equality.
Expand Down Expand Up @@ -429,11 +488,14 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint:
rules_unmatched: t.List[t.Dict[str, t.Any]] = []
rules_matched: t.List[t.Dict[str, t.Any]] = []

if not match_rule.expanded_rules:
if not match_rule._expanded_rules: # pylint: disable=protected-access
raise ValueError("There is no expanded rules to test against.")
for rule in match_rule.expanded_rules:
elif not self._expanded_rules: # pylint: disable=protected-access
raise ValueError("There is no expanded rules to test.")

for rule in match_rule._expanded_rules: # pylint: disable=protected-access
rules_found.append(False)
for existing_rule in self.expanded_rules:
for existing_rule in self._expanded_rules:
missing = False
for attr in attrs:
# Examples of obj are match_rule.src_ip, match_rule.dst_port
Expand All @@ -452,8 +514,8 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint:
break
detailed_info = {
"existing_rule_product": existing_rule, # pylint: disable=undefined-loop-variable
"match_rule": match_rule.processed,
"existing_rule": self.processed,
"match_rule": match_rule._processed_data, # pylint: disable=protected-access
"existing_rule": self._processed_data,
}
if rules_found[-1]:
detailed_info["match_rule_product"] = rule
Expand All @@ -474,34 +536,42 @@ def match(self, match_rule: "ACLRule") -> bool:
details = self.match_details(match_rule)
return not bool(details["rules_unmatched"])

def __repr__(self) -> str:
"""Set repr of the object to be sane."""
output = []
for attr in self.attrs:
if self.processed.get(attr):
output.append(f"{attr}: {self.processed[attr]}")
return ", ".join(output)
# def __repr__(self) -> str:
# """Set repr of the object to be sane."""
# return {k: v for k, v in self.__dict__.items() if not k.startswith("_")}

def serialize(self) -> str:
"""Primitive Serializer."""
return {k: v for k, v in self.__dict__.items() if not k.startswith("_")}


class ACLRules:
"""Class to help match multiple ACLRule objects."""

class_obj = ACLRule
rules: t.List[t.Any] = []

class Meta: # pylint: disable=too-few-public-methods
"""Default meta class."""

class_obj = ACLRule

def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument
"""Class to help match multiple ACLRule.

Args:
data: A list of `ACLRule` rules.
"""
self.data: t.Any = data
self.rules: t.List[t.Any] = []
self.load_data()
self.load_data(data=data)

def load_data(self) -> None:
def load_data(self, data) -> None:
"""Load the data for multiple rules."""
for item in self.data:
self.rules.append(self.class_obj(item))
for item in data:
self.rules.append(self.Meta.class_obj(**item))

def serialize(self) -> str:
"""Primitive Serializer."""
return [rule.serialize() for rule in self.rules]

def match(self, rule: ACLRule) -> str:
"""Check the rules loaded in `load_data` match against a new `rule`.
Expand All @@ -513,9 +583,9 @@ def match(self, rule: ACLRule) -> str:
The response from the rule that matched, or `deny` by default.
"""
for item in self.rules:
if item.match(self.class_obj(rule)):
return str(item.action)
return str(item.deny) # pylint: disable=undefined-loop-variable
if item.match(rule): # mzb: bugfix
return True # mzb: change to bool
return False # pylint: disable=undefined-loop-variable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this as string as most vendors have more than just permit or deny, this is Palo's

allow
deny
drop
reset client
reset server
reset client and server

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO match method should return only True/False if the rule is/not matched by the check. This should only check if A contains B.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is reasonable to have different levels.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, I see this in a little different way :

def match_action(action, match_action):
    deny_actions = ['deny', 'drop', 'reset client', 'reset server', ...]
    if action in deny_actions and match_action in deny_actions:
        return True
    else:
        return False

I would really like to avoid putting opinionated logic into generic match resolution while we provide easily extensible framework


def match_details(self, rule: ACLRule) -> t.Any:
"""Verbosely check the rules loaded in `load_data` match against a new `rule`.
Expand All @@ -528,5 +598,5 @@ def match_details(self, rule: ACLRule) -> t.Any:
"""
output = []
for item in self.rules:
output.append(item.match_details(self.class_obj(rule)))
output.append(item.match_details(rule)) # mzb: bugfix
return output
Loading
Loading