diff --git a/README.md b/README.md index 8964887..29c768a 100755 --- a/README.md +++ b/README.md @@ -121,7 +121,7 @@ To run the schema validations, the command `schema-enforcer validate` can be run ```shell bash$ schema-enforcer validate -schema-enforcer validate +schema-enforcer validate ALL SCHEMA VALIDATION CHECKS PASSED ``` @@ -140,14 +140,14 @@ If we modify one of the addresses in the `chi-beijing-rt1/dns.yml` file so that ```yaml bash$ cat chi-beijing-rt1/dns.yml -# jsonschema: schemas/dns_servers +# jsonschema: schemas/dns_servers --- dns_servers: - address: true - address: "10.2.2.2" ``` ```shell -bash$ test-schema validate +bash$ test-schema validate FAIL | [ERROR] True is not of type 'string' [FILE] ./chi-beijing-rt1/dns.yml [PROPERTY] dns_servers:0:address bash$ echo $? 1 @@ -160,7 +160,7 @@ When a structured data file fails schema validation, `schema-enforcer` exits wit Schema enforcer will work with default settings, however, a `pyproject.toml` file can be placed at the root of the path in which `schema-enforcer` is run in order to override default settings or declare configuration for more advanced features. Inside of this `pyproject.toml` file, `tool.schema_enfocer` sections can be used to declare settings for schema enforcer. Take for example the `pyproject.toml` file in example 2. ```shell -bash$ cd examples/example2 && tree -L 2 +bash$ cd examples/example2 && tree -L 2 . ├── README.md ├── hostvars @@ -198,3 +198,4 @@ Detailed documentation can be found in the README.md files inside of the `docs/` - [The `validate` command](docs/validate_command.md) - [Mapping Structured Data Files to Schema Files](docs/mapping_schemas.md) - [The `schema` command](docs/schema_command.md) +- [Implementing custom validators](docs/custom_validators.md) diff --git a/docs/custom_validators.md b/docs/custom_validators.md new file mode 100644 index 0000000..96c3900 --- /dev/null +++ b/docs/custom_validators.md @@ -0,0 +1,158 @@ +# Implementing custom validators + +With custom validators, you can implement business logic in Python. Schema-enforcer will automatically +load your plugins from the `validator_directory` and run them against your host data. + +The validator plugin provides two base classes: ModelValidation and JmesPathModelValidation. The former can be used +when you want to implement all logic and the latter can be used as a shortcut for jmespath validation. + +## BaseValidation + +Use this class to implement arbitrary validation logic in Python. In order to work correctly, your Python script must meet +the following criteria: + +1. Exist in the `validator_directory` dir. +2. Include a subclass of the BaseValidation class to correctly register with schema-enforcer. +3. Ensure you call `super().__init__()` in your class `__init__` if you override. +4. Provide a class method in your subclass with the following signature: +`def validate(data: dict, strict: bool):` + + * Data is a dictionary of variables on a per-host basis. + * Strict is set to true when the strict flag is set via the CLI. You can use this to offer strict validation behavior + or ignore it if not needed. + +The name of your class will be used as the schema-id for mapping purposes. You can override the default schema ID +by providing a class-level `id` variable. + +Helper functions are provided to add pass/fail results: + +``` +def add_validation_error(self, message: str, **kwargs): + """Add validator error to results. + Args: + message (str): error message + kwargs (optional): additional arguments to add to ValidationResult when required + """ + +def add_validation_pass(self, **kwargs): + """Add validator pass to results. + Args: + kwargs (optional): additional arguments to add to ValidationResult when required + """ +``` +In most cases, you will not need to provide kwargs. However, if you find a use case that requires updating other fields +in the ValidationResult, you can send the key/value pairs to update the result directly. This is for advanced users only. + +## JmesPathModelValidation + +Use this class for basic validation using [jmespath](https://jmespath.org/) expressions to query specific values in your data. In order to work correctly, your Python script must meet +the following criteria: + +1. Exist in the `validator_directory` dir. +2. Include a subclass of the JmesPathModelValidation class to correctly register with schema-enforcer. +3. Provide the following class level variables: + + * `top_level_properties`: Field for mapping of validator to data + * `id`: Schema ID to use for reporting purposes (optional - defaults to class name) + * `left`: Jmespath expression to query your host data + * `right`: Value or a compiled jmespath expression + * `operator`: Operator to use for comparison between left and right hand side of expression + * `error`: Message to report when validation fails + +### Supported operators: + +The class provides the following operators for basic use cases: + +``` +"gt": int(left) > int(right), +"gte": int(left) >= int(right), +"eq": left == right, +"lt": int(left) < int(right), +"lte": int(left) <= int(right), +"contains": right in left, +``` + +If you require additional logic or need to compare other types, use the BaseValidation class and create your own validate method. + +### Examples: + +#### Basic +``` +from schema_enforcer.schemas.validator import JmesPathModelValidation + +class CheckInterface(JmesPathModelValidation): # pylint: disable=too-few-public-methods + top_level_properties = ["interfaces"] + id = "CheckInterface" # pylint: disable=invalid-name + left = "interfaces.*[@.type=='core'][] | length([?@])" + right = 2 + operator = "gte" + error = "Less than two core interfaces" +``` + +#### With compiled jmespath expression +``` +import jmespath +from schema_enforcer.schemas.validator import JmesPathModelValidation + + +class CheckInterfaceIPv4(JmesPathModelValidation): # pylint: disable=too-few-public-methods + top_level_properties = ["interfaces"] + id = "CheckInterfaceIPv4" # pylint: disable=invalid-name + left = "interfaces.*[@.type=='core'][] | length([?@])" + right = jmespath.compile("interfaces.* | length([?@.type=='core'][].ipv4)") + operator = "eq" + error = "All core interfaces do not have IPv4 addresses" +``` + +## Running validators + +Custom validators are run with `schema-enforcer validate` and `schema-enforcer ansible` commands. + +You map validators to keys in your data with `top_level_properties` in your subclass or with `schema_enforcer_schema_ids` +in your data. Schema-enforcer uses the same process to map custom validators and schemas. Refer to the "Mapping Schemas" documentation +for more details. + +### Example - top_level_properties + +The CheckInterface validator has a top_level_properties of "interfaces": + +``` +class CheckInterface(JmesPathModelValidation): # pylint: disable=too-few-public-methods + top_level_properties = ["interfaces"] +``` + +With automapping enabled, this validator will apply to any host with a top-level `interfaces` key in the Ansible host_vars data: + +``` +--- +hostname: "az-phx-pe01" +pair_rtr: "az-phx-pe02" +interfaces: + MgmtEth0/0/CPU0/0: + ipv4: "172.16.1.1" + Loopback0: + ipv4: "192.168.1.1" + ipv6: "2001:db8:1::1" + GigabitEthernet0/0/0/0: + ipv4: "10.1.0.1" + ipv6: "2001:db8::" + peer: "az-phx-pe02" + peer_int: "GigabitEthernet0/0/0/0" + type: "core" + GigabitEthernet0/0/0/1: + ipv4: "10.1.0.37" + ipv6: "2001:db8::12" + peer: "co-den-p01" + peer_int: "GigabitEthernet0/0/0/2" + type: "core" +``` + +### Example - manual mapping + +Alternatively, you can manually map a validator in your Ansible host vars or other data files. + +``` +schema_enforcer_automap_default: false +schema_enforcer_schema_ids: + - "CheckInterface" +``` diff --git a/examples/ansible3/host_vars/az_phx_pe01/base.yml b/examples/ansible3/host_vars/az_phx_pe01/base.yml new file mode 100644 index 0000000..e8adf5e --- /dev/null +++ b/examples/ansible3/host_vars/az_phx_pe01/base.yml @@ -0,0 +1,22 @@ +--- +hostname: "az-phx-pe01" +pair_rtr: "az-phx-pe02" +upstreams: [] +interfaces: + MgmtEth0/0/CPU0/0: + ipv4: "172.16.1.1" + Loopback0: + ipv4: "192.168.1.1" + ipv6: "2001:db8:1::1" + GigabitEthernet0/0/0/0: + ipv4: "10.1.0.1" + ipv6: "2001:db8::" + peer: "az-phx-pe02" + peer_int: "GigabitEthernet0/0/0/0" + type: "core" + GigabitEthernet0/0/0/1: + ipv4: "10.1.0.37" + ipv6: "2001:db8::12" + peer: "co-den-p01" + peer_int: "GigabitEthernet0/0/0/2" + type: "core" diff --git a/examples/ansible3/host_vars/az_phx_pe02/base.yml b/examples/ansible3/host_vars/az_phx_pe02/base.yml new file mode 100644 index 0000000..3cfbd09 --- /dev/null +++ b/examples/ansible3/host_vars/az_phx_pe02/base.yml @@ -0,0 +1,22 @@ +--- +hostname: "az-phx-pe02" +pair_rtr: "az-phx-pe01" +upstreams: [] +interfaces: + MgmtEth0/0/CPU0/0: + ipv4: "172.16.1.2" + Loopback0: + ipv4: "192.168.1.2" + ipv6: "2001:db8:1::2" + GigabitEthernet0/0/0/0: + ipv4: "10.1.0.2" + ipv6: "2001:db8::1" + peer: "az-phx-pe01" + peer_int: "GigabitEthernet0/0/0/0" + type: "core" + GigabitEthernet0/0/0/1: + ipv4: "10.1.0.41" + ipv6: "2001:db8::14" + peer: "co-den-p02" + peer_int: "GigabitEthernet0/0/0/2" + type: "access" diff --git a/examples/ansible3/inventory.yml b/examples/ansible3/inventory.yml new file mode 100644 index 0000000..072655b --- /dev/null +++ b/examples/ansible3/inventory.yml @@ -0,0 +1,15 @@ +--- +all: + vars: + ansible_network_os: "iosxr" + ansible_user: "cisco" + ansible_password: "cisco" + ansible_connection: "netconf" + ansible_netconf_ssh_config: true + children: + pe_rtrs: + hosts: + az_phx_pe01: + ansible_host: "172.16.1.1" + az_phx_pe02: + ansible_host: "172.16.1.2" diff --git a/examples/ansible3/pyproject.toml b/examples/ansible3/pyproject.toml new file mode 100644 index 0000000..b4bd005 --- /dev/null +++ b/examples/ansible3/pyproject.toml @@ -0,0 +1,2 @@ +[tool.schema_enforcer] +ansible_inventory = "inventory.yml" \ No newline at end of file diff --git a/examples/ansible3/validators/check_interfaces.py b/examples/ansible3/validators/check_interfaces.py new file mode 100644 index 0000000..2c69fbf --- /dev/null +++ b/examples/ansible3/validators/check_interfaces.py @@ -0,0 +1,13 @@ +"""Example validator plugin.""" +from schema_enforcer.schemas.validator import JmesPathModelValidation + + +class CheckInterface(JmesPathModelValidation): # pylint: disable=too-few-public-methods + """Check that each device has more than one core uplink.""" + + top_level_properties = ["interfaces"] + id = "CheckInterface" # pylint: disable=invalid-name + left = "interfaces.*[@.type=='core'][] | length([?@])" + right = 2 + operator = "gte" + error = "Less than two core interfaces" diff --git a/poetry.lock b/poetry.lock index 6962904..157e3a2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -281,6 +281,14 @@ MarkupSafe = ">=0.23" [package.extras] i18n = ["Babel (>=0.8)"] +[[package]] +name = "jmespath" +version = "0.10.0" +description = "JSON Matching Expressions" +category = "main" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" + [[package]] name = "jsonref" version = "0.2" @@ -712,7 +720,7 @@ testing = ["pytest (>=3.5,<3.7.3 || >3.7.3)", "pytest-checkdocs (>=1.2.3)", "pyt [metadata] lock-version = "1.1" python-versions = "^3.7" -content-hash = "e3b938e5ec45670a319811698f9d448d070508b91b89ac1b758a509ddc118d96" +content-hash = "8aa94287267973c7c07897ccdcc66fe93c3e433adaeb03e970b78cdfaae0ec9f" [metadata.files] ansible = [ @@ -771,11 +779,13 @@ cffi = [ {file = "cffi-1.14.4-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:a7711edca4dcef1a75257b50a2fbfe92a65187c47dab5a0f1b9b332c5919a3fb"}, {file = "cffi-1.14.4-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:00e28066507bfc3fe865a31f325c8391a1ac2916219340f87dfad602c3e48e5d"}, {file = "cffi-1.14.4-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:798caa2a2384b1cbe8a2a139d80734c9db54f9cc155c99d7cc92441a23871c03"}, + {file = "cffi-1.14.4-cp37-cp37m-manylinux2014_aarch64.whl", hash = "sha256:a5ed8c05548b54b998b9498753fb9cadbfd92ee88e884641377d8a8b291bcc01"}, {file = "cffi-1.14.4-cp37-cp37m-win32.whl", hash = "sha256:00a1ba5e2e95684448de9b89888ccd02c98d512064b4cb987d48f4b40aa0421e"}, {file = "cffi-1.14.4-cp37-cp37m-win_amd64.whl", hash = "sha256:9cc46bc107224ff5b6d04369e7c595acb700c3613ad7bcf2e2012f62ece80c35"}, {file = "cffi-1.14.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:df5169c4396adc04f9b0a05f13c074df878b6052430e03f50e68adf3a57aa28d"}, {file = "cffi-1.14.4-cp38-cp38-manylinux1_i686.whl", hash = "sha256:9ffb888f19d54a4d4dfd4b3f29bc2c16aa4972f1c2ab9c4ab09b8ab8685b9c2b"}, {file = "cffi-1.14.4-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:8d6603078baf4e11edc4168a514c5ce5b3ba6e3e9c374298cb88437957960a53"}, + {file = "cffi-1.14.4-cp38-cp38-manylinux2014_aarch64.whl", hash = "sha256:d5ff0621c88ce83a28a10d2ce719b2ee85635e85c515f12bac99a95306da4b2e"}, {file = "cffi-1.14.4-cp38-cp38-win32.whl", hash = "sha256:b4e248d1087abf9f4c10f3c398896c87ce82a9856494a7155823eb45a892395d"}, {file = "cffi-1.14.4-cp38-cp38-win_amd64.whl", hash = "sha256:ec80dc47f54e6e9a78181ce05feb71a0353854cc26999db963695f950b5fb375"}, {file = "cffi-1.14.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:840793c68105fe031f34d6a086eaea153a0cd5c491cde82a74b420edd0a2b909"}, @@ -900,6 +910,10 @@ jinja2 = [ {file = "Jinja2-2.11.2-py2.py3-none-any.whl", hash = "sha256:f0a4641d3cf955324a89c04f3d94663aa4d638abe8f733ecd3582848e1c37035"}, {file = "Jinja2-2.11.2.tar.gz", hash = "sha256:89aab215427ef59c34ad58735269eb58b1a5808103067f7bb9d5836c651b3bb0"}, ] +jmespath = [ + {file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"}, + {file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"}, +] jsonref = [ {file = "jsonref-0.2-py3-none-any.whl", hash = "sha256:b1e82fa0b62e2c2796a13e5401fe51790b248f6d9bf9d7212a3e31a3501b291f"}, {file = "jsonref-0.2.tar.gz", hash = "sha256:f3c45b121cf6257eafabdc3a8008763aed1cd7da06dbabc59a9e4d2a5e4e6697"}, diff --git a/pyproject.toml b/pyproject.toml index d48b7cc..b5a332d 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ jsonref = "^0.2" pydantic = "^1.6.1" rich = "^9.5.1" ansible = "^2.8.0" +jmespath = "^0.10.0" [tool.poetry.dev-dependencies] pytest = "^5.4.1" @@ -75,6 +76,9 @@ notes = """, XXX, """ +[tool.pylint.SIMILARITIES] +min-similarity-lines = 15 + [tool.pytest.ini_options] testpaths = [ "tests" diff --git a/schema_enforcer/cli.py b/schema_enforcer/cli.py index 2ebbbe2..3eba5b1 100644 --- a/schema_enforcer/cli.py +++ b/schema_enforcer/cli.py @@ -303,8 +303,9 @@ def ansible( data = hostvars # Validate host vars against schema - for result in schema_obj.validate(data=data, strict=strict): + schema_obj.validate(data=data, strict=strict) + for result in schema_obj.get_results(): result.instance_type = "HOST" result.instance_hostname = host.name @@ -314,6 +315,7 @@ def ansible( elif result.passed() and show_pass: result.print() + schema_obj.clear_results() if not error_exists: print(colored("ALL SCHEMA VALIDATION CHECKS PASSED", "green")) diff --git a/schema_enforcer/config.py b/schema_enforcer/config.py index 8909241..9c98c7c 100644 --- a/schema_enforcer/config.py +++ b/schema_enforcer/config.py @@ -27,6 +27,7 @@ class Settings(BaseSettings): # pylint: disable=too-few-public-methods main_directory: str = "schema" definition_directory: str = "definitions" schema_directory: str = "schemas" + validator_directory: str = "validators" test_directory: str = "tests" # Settings specific to the schema files diff --git a/schema_enforcer/instances/file.py b/schema_enforcer/instances/file.py index 83e7185..483910a 100644 --- a/schema_enforcer/instances/file.py +++ b/schema_enforcer/instances/file.py @@ -128,6 +128,9 @@ def validate(self, schema_manager, strict=False): for schema_id, schema in schema_manager.iter_schemas(): if schema_id not in self.matches: continue - errs = itertools.chain(errs, schema.validate(self.get_content(), strict)) + schema.validate(self.get_content(), strict) + results = schema.get_results() + errs = itertools.chain(errs, results) + schema.clear_results() return errs diff --git a/schema_enforcer/schemas/jsonschema.py b/schema_enforcer/schemas/jsonschema.py index 48e94cd..227ecf0 100644 --- a/schema_enforcer/schemas/jsonschema.py +++ b/schema_enforcer/schemas/jsonschema.py @@ -3,6 +3,7 @@ import pkgutil import json from jsonschema import Draft7Validator # pylint: disable=import-self +from schema_enforcer.schemas.validator import BaseValidation from schema_enforcer.validation import ValidationResult, RESULT_FAIL, RESULT_PASS # TODO do we need to catch a possible exception here ? @@ -10,7 +11,7 @@ v7schema = json.loads(v7data.decode("utf-8")) -class JsonSchema: +class JsonSchema(BaseValidation): """class to manage jsonschema type schemas.""" schematype = "jsonchema" @@ -23,6 +24,7 @@ def __init__(self, schema, filename, root): filename (string): Name of the schema file on the filesystem. root (string): Absolute path to the directory where the schema file is located. """ + super().__init__() self.filename = filename self.root = root self.data = schema @@ -56,14 +58,11 @@ def validate(self, data, strict=False): for err in validator.iter_errors(data): has_error = True - yield ValidationResult( - schema_id=self.id, result=RESULT_FAIL, message=err.message, absolute_path=list(err.absolute_path) - ) + self.add_validation_error(err.message, absolute_path=list(err.absolute_path)) if not has_error: - yield ValidationResult( - schema_id=self.id, result=RESULT_PASS, - ) + self.add_validation_pass() + return self.get_results() def validate_to_dict(self, data, strict=False): """Return a list of ValidationResult objects. diff --git a/schema_enforcer/schemas/manager.py b/schema_enforcer/schemas/manager.py index 107b263..c44f8fa 100644 --- a/schema_enforcer/schemas/manager.py +++ b/schema_enforcer/schemas/manager.py @@ -13,6 +13,7 @@ from schema_enforcer.exceptions import SchemaNotDefined from schema_enforcer.utils import error, warn from schema_enforcer.schemas.jsonschema import JsonSchema +from schema_enforcer.schemas.validator import load_validators class SchemaManager: @@ -43,6 +44,10 @@ def __init__(self, config): schema = self.create_schema_from_file(root, filename) self.schemas[schema.get_id()] = schema + # Load validators + validators = load_validators(config.validator_directory) + self.schemas.update(validators) + def create_schema_from_file(self, root, filename): # pylint: disable=no-self-use """Create a new JsonSchema object for a given file. diff --git a/schema_enforcer/schemas/validator.py b/schema_enforcer/schemas/validator.py new file mode 100644 index 0000000..7e4337e --- /dev/null +++ b/schema_enforcer/schemas/validator.py @@ -0,0 +1,110 @@ +"""Classes for custom validator plugins.""" +# pylint: disable=no-member, too-few-public-methods +# See PEP585 (https://www.python.org/dev/peps/pep-0585/) +from __future__ import annotations +import pkgutil +import inspect +import jmespath +from schema_enforcer.validation import ValidationResult + + +class BaseValidation: + """Base class for Validation classes.""" + + def __init__(self): + """Base init for all validation classes.""" + self._results: list[ValidationResult] = [] + + def add_validation_error(self, message: str, **kwargs): + """Add validator error to results. + + Args: + message (str): error message + kwargs (optional): additional arguments to add to ValidationResult when required + """ + self._results.append(ValidationResult(result="FAIL", schema_id=self.id, message=message, **kwargs)) + + def add_validation_pass(self, **kwargs): + """Add validator pass to results. + + Args: + kwargs (optional): additional arguments to add to ValidationResult when required + """ + self._results.append(ValidationResult(result="PASS", schema_id=self.id, **kwargs)) + + def get_results(self) -> list[ValidationResult]: + """Return all validation results for this validator.""" + if not self._results: + self._results.append(ValidationResult(result="PASS", schema_id=self.id)) + + return self._results + + def clear_results(self): + """Reset results for validator instance.""" + self._results = [] + + def validate(self, data: dict, strict: bool): + """Required function for custom validator. + + Args: + data (dict): variables to be validated by validator + strict (bool): true when --strict cli option is used to request strict validation (if provided) + + Returns: + None + + Use add_validation_error and add_validation_pass to report results. + """ + raise NotImplementedError + + +class JmesPathModelValidation(BaseValidation): + """Base class for JmesPathModelValidation classes.""" + + def validate(self, data: dict, strict: bool): # pylint: disable=W0613 + """Validate data using custom jmespath validator plugin.""" + operators = { + "gt": lambda r, v: int(r) > int(v), + "gte": lambda r, v: int(r) >= int(v), + "eq": lambda r, v: r == v, + "lt": lambda r, v: int(r) < int(v), + "lte": lambda r, v: int(r) <= int(v), + "contains": lambda r, v: v in r, + } + lhs = jmespath.search(self.left, data) + valid = True + if lhs: + # Check rhs for compiled jmespath expression + if isinstance(self.right, jmespath.parser.ParsedResult): + rhs = self.right.search(data) + else: + rhs = self.right + valid = operators[self.operator](lhs, rhs) + if not valid: + self.add_validation_error(self.error) + + +def is_validator(obj) -> bool: + """Returns True if the object is a BaseValidation or JmesPathModelValidation subclass.""" + try: + return issubclass(obj, BaseValidation) and obj not in (JmesPathModelValidation, BaseValidation) + except TypeError: + return False + + +def load_validators(validator_path: str) -> dict[str, BaseValidation]: + """Load all validator plugins from validator_path.""" + validators = dict() + for importer, module_name, _ in pkgutil.iter_modules([validator_path]): + module = importer.find_module(module_name).load_module(module_name) + for name, cls in inspect.getmembers(module, is_validator): + # Default to class name if id doesn't exist + if not hasattr(cls, "id"): + cls.id = name + if cls.id in validators: + print( + f"Unable to load the validator {cls.id}, there is already a validator with the same name ({name})." + ) + else: + validators[cls.id] = cls() + return validators diff --git a/tasks.py b/tasks.py index 5f28510..9087e51 100644 --- a/tasks.py +++ b/tasks.py @@ -160,7 +160,7 @@ def pytest(context, name=NAME, image_ver=IMAGE_VER, local=INVOKE_LOCAL): # pty is set to true to properly run the docker commands due to the invocation process of docker # https://docs.pyinvoke.org/en/latest/api/runners.html - Search for pty for more information # Install python module - exec_cmd = 'find tests/ -name "*.py" -a -not -name "test_cli_ansible_not_exists.py" | xargs pytest -vv' + exec_cmd = 'find tests/ -name "test_*.py" -a -not -name "test_cli_ansible_not_exists.py" | xargs pytest -vv' run_cmd(context, exec_cmd, name, image_ver, local) @@ -226,6 +226,7 @@ def pylint(context, name=NAME, image_ver=IMAGE_VER, local=INVOKE_LOCAL): """ # pty is set to true to properly run the docker commands due to the invocation process of docker # https://docs.pyinvoke.org/en/latest/api/runners.html - Search for pty for more information + # Examples directory excluded due to pylint duplicate-code errors exec_cmd = 'find . -name "*.py" | xargs pylint' run_cmd(context, exec_cmd, name, image_ver, local) diff --git a/tests/fixtures/test_validators/inventory/host_vars/az_phx_pe01/base.yml b/tests/fixtures/test_validators/inventory/host_vars/az_phx_pe01/base.yml new file mode 100644 index 0000000..e8adf5e --- /dev/null +++ b/tests/fixtures/test_validators/inventory/host_vars/az_phx_pe01/base.yml @@ -0,0 +1,22 @@ +--- +hostname: "az-phx-pe01" +pair_rtr: "az-phx-pe02" +upstreams: [] +interfaces: + MgmtEth0/0/CPU0/0: + ipv4: "172.16.1.1" + Loopback0: + ipv4: "192.168.1.1" + ipv6: "2001:db8:1::1" + GigabitEthernet0/0/0/0: + ipv4: "10.1.0.1" + ipv6: "2001:db8::" + peer: "az-phx-pe02" + peer_int: "GigabitEthernet0/0/0/0" + type: "core" + GigabitEthernet0/0/0/1: + ipv4: "10.1.0.37" + ipv6: "2001:db8::12" + peer: "co-den-p01" + peer_int: "GigabitEthernet0/0/0/2" + type: "core" diff --git a/tests/fixtures/test_validators/inventory/host_vars/az_phx_pe02/base.yml b/tests/fixtures/test_validators/inventory/host_vars/az_phx_pe02/base.yml new file mode 100644 index 0000000..3cfbd09 --- /dev/null +++ b/tests/fixtures/test_validators/inventory/host_vars/az_phx_pe02/base.yml @@ -0,0 +1,22 @@ +--- +hostname: "az-phx-pe02" +pair_rtr: "az-phx-pe01" +upstreams: [] +interfaces: + MgmtEth0/0/CPU0/0: + ipv4: "172.16.1.2" + Loopback0: + ipv4: "192.168.1.2" + ipv6: "2001:db8:1::2" + GigabitEthernet0/0/0/0: + ipv4: "10.1.0.2" + ipv6: "2001:db8::1" + peer: "az-phx-pe01" + peer_int: "GigabitEthernet0/0/0/0" + type: "core" + GigabitEthernet0/0/0/1: + ipv4: "10.1.0.41" + ipv6: "2001:db8::14" + peer: "co-den-p02" + peer_int: "GigabitEthernet0/0/0/2" + type: "access" diff --git a/tests/fixtures/test_validators/inventory/host_vars/co_den_p01/base.yml b/tests/fixtures/test_validators/inventory/host_vars/co_den_p01/base.yml new file mode 100644 index 0000000..d9d2692 --- /dev/null +++ b/tests/fixtures/test_validators/inventory/host_vars/co_den_p01/base.yml @@ -0,0 +1,19 @@ +--- +hostname: "co-den-p01" +pair_rtr: "co-den-p02" +interfaces: + MgmtEth0/0/CPU0/0: + ipv4: "172.16.1.5" + Loopback0: + ipv4: "192.168.1.5" + ipv6: "2001:db8:1::5" + GigabitEthernet0/0/0/2: + ipv4: "10.1.0.38" + ipv6: "2001:db8::13" + peer: "ut-slc-pe01" + peer_int: "GigabitEthernet0/0/0/2" + GigabitEthernet0/0/0/3: + ipv6: "2001:db8::16" + peer: "ut-slc-pe01" + peer_int: "GigabitEthernet0/0/0/1" + type: "core" diff --git a/tests/fixtures/test_validators/inventory/inventory.yml b/tests/fixtures/test_validators/inventory/inventory.yml new file mode 100644 index 0000000..55f9820 --- /dev/null +++ b/tests/fixtures/test_validators/inventory/inventory.yml @@ -0,0 +1,19 @@ +--- +all: + vars: + ansible_network_os: "iosxr" + ansible_user: "cisco" + ansible_password: "cisco" + ansible_connection: "netconf" + ansible_netconf_ssh_config: true + children: + pe_rtrs: + hosts: + az_phx_pe01: + ansible_host: "172.16.1.1" + az_phx_pe02: + ansible_host: "172.16.1.2" + p_rtrs: + hosts: + co_den_p01: + ansible_host: "172.16.1.3" diff --git a/tests/fixtures/test_validators/validators/check_interfaces.py b/tests/fixtures/test_validators/validators/check_interfaces.py new file mode 100644 index 0000000..960a9db --- /dev/null +++ b/tests/fixtures/test_validators/validators/check_interfaces.py @@ -0,0 +1,13 @@ +"""Test validator for JmesPathModelValidation class""" +from schema_enforcer.schemas.validator import JmesPathModelValidation + + +class CheckInterface(JmesPathModelValidation): # pylint: disable=too-few-public-methods + """Test validator for JmesPathModelValidation class""" + + top_level_properties = ["interfaces"] + id = "CheckInterface" # pylint: disable=invalid-name + left = "interfaces.*[@.type=='core'][] | length([?@])" + right = 2 + operator = "gte" + error = "Less than two core interfaces" diff --git a/tests/fixtures/test_validators/validators/check_interfaces_ipv4.py b/tests/fixtures/test_validators/validators/check_interfaces_ipv4.py new file mode 100644 index 0000000..37348b6 --- /dev/null +++ b/tests/fixtures/test_validators/validators/check_interfaces_ipv4.py @@ -0,0 +1,14 @@ +"""Test validator for JmesPathModelValidation class""" +import jmespath +from schema_enforcer.schemas.validator import JmesPathModelValidation + + +class CheckInterfaceIPv4(JmesPathModelValidation): # pylint: disable=too-few-public-methods + """Test validator for JmesPathModelValidation class""" + + top_level_properties = ["interfaces"] + id = "CheckInterfaceIPv4" # pylint: disable=invalid-name + left = "interfaces.*[@.type=='core'][] | length([?@])" + right = jmespath.compile("interfaces.* | length([?@.type=='core'][].ipv4)") + operator = "eq" + error = "All core interfaces do not have IPv4 addresses" diff --git a/tests/fixtures/test_validators/validators/check_peers.py b/tests/fixtures/test_validators/validators/check_peers.py new file mode 100644 index 0000000..ccea521 --- /dev/null +++ b/tests/fixtures/test_validators/validators/check_peers.py @@ -0,0 +1,42 @@ +"""Test validator for ModelValidation class""" +from schema_enforcer.schemas.validator import BaseValidation + + +def ansible_hostname(hostname: str): + """Convert hostname to ansible format""" + return hostname.replace("-", "_") + + +def normal_hostname(hostname: str): + """Convert ansible hostname to normal format""" + return hostname.replace("_", "-") + + +class CheckPeers(BaseValidation): # pylint: disable=too-few-public-methods + """ + Validate that peer and peer_int are defined properly on both sides of a connection + + Requires full Ansible host_vars as data which is currently unsupported in schema-enforcer + """ + + id = "CheckPeers" + + def validate(self, data: dict, strict: bool): + for host in data: + for interface, int_cfg in data[host]["interfaces"].items(): + if "peer" not in int_cfg: + continue + peer = int_cfg["peer"] + if "peer_int" not in int_cfg: + self.add_validation_error("Peer interface is not defined") + continue + peer_int = int_cfg["peer_int"] + peer = ansible_hostname(peer) + if peer not in data: + continue + peer_match = data[peer]["interfaces"][peer_int]["peer"] == normal_hostname(host) + peer_int_match = data[peer]["interfaces"][peer_int]["peer_int"] == interface + if peer_match and peer_int_match: + self.add_validation_pass() + else: + self.add_validation_error("Peer information does not match.") diff --git a/tests/test_jsonschema.py b/tests/test_jsonschema.py index 60d5f30..255d859 100644 --- a/tests/test_jsonschema.py +++ b/tests/test_jsonschema.py @@ -71,28 +71,36 @@ def test_validate(schema_instance, valid_instance_data, invalid_instance_data, s Args: schema_instance (JsonSchema): Instance of JsonSchema class """ - validation_results = list(schema_instance.validate(data=valid_instance_data)) + schema_instance.validate(data=valid_instance_data) + validation_results = schema_instance.get_results() assert len(validation_results) == 1 assert validation_results[0].schema_id == LOADED_SCHEMA_DATA.get("$id") assert validation_results[0].result == RESULT_PASS assert validation_results[0].message is None + schema_instance.clear_results() - validation_results = list(schema_instance.validate(data=invalid_instance_data)) + schema_instance.validate(data=invalid_instance_data) + validation_results = schema_instance.get_results() assert len(validation_results) == 1 assert validation_results[0].schema_id == LOADED_SCHEMA_DATA.get("$id") assert validation_results[0].result == RESULT_FAIL assert validation_results[0].message == "True is not of type 'string'" assert validation_results[0].absolute_path == ["dns_servers", "0", "address"] + schema_instance.clear_results() - validation_results = list(schema_instance.validate(data=strict_invalid_instance_data, strict=False)) + schema_instance.validate(data=strict_invalid_instance_data, strict=False) + validation_results = schema_instance.get_results() assert validation_results[0].result == RESULT_PASS + schema_instance.clear_results() - validation_results = list(schema_instance.validate(data=strict_invalid_instance_data, strict=True)) + schema_instance.validate(data=strict_invalid_instance_data, strict=True) + validation_results = schema_instance.get_results() assert validation_results[0].result == RESULT_FAIL assert ( validation_results[0].message == "Additional properties are not allowed ('fun_extr_attribute' was unexpected)" ) + schema_instance.clear_results() @staticmethod def test_validate_to_dict(schema_instance, valid_instance_data): diff --git a/tests/test_schemas_validator.py b/tests/test_schemas_validator.py new file mode 100644 index 0000000..cac4be4 --- /dev/null +++ b/tests/test_schemas_validator.py @@ -0,0 +1,155 @@ +"""Tests for validator plugin support.""" +# pylint: disable=redefined-outer-name +import os +import pytest +from schema_enforcer.ansible_inventory import AnsibleInventory +import schema_enforcer.schemas.validator as v + +FIXTURE_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "fixtures", "test_validators") + + +@pytest.fixture +def inventory(): + """Fixture for Ansible inventory used in tests.""" + inventory_dir = os.path.join(FIXTURE_DIR, "inventory") + + inventory = AnsibleInventory(inventory_dir) + return inventory + + +@pytest.fixture +def host_vars(inventory): + """Fixture for providing Ansible host_vars as a consolidated dict.""" + hosts = inventory.get_hosts_containing() + host_vars = dict() + for host in hosts: + hostname = host.get_vars()["inventory_hostname"] + host_vars[hostname] = inventory.get_host_vars(host) + return host_vars + + +@pytest.fixture(scope="session") +def validators(): + """Test that validator files are loaded and appended to base class validator list.""" + validator_path = os.path.join(FIXTURE_DIR, "validators") + return v.load_validators(validator_path) + + +def test_jmespathvalidation_pass(host_vars, validators): + """ + Validator: "interfaces.*[@.type=='core'][] | length([?@])" gte 2 + Test expected to pass for az_phx_pe01 with two core interfaces: + interfaces: + GigabitEthernet0/0/0/0: + type: "core" + GigabitEthernet0/0/0/1: + type: "core" + """ + validator = validators["CheckInterface"] + validator.validate(host_vars["az_phx_pe01"], False) + result = validator.get_results() + assert result[0].passed() + validator.clear_results() + + +def test_jmespathvalidation_fail(host_vars, validators): + """ + Validator: "interfaces.*[@.type=='core'][] | length([?@])" gte 2 + Test expected to fail for az_phx_pe02 with one core interface: + interfaces: + GigabitEthernet0/0/0/0: + type: "core" + GigabitEthernet0/0/0/1: + type: "access" + """ + validator = validators["CheckInterface"] + validator.validate(host_vars["az_phx_pe02"], False) + result = validator.get_results() + assert not result[0].passed() + validator.clear_results() + + +def test_jmespathvalidation_with_compile_pass(host_vars, validators): + """ + Validator: "interfaces.*[@.type=='core'][] | length([?@])" eq jmespath.compile("interfaces.* | length([?@.type=='core'][].ipv4)") + Test expected to pass for az_phx_pe01 where all core interfaces have IPv4 addresses: + GigabitEthernet0/0/0/0: + ipv4: "10.1.0.1" + ipv6: "2001:db8::" + peer: "az-phx-pe02" + peer_int: "GigabitEthernet0/0/0/0" + type: "core" + GigabitEthernet0/0/0/1: + ipv4: "10.1.0.37" + ipv6: "2001:db8::12" + peer: "co-den-p01" + peer_int: "GigabitEthernet0/0/0/2" + type: "core" + """ + validator = validators["CheckInterfaceIPv4"] + validator.validate(host_vars["az_phx_pe01"], False) + result = validator.get_results() + assert result[0].passed() + validator.clear_results() + + +def test_jmespathvalidation_with_compile_fail(host_vars, validators): + """ + Validator: "interfaces.*[@.type=='core'][] | length([?@])" eq jmespath.compile("interfaces.* | length([?@.type=='core'][].ipv4)") + Test expected to fail for co_den_p01 where core interface is missing an IPv4 addresses: + GigabitEthernet0/0/0/3: + ipv6: "2001:db8::16" + peer: "ut-slc-pe01" + peer_int: "GigabitEthernet0/0/0/1" + type: "core" + """ + validator = validators["CheckInterfaceIPv4"] + validator.validate(host_vars["co_den_p01"], False) + result = validator.get_results() + assert not result[0].passed() + validator.clear_results() + + +def test_modelvalidation_pass(host_vars, validators): + """ + Validator: Checks that peer and peer_int match between peers + Test expected to pass for az_phx_pe01/az_phx_pe02: + + az_phx_pe01: + GigabitEthernet0/0/0/0: + peer: "az-phx-pe02" + peer_int: "GigabitEthernet0/0/0/0" + + az_phx_pe02: + GigabitEthernet0/0/0/0: + peer: "az-phx-pe01" + peer_int: "GigabitEthernet0/0/0/0" + """ + validator = validators["CheckPeers"] + validator.validate(host_vars, False) + result = validator.get_results() + assert result[0].passed() + assert result[2].passed() + validator.clear_results() + + +def test_modelvalidation_fail(host_vars, validators): + """ + Validator: Checks that peer and peer_int match between peers + + Test expected to fail for az_phx_pe01/co_den_p01: + + az_phx_pe01: + GigabitEthernet0/0/0/1: + peer: "co-den-p01" + peer_int: "GigabitEthernet0/0/0/2" + + co_den_p01: + GigabitEthernet0/0/0/2: + peer: ut-slc-pe01 + peer_int: GigabitEthernet0/0/0/2 + """ + validator = validators["CheckPeers"] + validator.validate(host_vars, False) + result = validator.get_results() + assert not result[1].passed()