diff --git a/src/cfnlint/conditions/_rule.py b/src/cfnlint/conditions/_rule.py new file mode 100644 index 0000000000..2f624f6ac3 --- /dev/null +++ b/src/cfnlint/conditions/_rule.py @@ -0,0 +1,135 @@ +""" +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from __future__ import annotations + +from typing import Any, Dict + +from sympy import And, Implies, Symbol +from sympy.logic.boolalg import BooleanFunction + +from cfnlint.conditions._condition import ( + ConditionAnd, + ConditionList, + ConditionNamed, + ConditionNot, + ConditionOr, +) +from cfnlint.conditions._equals import Equal +from cfnlint.helpers import FUNCTION_CONDITIONS + +# we leave the type hinting here +_RULE = Dict[str, Any] + + +class _Assertion: + def __init__(self, condition: Any, all_conditions: dict[str, dict]) -> None: + self._fn_equals: Equal | None = None + self._condition: ConditionList | ConditionNamed | None = None + + if len(condition) == 1: + for k, v in condition.items(): + if k in FUNCTION_CONDITIONS: + if not isinstance(v, list): + raise ValueError(f"{k} value should be an array") + if k == "Fn::Equals": + self._fn_equals = Equal(v) + elif k == "Fn::And": + self._condition = ConditionAnd(v, all_conditions) + elif k == "Fn::Or": + self._condition = ConditionOr(v, all_conditions) + elif k == "Fn::Not": + self._condition = ConditionNot(v, all_conditions) + elif k == "Condition": + if not isinstance(v, str): + raise ValueError(f"Condition value {v!r} must be a string") + self._condition = ConditionNamed(v, all_conditions) + else: + raise ValueError(f"Unknown key ({k}) in condition") + else: + raise ValueError("Condition value must be an object of length 1") + + def build_cnf(self, params: dict[str, Symbol]) -> BooleanFunction | Symbol | None: + if self._fn_equals: + return self._fn_equals.hash + + if self._condition: + return self._condition.build_cnf(params) + + return None + + @property + def equals(self) -> list[Equal]: + if self._fn_equals: + return [self._fn_equals] + if self._condition: + return self._condition.equals + return [] + + +class _Assertions: + def __init__(self, assertions: list[dict], all_conditions: dict[str, dict]) -> None: + self._assertions: list[_Assertion] = [] + for assertion in assertions: + assert_ = assertion.get("Assert", {}) + self._assertions.append(_Assertion(assert_, all_conditions)) + + def build_cnf(self, params: dict[str, Symbol]) -> BooleanFunction | Symbol | None: + + assertions = [] + for assertion in self._assertions: + assertions.append(assertion.build_cnf(params)) + + return And(*assertions) + + @property + def equals(self) -> list[Equal]: + + results = [] + for assertion in self._assertions: + results.extend(assertion.equals) + return results + + +class Rule: + + def __init__(self, rule: _RULE, all_conditions: dict[str, dict]) -> None: + self._condition: _Assertion | None = None + self._assertions: _Assertions | None = None + self._init_rule(rule, all_conditions) + + def _init_rule( + self, + rule: _RULE, + all_conditions: dict[str, dict], + ) -> None: + condition = rule.get("RuleCondition") + if condition: + self._condition = _Assertion(condition, all_conditions) + + assertions = rule.get("Assertions") + if not assertions: + raise ValueError("Rule must have Assertions") + self._assertions = _Assertions(assertions, all_conditions) + + @property + def equals(self) -> list[Equal]: + result = [] + if self._condition: + result.extend(self._condition.equals) + if self._assertions: + result.extend(self._assertions.equals) + return result + + def build_cnf(self, params: dict[str, Symbol]) -> BooleanFunction | Symbol | None: + + if self._assertions: + if self._condition: + return Implies( + self._condition.build_cnf(params), + self._assertions.build_cnf(params), + ) + return self._assertions.build_cnf(params) + return None diff --git a/src/cfnlint/conditions/conditions.py b/src/cfnlint/conditions/conditions.py index 915932361c..5bc2ec64ae 100644 --- a/src/cfnlint/conditions/conditions.py +++ b/src/cfnlint/conditions/conditions.py @@ -19,6 +19,7 @@ from cfnlint.conditions._condition import ConditionNamed from cfnlint.conditions._equals import Equal, EqualParameter from cfnlint.conditions._errors import UnknownSatisfisfaction +from cfnlint.conditions._rule import Rule from cfnlint.conditions._utils import get_hash LOGGER = logging.getLogger(__name__) @@ -32,8 +33,10 @@ class Conditions: def __init__(self, cfn) -> None: self._conditions: dict[str, ConditionNamed] = {} self._parameters: dict[str, list[str]] = {} + self._rules: list[Rule] = [] self._init_conditions(cfn=cfn) self._init_parameters(cfn=cfn) + self._init_rules(cfn=cfn) self._cnf, self._solver_params = self._build_cnf(list(self._conditions.keys())) def _init_conditions(self, cfn): @@ -74,6 +77,29 @@ def _init_parameters(self, cfn: Any) -> None: if isinstance(allowed_value, (str, int, float, bool)): self._parameters[param_hash].append(get_hash(str(allowed_value))) + def _init_rules(self, cfn: Any) -> None: + rules = cfn.template.get("Rules") + conditions = cfn.template.get("Conditions") + if not isinstance(rules, dict) or not isinstance(conditions, dict): + return + for k, v in rules.items(): + if not isinstance(rules, dict): + continue + try: + self._rules.append(Rule(v, conditions)) + except ValueError as e: + LOGGER.debug("Captured error while building rule %s: %s", k, str(e)) + except Exception as e: # pylint: disable=broad-exception-caught + if LOGGER.getEffectiveLevel() == logging.DEBUG: + error_message = traceback.format_exc() + else: + error_message = str(e) + LOGGER.debug( + "Captured unknown error while building rule %s: %s", + k, + error_message, + ) + def get(self, name: str, default: Any = None) -> ConditionNamed: """Return the conditions""" return self._conditions.get(name, default) @@ -155,6 +181,9 @@ def _build_cnf( if prop is not None: cnf.add_prop(Not(prop)) + for rule in self._rules: + cnf.add_prop(rule.build_cnf(equal_vars)) + return (cnf, equal_vars) def build_scenarios( diff --git a/src/cfnlint/context/conditions/_conditions.py b/src/cfnlint/context/conditions/_conditions.py index b6d1db3220..4537ffacaf 100644 --- a/src/cfnlint/context/conditions/_conditions.py +++ b/src/cfnlint/context/conditions/_conditions.py @@ -34,7 +34,7 @@ class Conditions: @classmethod def create_from_instance( - cls, conditions: Any, parameters: dict[str, "Parameter"] + cls, conditions: Any, rules: dict[str, dict], parameters: dict[str, "Parameter"] ) -> "Conditions": obj: dict[str, Condition] = {} if not isinstance(conditions, dict): diff --git a/src/cfnlint/context/context.py b/src/cfnlint/context/context.py index 14fe3a938d..0773d798b7 100644 --- a/src/cfnlint/context/context.py +++ b/src/cfnlint/context/context.py @@ -431,10 +431,12 @@ def create_context_for_template(cfn): try: conditions = Conditions.create_from_instance( - cfn.template.get("Conditions", {}), parameters + cfn.template.get("Conditions", {}), + cfn.template.get("Rules", {}), + parameters, ) except (ValueError, AttributeError): - conditions = Conditions.create_from_instance({}, {}) + conditions = Conditions.create_from_instance({}, {}, {}) mappings = Mappings.create_from_dict(cfn.template.get("Mappings", {})) diff --git a/test/integration/__init__.py b/test/integration/__init__.py index 9d57623163..0f1d640d36 100644 --- a/test/integration/__init__.py +++ b/test/integration/__init__.py @@ -113,7 +113,6 @@ def run_module_integration_scenarios(self, config): runner = Runner(scenario_config) - print(f"Running test for {filename!r}") with patch("sys.exit") as exit: with patch("sys.stdout", new=StringIO()) as out: runner.cli() diff --git a/test/unit/module/conditions/test_rules.py b/test/unit/module/conditions/test_rules.py new file mode 100644 index 0000000000..4b5becf6be --- /dev/null +++ b/test/unit/module/conditions/test_rules.py @@ -0,0 +1,306 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from unittest import TestCase + +from cfnlint.conditions._rule import _Assertion +from cfnlint.decode import decode_str +from cfnlint.template import Template + + +class TestConditionsWithRules(TestCase): + + def test_conditions_with_rules(self): + template = decode_str( + """ + Conditions: + IsProd: !Equals [!Ref Environment, "prod"] + IsUsEast1: !Equals [!Ref "AWS::Region", "us-east-1"] + Rules: + Rule1: + Assertions: + - Assert: + Fn::And: + - !Condition IsProd + - !Condition IsUsEast1 + Rule2: + Assertions: + - Assert: + Fn::Or: + - !Condition IsProd + - !Condition IsUsEast1 + """ + )[0] + + cfn = Template("", template) + self.assertEqual(len(cfn.conditions._conditions), 2) + self.assertEqual(len(cfn.conditions._rules), 2) + + self.assertListEqual( + [equal.hash for equal in cfn.conditions._rules[0].equals], + [ + "d0f5e92fc5233a6b011342df171f191838491056", + "362a2ca660fa34c91feeee4681e8433101d2a687", + ], + ) + + self.assertTrue( + cfn.conditions.satisfiable( + {"IsProd": True, "IsUsEast1": True}, + {"AWS::Region": "us-east-1", "Environment": "prod"}, + ) + ) + self.assertFalse( + cfn.conditions.satisfiable( + {"IsProd": True, "IsUsEast1": False}, + {"AWS::Region": "us-west-2", "Environment": "prod"}, + ) + ) + self.assertFalse( + cfn.conditions.satisfiable( + {"IsProd": False, "IsUsEast1": True}, + {"AWS::Region": "us-east-1", "Environment": "dev"}, + ) + ) + self.assertFalse( + cfn.conditions.satisfiable( + {"IsProd": False, "IsUsEast1": False}, + {"AWS::Region": "us-west-2", "Environment": "dev"}, + ) + ) + + def test_conditions_with_rules_implies(self): + template = decode_str( + """ + Conditions: + IsProd: !Equals [!Ref Environment, "prod"] + IsUsEast1: !Equals [!Ref "AWS::Region", "us-east-1"] + Rules: + Rule: + RuleCondition: !Condition IsProd + Assertions: + - Assert: !Condition IsUsEast1 + + """ + )[0] + + cfn = Template("", template) + self.assertEqual(len(cfn.conditions._conditions), 2) + self.assertEqual(len(cfn.conditions._rules), 1) + + self.assertListEqual( + [equal.hash for equal in cfn.conditions._rules[0].equals], + [ + "d0f5e92fc5233a6b011342df171f191838491056", + "362a2ca660fa34c91feeee4681e8433101d2a687", + ], + ) + + self.assertTrue( + cfn.conditions.satisfiable( + {"IsProd": True, "IsUsEast1": True}, + {"AWS::Region": "us-east-1", "Environment": "prod"}, + ) + ) + self.assertFalse( + cfn.conditions.satisfiable( + {"IsProd": True, "IsUsEast1": False}, + {"AWS::Region": "us-west-2", "Environment": "prod"}, + ) + ) + self.assertTrue( + cfn.conditions.satisfiable( + {"IsProd": False, "IsUsEast1": True}, + {"AWS::Region": "us-east-1", "Environment": "dev"}, + ) + ) + self.assertTrue( + cfn.conditions.satisfiable( + {"IsProd": False, "IsUsEast1": False}, + {"AWS::Region": "us-west-2", "Environment": "dev"}, + ) + ) + + def test_conditions_with_multiple_rules(self): + template = decode_str( + """ + Parameters: + Environment: + Type: String + Default: dev + AllowedValues: + - dev + - stage + - prod + Conditions: + IsProd: !Equals [!Ref Environment, "prod"] + IsDev: !Equals [!Ref Environment, "dev"] + IsUsEast1: !Equals [!Ref "AWS::Region", "us-east-1"] + IsNotUsEast1: !Not [!Condition IsUsEast1] + Rules: + Rule1: + RuleCondition: !Equals [!Ref Environment, "prod"] + Assertions: + - Assert: !Condition IsUsEast1 + Rule2: + RuleCondition: !Equals [!Ref Environment, "dev"] + Assertions: + - Assert: !Not [!Condition IsUsEast1] + """ + )[0] + + cfn = Template("", template) + self.assertEqual(len(cfn.conditions._conditions), 4) + self.assertEqual(len(cfn.conditions._rules), 2) + + self.assertListEqual( + [equal.hash for equal in cfn.conditions._rules[0].equals], + [ + "d0f5e92fc5233a6b011342df171f191838491056", + "362a2ca660fa34c91feeee4681e8433101d2a687", + ], + ) + self.assertListEqual( + [equal.hash for equal in cfn.conditions._rules[1].equals], + [ + "d2dab653475dd270354fe84c4f80b54883e958bb", + "362a2ca660fa34c91feeee4681e8433101d2a687", + ], + ) + + self.assertTrue( + cfn.conditions.satisfiable( + { + "IsProd": True, + "IsUsEast1": True, + "IsDev": False, + "IsNotUsEast1": False, + }, + {"AWS::Region": "us-east-1", "Environment": "prod"}, + ) + ) + self.assertFalse( + cfn.conditions.satisfiable( + { + "IsProd": True, + "IsUsEast1": False, + "IsDev": False, + "IsNotUsEast1": False, + }, + {"AWS::Region": "us-west-2", "Environment": "prod"}, + ) + ) + self.assertFalse( + cfn.conditions.satisfiable( + { + "IsProd": False, + "IsUsEast1": True, + "IsDev": True, + "IsNotUsEast1": False, + }, + {"AWS::Region": "us-east-1", "Environment": "dev"}, + ) + ) + self.assertTrue( + cfn.conditions.satisfiable( + { + "IsProd": False, + "IsUsEast1": False, + "IsDev": True, + "IsNotUsEast1": True, + }, + {"AWS::Region": "us-west-2", "Environment": "dev"}, + ) + ) + self.assertTrue( + cfn.conditions.satisfiable( + { + "IsProd": False, + "IsUsEast1": True, + "IsDev": False, + "IsNotUsEast1": False, + }, + {"AWS::Region": "us-east-1", "Environment": "stage"}, + ) + ) + self.assertTrue( + cfn.conditions.satisfiable( + { + "IsProd": False, + "IsUsEast1": False, + "IsDev": False, + "IsNotUsEast1": True, + }, + {"AWS::Region": "us-west-2", "Environment": "stage"}, + ) + ) + + +class TestAssertion(TestCase): + def test_assertion_errors(self): + with self.assertRaises(ValueError): + _Assertion({"A": "B", "C": "D"}, {}) + + with self.assertRaises(ValueError): + _Assertion({"Fn::Not": {"C": "D"}}, {}) + + with self.assertRaises(ValueError): + _Assertion({"Not": {"C": "D"}}, {}) + + with self.assertRaises(ValueError): + _Assertion({"Condition": {"C": "D"}}, {}) + + def test_init_rules_with_list(self): + template = decode_str( + """ + Conditions: + IsUsEast1: !Equals [!Ref "AWS::Region", "us-east-1"] + IsNotUsEast1: !Not [!Condition IsUsEast1] + Rules: [] + """ + )[0] + + cfn = Template("", template) + self.assertListEqual(cfn.conditions._rules, []) + + def test_init_rules_with_wrong_assertions_type(self): + template = decode_str( + """ + Conditions: + IsUsEast1: !Equals [!Ref "AWS::Region", "us-east-1"] + IsNotUsEast1: !Not [!Condition IsUsEast1] + Rules: + Rule1: + Assertions: {"Foo": "Bar"} + Rule2: + Assertions: + - Assert: !Condition IsUsEast1 + """ + )[0] + + cfn = Template("", template) + self.assertEqual(len(cfn.conditions._rules), 1) + + def test_init_rules_with_no_keys(self): + template = decode_str( + """ + Conditions: + IsUsEast1: !Equals [!Ref "AWS::Region", "us-east-1"] + IsNotUsEast1: !Not [!Condition IsUsEast1] + Rules: + Rule1: + Foo: Bar + Rule2: + Assertions: + - Assert: + Fn::Or: + - !Condition IsNotUsEast1 + - !Condition IsUsEast1 + Rule3: [] + """ + )[0] + + cfn = Template("", template) + self.assertEqual(len(cfn.conditions._rules), 1) diff --git a/test/unit/module/context/conditions/test_conditions.py b/test/unit/module/context/conditions/test_conditions.py index 21e32d00bf..a5e44e0482 100644 --- a/test/unit/module/context/conditions/test_conditions.py +++ b/test/unit/module/context/conditions/test_conditions.py @@ -296,4 +296,4 @@ def test_evolve_from_instance(current_status, instance, expected): def test_condition_failures(): with pytest.raises(ValueError): - Conditions.create_from_instance([], {}) + Conditions.create_from_instance([], {}, {}) diff --git a/test/unit/module/context/test_context.py b/test/unit/module/context/test_context.py index 422bbf3a4d..557be9b341 100644 --- a/test/unit/module/context/test_context.py +++ b/test/unit/module/context/test_context.py @@ -18,6 +18,7 @@ def test_class(self): "Foo": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-east-1"]}, "Bar": {"Fn::Equals": [{"Ref": "AWS::Region"}, "us-west-2"]}, }, + rules={}, parameters={}, ), )