Skip to content

Commit

Permalink
Add Condition logic for template Rules (#3634)
Browse files Browse the repository at this point in the history
* Add rule logic to v0 condition logic
  • Loading branch information
kddejong committed Sep 2, 2024
1 parent 44b7aa1 commit c8560af
Show file tree
Hide file tree
Showing 8 changed files with 477 additions and 5 deletions.
135 changes: 135 additions & 0 deletions src/cfnlint/conditions/_rule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from typing import Any, Dict

from sympy import And, Implies, Symbol
from sympy.logic.boolalg import BooleanFunction

from cfnlint.conditions._condition import (
ConditionAnd,
ConditionList,
ConditionNamed,
ConditionNot,
ConditionOr,
)
from cfnlint.conditions._equals import Equal
from cfnlint.helpers import FUNCTION_CONDITIONS

# we leave the type hinting here
_RULE = Dict[str, Any]


class _Assertion:
def __init__(self, condition: Any, all_conditions: dict[str, dict]) -> None:
self._fn_equals: Equal | None = None
self._condition: ConditionList | ConditionNamed | None = None

if len(condition) == 1:
for k, v in condition.items():
if k in FUNCTION_CONDITIONS:
if not isinstance(v, list):
raise ValueError(f"{k} value should be an array")
if k == "Fn::Equals":
self._fn_equals = Equal(v)
elif k == "Fn::And":
self._condition = ConditionAnd(v, all_conditions)
elif k == "Fn::Or":
self._condition = ConditionOr(v, all_conditions)
elif k == "Fn::Not":
self._condition = ConditionNot(v, all_conditions)
elif k == "Condition":
if not isinstance(v, str):
raise ValueError(f"Condition value {v!r} must be a string")
self._condition = ConditionNamed(v, all_conditions)
else:
raise ValueError(f"Unknown key ({k}) in condition")
else:
raise ValueError("Condition value must be an object of length 1")

def build_cnf(self, params: dict[str, Symbol]) -> BooleanFunction | Symbol | None:
if self._fn_equals:
return self._fn_equals.hash

if self._condition:
return self._condition.build_cnf(params)

return None

@property
def equals(self) -> list[Equal]:
if self._fn_equals:
return [self._fn_equals]
if self._condition:
return self._condition.equals
return []


class _Assertions:
def __init__(self, assertions: list[dict], all_conditions: dict[str, dict]) -> None:
self._assertions: list[_Assertion] = []
for assertion in assertions:
assert_ = assertion.get("Assert", {})
self._assertions.append(_Assertion(assert_, all_conditions))

def build_cnf(self, params: dict[str, Symbol]) -> BooleanFunction | Symbol | None:

assertions = []
for assertion in self._assertions:
assertions.append(assertion.build_cnf(params))

return And(*assertions)

@property
def equals(self) -> list[Equal]:

results = []
for assertion in self._assertions:
results.extend(assertion.equals)
return results


class Rule:

def __init__(self, rule: _RULE, all_conditions: dict[str, dict]) -> None:
self._condition: _Assertion | None = None
self._assertions: _Assertions | None = None
self._init_rule(rule, all_conditions)

def _init_rule(
self,
rule: _RULE,
all_conditions: dict[str, dict],
) -> None:
condition = rule.get("RuleCondition")
if condition:
self._condition = _Assertion(condition, all_conditions)

assertions = rule.get("Assertions")
if not assertions:
raise ValueError("Rule must have Assertions")
self._assertions = _Assertions(assertions, all_conditions)

@property
def equals(self) -> list[Equal]:
result = []
if self._condition:
result.extend(self._condition.equals)
if self._assertions:
result.extend(self._assertions.equals)
return result

def build_cnf(self, params: dict[str, Symbol]) -> BooleanFunction | Symbol | None:

if self._assertions:
if self._condition:
return Implies(
self._condition.build_cnf(params),
self._assertions.build_cnf(params),
)
return self._assertions.build_cnf(params)
return None
29 changes: 29 additions & 0 deletions src/cfnlint/conditions/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from cfnlint.conditions._condition import ConditionNamed
from cfnlint.conditions._equals import Equal, EqualParameter
from cfnlint.conditions._errors import UnknownSatisfisfaction
from cfnlint.conditions._rule import Rule
from cfnlint.conditions._utils import get_hash

LOGGER = logging.getLogger(__name__)
Expand All @@ -32,8 +33,10 @@ class Conditions:
def __init__(self, cfn) -> None:
self._conditions: dict[str, ConditionNamed] = {}
self._parameters: dict[str, list[str]] = {}
self._rules: list[Rule] = []
self._init_conditions(cfn=cfn)
self._init_parameters(cfn=cfn)
self._init_rules(cfn=cfn)
self._cnf, self._solver_params = self._build_cnf(list(self._conditions.keys()))

def _init_conditions(self, cfn):
Expand Down Expand Up @@ -74,6 +77,29 @@ def _init_parameters(self, cfn: Any) -> None:
if isinstance(allowed_value, (str, int, float, bool)):
self._parameters[param_hash].append(get_hash(str(allowed_value)))

def _init_rules(self, cfn: Any) -> None:
rules = cfn.template.get("Rules")
conditions = cfn.template.get("Conditions")
if not isinstance(rules, dict) or not isinstance(conditions, dict):
return
for k, v in rules.items():
if not isinstance(rules, dict):
continue
try:
self._rules.append(Rule(v, conditions))
except ValueError as e:
LOGGER.debug("Captured error while building rule %s: %s", k, str(e))
except Exception as e: # pylint: disable=broad-exception-caught
if LOGGER.getEffectiveLevel() == logging.DEBUG:
error_message = traceback.format_exc()
else:
error_message = str(e)
LOGGER.debug(
"Captured unknown error while building rule %s: %s",
k,
error_message,
)

def get(self, name: str, default: Any = None) -> ConditionNamed:
"""Return the conditions"""
return self._conditions.get(name, default)
Expand Down Expand Up @@ -155,6 +181,9 @@ def _build_cnf(
if prop is not None:
cnf.add_prop(Not(prop))

for rule in self._rules:
cnf.add_prop(rule.build_cnf(equal_vars))

return (cnf, equal_vars)

def build_scenarios(
Expand Down
2 changes: 1 addition & 1 deletion src/cfnlint/context/conditions/_conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Conditions:

@classmethod
def create_from_instance(
cls, conditions: Any, parameters: dict[str, "Parameter"]
cls, conditions: Any, rules: dict[str, dict], parameters: dict[str, "Parameter"]
) -> "Conditions":
obj: dict[str, Condition] = {}
if not isinstance(conditions, dict):
Expand Down
6 changes: 4 additions & 2 deletions src/cfnlint/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,12 @@ def create_context_for_template(cfn):

try:
conditions = Conditions.create_from_instance(
cfn.template.get("Conditions", {}), parameters
cfn.template.get("Conditions", {}),
cfn.template.get("Rules", {}),
parameters,
)
except (ValueError, AttributeError):
conditions = Conditions.create_from_instance({}, {})
conditions = Conditions.create_from_instance({}, {}, {})

mappings = Mappings.create_from_dict(cfn.template.get("Mappings", {}))

Expand Down
1 change: 0 additions & 1 deletion test/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ def run_module_integration_scenarios(self, config):

runner = Runner(scenario_config)

print(f"Running test for {filename!r}")
with patch("sys.exit") as exit:
with patch("sys.stdout", new=StringIO()) as out:
runner.cli()
Expand Down
Loading

0 comments on commit c8560af

Please sign in to comment.