From fabe08f18cb0fe39db05ec748a58b9af8f20d2d8 Mon Sep 17 00:00:00 2001 From: Max Amelchenko Date: Wed, 4 Sep 2024 11:37:06 +0300 Subject: [PATCH] feat(terraform_plan): add support for provider in tf_plan framework (#6690) * add support for provider in tf_plan framework * add new plan registry * fix test and remove import * fix tests due to an added passed provider check * skip loop in case of empty resource address * add a failing test * remove new plan registry and make tf_plan provider definition adhere to tf provider definition * change test back to original * fix test --------- Co-authored-by: Max Amelchenko --- checkov/terraform/plan_parser.py | 16 ++++++++++++++-- checkov/terraform/plan_runner.py | 18 +++++++++++------- checkov/terraform/plan_utils.py | 15 ++++++++------- .../graph/graph_builder/test_graph_builder.py | 4 ++-- .../resources/aws/batch_tfplan.json | 3 +++ .../test_plan_runner_aws_resources.py | 10 +++++----- tests/terraform/parser/test_plan_parser.py | 4 ++-- tests/terraform/runner/test_plan_runner.py | 2 +- 8 files changed, 46 insertions(+), 26 deletions(-) diff --git a/checkov/terraform/plan_parser.py b/checkov/terraform/plan_parser.py index 7f2b103087d..041b442cb5f 100644 --- a/checkov/terraform/plan_parser.py +++ b/checkov/terraform/plan_parser.py @@ -8,10 +8,12 @@ from checkov.common.graph.graph_builder import CustomAttributes from checkov.common.parsers.node import ListNode -from checkov.common.util.consts import LINE_FIELD_NAMES, TRUE_AFTER_UNKNOWN +from checkov.common.util.consts import LINE_FIELD_NAMES, TRUE_AFTER_UNKNOWN, START_LINE, END_LINE from checkov.common.util.type_forcers import force_list from checkov.terraform.context_parsers.tf_plan import parse +from hcl2 import START_LINE as start_line, END_LINE as end_line + SIMPLE_TYPES = (str, int, float, bool) TF_PLAN_RESOURCE_ADDRESS = CustomAttributes.TF_RESOURCE_ADDRESS TF_PLAN_RESOURCE_CHANGE_ACTIONS = "__change_actions__" @@ -318,12 +320,22 @@ def _get_provider(template: dict[str, dict[str, Any]]) -> dict[str, dict[str, An # Not a provider, skip continue provider_map[provider_key] = {} + provider_alias = provider_data.get("alias", "default") + provider_map_entry = provider_map[provider_key] for field, value in provider_data.get('expressions', {}).items(): if field in LINE_FIELD_NAMES or not isinstance(value, dict): continue # don't care about line #s or non dicts expression_value = value.get('constant_value', None) if expression_value: - provider_map[provider_key][field] = expression_value + if isinstance(expression_value, str): + expression_value = [expression_value] + provider_map_entry[field] = expression_value + provider_map_entry['start_line'] = [provider_data.get(START_LINE, 1) - 1] + provider_map_entry['end_line'] = [provider_data.get(END_LINE, 1)] + provider_map_entry[start_line] = [provider_data.get(START_LINE, 1) - 1] + provider_map_entry[end_line] = [provider_data.get(END_LINE, 1)] + provider_map_entry['alias'] = [provider_alias] + provider_map_entry[TF_PLAN_RESOURCE_ADDRESS] = f"{provider_key}.{provider_alias}" return provider_map diff --git a/checkov/terraform/plan_runner.py b/checkov/terraform/plan_runner.py index e666ccc8ce7..eab251c33f6 100644 --- a/checkov/terraform/plan_runner.py +++ b/checkov/terraform/plan_runner.py @@ -26,6 +26,7 @@ from checkov.runner_filter import RunnerFilter from checkov.terraform.base_runner import BaseTerraformRunner from checkov.terraform.checks.data.registry import data_registry +from checkov.terraform.checks.provider.registry import provider_registry from checkov.terraform.checks.resource.registry import resource_registry from checkov.terraform.context_parsers.registry import parser_registry from checkov.terraform.plan_parser import TF_PLAN_RESOURCE_ADDRESS @@ -95,6 +96,7 @@ def __init__(self, graph_class: Type[TerraformLocalGraph] = TerraformLocalGraph, block_type_registries = { # noqa: CCE003 # a static attribute 'resource': resource_registry, 'data': data_registry, + 'provider': provider_registry } def run( @@ -199,8 +201,8 @@ def check_tf_definition( logging.debug(f"Scanning file: {scanned_file}") for block_type in definition.keys(): if block_type in self.block_type_registries.keys(): - self.run_block(definition[block_type], None, full_file_path, root_folder, report, scanned_file, - block_type, runner_filter) + self.run_block(definition[block_type], self.context, full_file_path, root_folder, + report, scanned_file, block_type, runner_filter) @staticmethod def _get_file_path(full_file_path: TFDefinitionKeyType, root_folder: str | pathlib.Path) -> tuple[str, str]: @@ -237,8 +239,11 @@ def run_block( entity_context = self.get_entity_context(definition_path, full_file_path, entity) entity_lines_range = [entity_context.get('start_line', 1), entity_context.get('end_line', 1)] entity_code_lines = entity_context.get('code_lines', []) - entity_address = entity_context['address'] _, _, entity_config = registry.extract_entity_details(entity) + entity_address = entity_context.get('address') or entity_context.get(CustomAttributes.TF_RESOURCE_ADDRESS) + if not entity_address: + logging.warning('tf plan resource address should not be empty') + continue self._assign_graph_to_registry(registry) results = registry.scan(scanned_file, entity, [], runner_filter, report_type=CheckType.TERRAFORM_PLAN) @@ -290,16 +295,15 @@ def get_entity_context_and_evaluations(self, entity: dict[str, Any]) -> dict[str raw_context['definition_path'] = entity[CustomAttributes.BLOCK_NAME].split('.') return raw_context - def get_entity_context( - self, definition_path: list[str], full_file_path: str, entity: dict[str, Any] - ) -> dict[str, Any]: + def get_entity_context(self, definition_path: list[str], full_file_path: str, entity: dict[str, Any]) -> dict[str, Any]: if not self.context: return {} if len(definition_path) > 1: resource_type = definition_path[0] resource_name = definition_path[1] - entity_id = entity.get(resource_type, {}).get(resource_name, {}).get(TF_PLAN_RESOURCE_ADDRESS) + resource_type_dict = entity.get(resource_type, {}) + entity_id = resource_type_dict.get(resource_name, resource_type_dict).get(TF_PLAN_RESOURCE_ADDRESS) else: entity_id = definition_path[0] return cast("dict[str, Any]", self.context.get(full_file_path, {}).get(entity_id, {})) diff --git a/checkov/terraform/plan_utils.py b/checkov/terraform/plan_utils.py index d2003662555..e94fda49b2f 100644 --- a/checkov/terraform/plan_utils.py +++ b/checkov/terraform/plan_utils.py @@ -65,7 +65,7 @@ def build_definitions_context( definitions_raw: Dict[str, List[Tuple[int, str]]] ) -> Dict[str, Dict[str, Any]]: definitions_context: dict[str, dict[str, Any]] = defaultdict(dict) - supported_block_types = ("data", "resource") + supported_block_types = ("data", "resource", "provider") for full_file_path, definition in definitions.items(): for block_type in supported_block_types: entities = definition.get(block_type, []) @@ -76,7 +76,8 @@ def build_definitions_context( if len(definition_path) > 1: resource_type = definition_path[0] resource_name = definition_path[1] - entity_id = entity.get(resource_type, {}).get(resource_name, {}).get(TF_PLAN_RESOURCE_ADDRESS) + resource_type_dict = entity.get(resource_type, {}) + entity_id = resource_type_dict.get(resource_name, resource_type_dict).get(TF_PLAN_RESOURCE_ADDRESS) else: entity_id = definition_path[0] @@ -114,14 +115,14 @@ def get_entity_context( if not resource_type_dict: continue resource_name = definition_path[1] - resource_defintion = resource_type_dict.get(resource_name, {}) - if resource_defintion and resource_defintion.get(TF_PLAN_RESOURCE_ADDRESS) == entity_id: - entity_context['start_line'] = resource_defintion['start_line'][0] - entity_context['end_line'] = resource_defintion['end_line'][0] + resource_definition = resource_type_dict.get(resource_name, resource_type_dict) + if resource_definition and resource_definition.get(TF_PLAN_RESOURCE_ADDRESS) == entity_id: + entity_context['start_line'] = resource_definition['start_line'][0] + entity_context['end_line'] = resource_definition['end_line'][0] entity_context["code_lines"] = definitions_raw[full_file_path][ entity_context["start_line"] : entity_context["end_line"] ] - entity_context['address'] = resource_defintion[TF_PLAN_RESOURCE_ADDRESS] + entity_context['address'] = resource_definition[TF_PLAN_RESOURCE_ADDRESS] return entity_context return entity_context diff --git a/tests/terraform/graph/graph_builder/test_graph_builder.py b/tests/terraform/graph/graph_builder/test_graph_builder.py index 70f30e6a916..882ae9208d7 100644 --- a/tests/terraform/graph/graph_builder/test_graph_builder.py +++ b/tests/terraform/graph/graph_builder/test_graph_builder.py @@ -373,13 +373,13 @@ def test_build_rustworkx_graph(self): self.check_edge(graph, resource_node, var_region_node, 'region') self.check_edge(graph, provider_node, var_aws_profile_node, 'profile') self.check_edge(graph, local_node, var_bucket_name_node, 'bucket_name') - + def test_multiple_modules_with_connected_resources(self): valid_plan_path = os.path.realpath(os.path.join(TEST_DIRNAME, '../resources/modules_edges_tfplan/tfplan.json')) definitions, definitions_raw = create_definitions(root_folder=None, files=[valid_plan_path]) graph_manager = TerraformGraphManager(db_connector=RustworkxConnector()) tf_plan_local_graph = graph_manager.build_graph_from_definitions(definitions, render_variables=False) - self.assertTrue(tf_plan_local_graph.in_edges[2]) + self.assertTrue(tf_plan_local_graph.in_edges[3]) def build_new_key_for_tf_definition(key): diff --git a/tests/terraform/image_referencer/resources/aws/batch_tfplan.json b/tests/terraform/image_referencer/resources/aws/batch_tfplan.json index 1d94098dd2b..cf420499403 100644 --- a/tests/terraform/image_referencer/resources/aws/batch_tfplan.json +++ b/tests/terraform/image_referencer/resources/aws/batch_tfplan.json @@ -81,6 +81,9 @@ }, "region": { "constant_value": "us-west-2" + }, + "access_key": { + "constant_value": "AKIAIOSFODNN7EXAMPLE" } } } diff --git a/tests/terraform/image_referencer/test_plan_runner_aws_resources.py b/tests/terraform/image_referencer/test_plan_runner_aws_resources.py index b44e2b327f8..4516223952d 100644 --- a/tests/terraform/image_referencer/test_plan_runner_aws_resources.py +++ b/tests/terraform/image_referencer/test_plan_runner_aws_resources.py @@ -50,7 +50,7 @@ def test_apprunner_resources(mocker: MockerFixture, graph_framework): sca_image_report = next(report for report in reports if report.check_type == CheckType.SCA_IMAGE) assert len(tf_report.resources) == 1 - assert len(tf_report.passed_checks) == 0 + assert len(tf_report.passed_checks) == 1 assert len(tf_report.failed_checks) == 0 assert len(tf_report.skipped_checks) == 0 assert len(tf_report.parsing_errors) == 0 @@ -110,7 +110,7 @@ def test_batch_resources(mocker: MockerFixture, graph_framework): assert len(tf_report.resources) == 1 assert len(tf_report.passed_checks) == 1 - assert len(tf_report.failed_checks) == 0 + assert len(tf_report.failed_checks) == 1 assert len(tf_report.skipped_checks) == 0 assert len(tf_report.parsing_errors) == 0 @@ -151,7 +151,7 @@ def test_codebuild_resources(mocker: MockerFixture, graph_framework): sca_image_report = next(report for report in reports if report.check_type == CheckType.SCA_IMAGE) assert len(tf_report.resources) == 3 - assert len(tf_report.passed_checks) == 8 + assert len(tf_report.passed_checks) == 9 assert len(tf_report.failed_checks) == 2 assert len(tf_report.skipped_checks) == 0 assert len(tf_report.parsing_errors) == 0 @@ -195,7 +195,7 @@ def test_ecs_resources(mocker: MockerFixture, graph_framework): sca_image_report = next(report for report in reports if report.check_type == CheckType.SCA_IMAGE) assert len(tf_report.resources) == 1 - assert len(tf_report.passed_checks) == 4 + assert len(tf_report.passed_checks) == 5 assert len(tf_report.failed_checks) == 1 assert len(tf_report.skipped_checks) == 0 assert len(tf_report.parsing_errors) == 0 @@ -240,7 +240,7 @@ def test_lightsail_resources(mocker: MockerFixture, graph_framework): sca_image_report = next(report for report in reports if report.check_type == CheckType.SCA_IMAGE) assert len(tf_report.resources) == 2 - assert len(tf_report.passed_checks) == 0 + assert len(tf_report.passed_checks) == 1 assert len(tf_report.failed_checks) == 0 assert len(tf_report.skipped_checks) == 0 assert len(tf_report.parsing_errors) == 0 diff --git a/tests/terraform/parser/test_plan_parser.py b/tests/terraform/parser/test_plan_parser.py index bead3b186b3..ad7e343c2c0 100644 --- a/tests/terraform/parser/test_plan_parser.py +++ b/tests/terraform/parser/test_plan_parser.py @@ -26,8 +26,8 @@ def test_provider_is_included(self): valid_plan_path = current_dir + "/resources/plan_tags/tfplan.json" tf_definition, _ = parse_tf_plan(valid_plan_path, {}) file_provider_definition = tf_definition['provider'] - self.assertTrue(file_provider_definition) # assert a provider exists - assert file_provider_definition[0].get('aws',{}).get('region', None) == 'us-west-2' + self.assertTrue(file_provider_definition) # assert a provider exists + assert file_provider_definition[0].get('aws', {}).get('region', None) == ['us-west-2'] def test_more_tags_values_are_flattened(self): current_dir = os.path.dirname(os.path.realpath(__file__)) diff --git a/tests/terraform/runner/test_plan_runner.py b/tests/terraform/runner/test_plan_runner.py index be2513c186c..fdfcd858815 100644 --- a/tests/terraform/runner/test_plan_runner.py +++ b/tests/terraform/runner/test_plan_runner.py @@ -297,7 +297,7 @@ def test_runner_child_modules(self): self.assertEqual(report.get_exit_code({'soft_fail': True, 'soft_fail_checks': [], 'soft_fail_threshold': None, 'hard_fail_checks': [], 'hard_fail_threshold': None}), 0) self.assertEqual(report.get_summary()["failed"], 3) - self.assertEqual(report.get_summary()["passed"], 4) + self.assertEqual(report.get_summary()["passed"], 5) def test_runner_nested_child_modules(self): current_dir = os.path.dirname(os.path.realpath(__file__))