diff --git a/checkov/common/typing.py b/checkov/common/typing.py index 99d6ce0f76c..7a11261d56e 100644 --- a/checkov/common/typing.py +++ b/checkov/common/typing.py @@ -133,4 +133,3 @@ class _EntityContext(TypedDict, total=False): policy: str code_lines: list[tuple[int, str]] skipped_checks: list[_SkippedCheck] - origin_relative_path: str diff --git a/checkov/kubernetes/kubernetes_utils.py b/checkov/kubernetes/kubernetes_utils.py index 6b0710c03e6..5bb46e4d938 100644 --- a/checkov/kubernetes/kubernetes_utils.py +++ b/checkov/kubernetes/kubernetes_utils.py @@ -138,25 +138,34 @@ def build_definitions_context( resource_id = get_resource_id(resource) if not resource_id: continue - - relative_resource_path = None - if 'metadata' in resource: - metadata = resource['metadata'] - if 'annotations' in metadata and metadata['annotations'] is not None\ - and 'config.kubernetes.io/origin' in metadata['annotations']: - metadata_path = metadata['annotations']['config.kubernetes.io/origin'] - if 'path:' in metadata_path: - relative_resource_path = metadata_path.split('path:')[1].strip() - - resource_start_line = resource[START_LINE] - resource_end_line = min(resource[END_LINE], len(definitions_raw[file_path])) - raw_code = definitions_raw[file_path] - code_lines, start_line, end_line = calculate_code_lines(raw_code, resource_start_line, resource_end_line) + start_line = resource[START_LINE] + end_line = min(resource[END_LINE], len(definitions_raw[file_path])) + first_line_index = 0 + # skip empty lines + while not str.strip(definitions_raw[file_path][first_line_index][1]): + first_line_index += 1 + # check if the file is a json file + if str.strip(definitions_raw[file_path][first_line_index][1])[0] == "{": + start_line += 1 + end_line += 1 + else: + # add resource comments to definition lines + current_line = str.strip(definitions_raw[file_path][start_line - 1][1]) + while not current_line or current_line[0] == YAML_COMMENT_MARK: + start_line -= 1 + current_line = str.strip(definitions_raw[file_path][start_line - 1][1]) + + # remove next resource comments from definition lines + current_line = str.strip(definitions_raw[file_path][end_line - 1][1]) + while not current_line or current_line[0] == YAML_COMMENT_MARK: + end_line -= 1 + current_line = str.strip(definitions_raw[file_path][end_line - 1][1]) + + code_lines = definitions_raw[file_path][start_line - 1: end_line] dpath.new( definitions_context, [file_path, resource_id], - {"start_line": start_line, "end_line": end_line, "code_lines": code_lines, - "origin_relative_path": relative_resource_path}, + {"start_line": start_line, "end_line": end_line, "code_lines": code_lines}, ) skipped_checks = get_skipped_checks(resource) @@ -168,32 +177,6 @@ def build_definitions_context( return definitions_context -def calculate_code_lines(raw_code: list[tuple[int, str]], start_line: int, end_line: int) \ - -> tuple[list[tuple[int, str]], int, int]: - first_line_index = 0 - # skip empty lines - while not str.strip(raw_code[first_line_index][1]): - first_line_index += 1 - # check if the file is a json file - if str.strip(raw_code[first_line_index][1])[0] == "{": - start_line += 1 - end_line += 1 - else: - # add resource comments to definition lines - current_line = str.strip(raw_code[start_line - 1][1]) - while not current_line or current_line[0] == YAML_COMMENT_MARK: - start_line -= 1 - current_line = str.strip(raw_code[start_line - 1][1]) - - # remove next resource comments from definition lines - current_line = str.strip(raw_code[end_line - 1][1]) - while not current_line or current_line[0] == YAML_COMMENT_MARK: - end_line -= 1 - current_line = str.strip(raw_code[end_line - 1][1]) - code_lines = raw_code[start_line - 1: end_line] - return code_lines, start_line, end_line - - def is_invalid_k8_definition(definition: Dict[str, Any]) -> bool: return ( not isinstance(definition, dict) diff --git a/checkov/kubernetes/runner.py b/checkov/kubernetes/runner.py index 2c6d5e03da3..22cb7375f00 100644 --- a/checkov/kubernetes/runner.py +++ b/checkov/kubernetes/runner.py @@ -174,8 +174,7 @@ def check_definitions( # TODO? - Variable Eval Message! variable_evaluations: "dict[str, Any]" = {} - report = self.mutate_kubernetes_results(results, report, k8_file, k8_file_path, file_abs_path, - entity_conf, variable_evaluations, root_folder) + report = self.mutate_kubernetes_results(results, report, k8_file, k8_file_path, file_abs_path, entity_conf, variable_evaluations) self.pbar.update() self.pbar.close() return report @@ -195,7 +194,6 @@ def mutate_kubernetes_results( file_abs_path: str, entity_conf: dict[str, Any], variable_evaluations: dict[str, Any], - root_folder: str | None = None ) -> Report: # Moves report generation logic out of run() method in Runner class. # Allows function overriding of a much smaller function than run() for other "child" frameworks such as Kustomize, Helm diff --git a/checkov/kustomize/runner.py b/checkov/kustomize/runner.py index 6c2ad4ce086..39cbd46139f 100644 --- a/checkov/kustomize/runner.py +++ b/checkov/kustomize/runner.py @@ -21,11 +21,9 @@ from checkov.common.output.report import Report from checkov.common.bridgecrew.check_type import CheckType from checkov.common.runners.base_runner import BaseRunner, filter_ignored_paths -from checkov.common.typing import _CheckResult, _EntityContext -from checkov.common.util.consts import START_LINE, END_LINE +from checkov.common.typing import _CheckResult from checkov.common.util.data_structures_utils import pickle_deepcopy -from checkov.common.util.type_forcers import convert_str_to_bool -from checkov.kubernetes.kubernetes_utils import create_check_result, get_resource_id, calculate_code_lines +from checkov.kubernetes.kubernetes_utils import create_check_result, get_resource_id from checkov.kubernetes.runner import Runner as K8sRunner from checkov.kubernetes.runner import _get_entity_abs_path from checkov.kustomize.image_referencer.manager import KustomizeImageReferencerManager @@ -58,10 +56,6 @@ def __init__( self.original_root_dir: str = '' self.pbar.turn_off_progress_bar() - # Allows using kustomize commands to directly edit the user's kustomization.yaml configurations - self.checkov_allow_kustomize_file_edits = convert_str_to_bool(os.getenv("CHECKOV_ALLOW_KUSTOMIZE_FILE_EDITS", - False)) - def set_external_data( self, definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] | None, @@ -85,7 +79,6 @@ def mutate_kubernetes_results( file_abs_path: str, entity_conf: dict[str, Any], variable_evaluations: dict[str, Any], - root_folder: str | None = None ) -> Report: # Moves report generation logic out of checkov.kubernetes.runner.run() def. # Allows us to overriding report file information for "child" frameworks such as Kustomize, Helm @@ -117,10 +110,6 @@ def mutate_kubernetes_results( external_run_indicator = "Bc" file_path = realKustomizeEnvMetadata['filePath'] - - caller_file_path = None - caller_file_line_range = None - # means this scan originated in the platform if type(self.graph_manager).__name__.startswith(external_run_indicator): absolute_file_path = file_abs_path @@ -129,22 +118,15 @@ def mutate_kubernetes_results( # Fix file path to repo relative path if self.original_root_dir: repo_dir = str(pathlib.Path(self.original_root_dir).resolve()) - if realKustomizeEnvMetadata['filePath'].startswith(repo_dir): file_path = realKustomizeEnvMetadata['filePath'][len(repo_dir):] - if self.checkov_allow_kustomize_file_edits: - caller_file_line_range, caller_file_path = self._get_caller_file_info(entity_context, k8_file, - k8_file_path, resource_id, - root_folder) code_lines = entity_context.get("code_lines") file_line_range = self.line_range(code_lines) - record = Record( check_id=check.id, bc_check_id=check.bc_id, check_name=check.name, - check_result=check_result, code_block=code_lines, - file_path=file_path, file_line_range=file_line_range, - caller_file_path=caller_file_path, caller_file_line_range=caller_file_line_range, + check_result=check_result, code_block=code_lines, file_path=file_path, + file_line_range=file_line_range, resource=kustomizeResourceID, evaluations=variable_evaluations, check_class=check.__class__.__module__, file_abs_path=absolute_file_path, severity=check.severity) record.set_guideline(check.guideline) @@ -152,79 +134,6 @@ def mutate_kubernetes_results( return report - def _get_caller_file_info(self, entity_context: _EntityContext, k8_file: str, k8_file_path: str, resource_id: str, - root_folder: str | None) -> tuple[tuple[int, int] | None, str | None]: - origin_relative_path = entity_context.get('origin_relative_path') - if origin_relative_path is None: - return None, None - k8s_file_dir = pathlib.Path(k8_file_path).parent - raw_file_path = k8s_file_dir / origin_relative_path - caller_file_path = self._get_caller_file_path(k8s_file_dir, origin_relative_path, raw_file_path) - if root_folder is None: - return None, caller_file_path - caller_file_line_range = self._get_caller_line_range(root_folder, k8_file, origin_relative_path, - resource_id) - return caller_file_line_range, caller_file_path - - @staticmethod - def _get_caller_file_path(k8s_file_dir: pathlib.Path, origin_relative_path: str, raw_file_path: pathlib.Path)\ - -> str: - """ - Creates the correct file path based on the collection of metadata locations we have. - - Example for expected input: - - k8s_fil_dir - Path('/resources/image_referencer/overlays/prod') - - origin_relative_path - '../../base/deployment.yaml' - - raw_file_path - Path('/resources/image_referencer/overlays/prod/../../base/deployment.yaml') - """ - amount_of_parents = str.count(origin_relative_path, '..') - directory_prefix_path = k8s_file_dir - if amount_of_parents: - directory_prefix_path = k8s_file_dir.parents[amount_of_parents - 1] - - directory_prefix = str(directory_prefix_path) - resolved_path = str(raw_file_path.resolve()) - # Make sure the resolved path starts with the root folder, as pathlib.Path.resolve() might change it - if directory_prefix in resolved_path and not resolved_path.startswith(directory_prefix): - resolved_path = K8sKustomizeRunner._remove_extra_path_parts(resolved_path, directory_prefix) - - return resolved_path[len(str(directory_prefix)):] - - @staticmethod - def _remove_extra_path_parts(resolved_path: str, prefix: str) -> str: - """ - Some pathlib paths can "add" extra arguments at the beginning after running `Path.resolve()`. - For example, running `Path('/var/example.txt').resolve` might result in `//var/example.txt`. - The purpose of this function is to remove any unintentional additions like this one. - """ - resolved_path_parts = resolved_path.split(prefix) - if len(resolved_path_parts) > 1: - resolved_path = f'{prefix}{"".join(resolved_path_parts[1:])}' - else: - resolved_path = f'{prefix}{"".join(resolved_path_parts)}' - return resolved_path - - def _get_caller_line_range(self, root_folder: str, k8_file: str, origin_relative_path: str, - resource_id: str) -> tuple[int, int] | None: - raw_caller_directory = (pathlib.Path(k8_file.lstrip(os.path.sep)).parent / - pathlib.Path(origin_relative_path.lstrip(os.path.sep)).parent) - caller_directory = str(pathlib.Path(f'{os.path.sep}{raw_caller_directory}').resolve()) - caller_directory = K8sKustomizeRunner._remove_extra_path_parts(caller_directory, root_folder) - file_ending = pathlib.Path(origin_relative_path).suffix - caller_file_path = f'{str(pathlib.Path(caller_directory) / resource_id.replace(".", "-"))}{file_ending}' - - if caller_file_path not in self.definitions: - return None - caller_resource = self.definitions[caller_file_path][0] - raw_caller_resource = self.definitions_raw[caller_file_path] - - caller_raw_start_line = caller_resource[START_LINE] - caller_raw_end_line = min(caller_resource[END_LINE], len(raw_caller_resource)) - - _, caller_start_line, caller_end_line = calculate_code_lines(raw_caller_resource, caller_raw_start_line, - caller_raw_end_line) - return (caller_start_line, caller_end_line) - def line_range(self, code_lines: list[tuple[int, str]]) -> list[int]: num_of_lines = len(code_lines) file_line_range = [0, 0] @@ -266,14 +175,6 @@ def mutate_kubernetes_graph_results( else: logging.warning(f"couldn't find {entity_file_abs_path} path in kustomizeFileMappings") continue - - caller_file_path = None - caller_file_line_range = None - if self.checkov_allow_kustomize_file_edits: - caller_file_line_range, caller_file_path = self._get_caller_file_info(entity_context, - entity_file_path, - entity_file_path, entity_id, - root_folder) code_lines = entity_context["code_lines"] file_line_range = self.line_range(code_lines) @@ -286,8 +187,6 @@ def mutate_kubernetes_graph_results( code_block=code_lines, file_path=realKustomizeEnvMetadata['filePath'], file_line_range=file_line_range, - caller_file_path=caller_file_path, - caller_file_line_range=caller_file_line_range, resource=kustomizeResourceID, # entity.get(CustomAttributes.ID), evaluations={}, check_class=check.__class__.__module__, @@ -340,9 +239,6 @@ def __init__(self) -> None: self.templateRendererCommand: str | None = None self.target_folder_path = '' - self.checkov_allow_kustomize_file_edits = convert_str_to_bool(os.getenv("CHECKOV_ALLOW_KUSTOMIZE_FILE_EDITS", - False)) - def get_k8s_target_folder_path(self) -> str: return self.target_folder_path @@ -379,21 +275,8 @@ def _parseKustomization(self, kustomize_dir: str) -> dict[str, Any]: return {} if 'resources' in file_content: - resources = file_content['resources'] - - # We can differentiate between "overlays" and "bases" based on if the `resources` refers to a directory, - # which represents an "overlay", or only files which represents a "base" - resources_representing_directories = [r for r in resources if pathlib.Path(r).suffix == ''] - if resources_representing_directories: - logging.debug( - f"Kustomization contains resources: section with directories. Likely an overlay/env." - f" {kustomization_path}") - metadata['type'] = "overlay" - metadata['referenced_bases'] = resources_representing_directories - else: - logging.debug(f"Kustomization contains resources: section with only files (no dirs). Likley a base." - f" {kustomization_path}") - metadata['type'] = "base" + logging.debug(f"Kustomization contains resources: section. Likley a base. {kustomization_path}") + metadata['type'] = "base" elif 'patchesStrategicMerge' in file_content: logging.debug(f"Kustomization contains patchesStrategicMerge: section. Likley an overlay/env. {kustomization_path}") @@ -523,29 +406,15 @@ def _get_parsed_output( line_num += 1 return cur_writer - def _get_kubectl_output(self, filePath: str, template_renderer_command: str, source_type: str | None) -> bytes: + @staticmethod + def _get_kubectl_output(filePath: str, template_renderer_command: str, source_type: str | None) -> bytes: # Template out the Kustomizations to Kubernetes YAML if template_renderer_command == "kubectl": template_render_command_options = "kustomize" if template_renderer_command == "kustomize": template_render_command_options = "build" - - add_origin_annotations_return_code = None - - if self.checkov_allow_kustomize_file_edits: - add_origin_annotations_command = 'kustomize edit add buildmetadata originAnnotations' - add_origin_annotations_return_code = subprocess.run(add_origin_annotations_command.split(' '), # nosec - cwd=filePath).returncode - - full_command = f'{template_renderer_command} {template_render_command_options}' - proc = subprocess.Popen(full_command.split(' '), cwd=filePath, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # nosec + proc = subprocess.Popen([template_renderer_command, template_render_command_options, filePath], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # nosec output, _ = proc.communicate() - - if self.checkov_allow_kustomize_file_edits and add_origin_annotations_return_code == 0: - # If the return code is not 0, we didn't add the new buildmetadata field, so we shouldn't remove it - remove_origin_annotaions = 'kustomize edit remove buildmetadata originAnnotations' - subprocess.run(remove_origin_annotaions.split(' '), cwd=filePath) # nosec - logging.info( f"Ran kubectl to build Kustomize output. DIR: {filePath}. TYPE: {source_type}.") return output @@ -561,7 +430,7 @@ def _get_env_or_base_path_prefix( env_or_base_path_prefix = "" else: base_path_parts = pathlib.Path(kustomize_processed_folder_and_meta[file_path]['calculated_bases']).parts - most_significant_base_path = f"/{base_path_parts[-3]}/{base_path_parts[-2]}" + most_significant_base_path = f"/{base_path_parts[-3]}/{base_path_parts[-2]}/{base_path_parts[-1]}" env_or_base_path_prefix = f"{most_significant_base_path}/{kustomize_processed_folder_and_meta[file_path]['overlay_name']}" elif kustomize_processed_folder_and_meta[file_path].get('type') == "base": @@ -572,8 +441,8 @@ def _get_env_or_base_path_prefix( return env_or_base_path_prefix + @staticmethod def get_binary_output( - self, file_path: str, kustomize_processed_folder_and_meta: dict[str, dict[str, Any]], template_renderer_command: str, @@ -581,7 +450,7 @@ def get_binary_output( source_type = kustomize_processed_folder_and_meta[file_path].get('type') logging.debug(f"Kustomization at {file_path} likley a {source_type}") try: - output = self._get_kubectl_output(file_path, template_renderer_command, source_type) + output = Runner._get_kubectl_output(file_path, template_renderer_command, source_type) return output, file_path except Exception: logging.warning(f"Error building Kustomize output at dir: {file_path}.", exc_info=True) @@ -609,15 +478,15 @@ def _parse_output( if cur_writer: Runner._curWriterValidateStoreMapAndClose(cur_writer, file_path, shared_kustomize_file_mappings) + @staticmethod def _run_kustomize_parser( - self, file_path: str, shared_kustomize_file_mappings: dict[str, str], kustomize_processed_folder_and_meta: dict[str, dict[str, Any]], template_renderer_command: str, target_folder_path: str, ) -> None: - output, _ = self.get_binary_output(file_path, kustomize_processed_folder_and_meta, template_renderer_command) + output, _ = Runner.get_binary_output(file_path, kustomize_processed_folder_and_meta, template_renderer_command) if not output: return Runner._parse_output(output, file_path, kustomize_processed_folder_and_meta, target_folder_path, shared_kustomize_file_mappings) @@ -644,7 +513,7 @@ def run_kustomize_to_k8s( shared_kustomize_file_mappings: dict[str, str] = {} for file_path in self.kustomizeProcessedFolderAndMeta: - self._run_kustomize_parser( + Runner._run_kustomize_parser( file_path=file_path, shared_kustomize_file_mappings=shared_kustomize_file_mappings, kustomize_processed_folder_and_meta=self.kustomizeProcessedFolderAndMeta, @@ -658,11 +527,10 @@ def run_kustomize_to_k8s( # make sure we have new dict shared_kustomize_file_mappings = pickle_deepcopy(manager.dict()) # type:ignore[arg-type] # works with DictProxy shared_kustomize_file_mappings.clear() - jobs = [] for filePath in self.kustomizeProcessedFolderAndMeta: p = multiprocessing.Process( - target=self._run_kustomize_parser, + target=Runner._run_kustomize_parser, args=( filePath, shared_kustomize_file_mappings, @@ -736,7 +604,6 @@ def _curWriterValidateStoreMapAndClose( with open(currentFileName) as f: currentYamlObject = yaml.safe_load(f) # Validate we have a K8S manifest - if "apiVersion" in currentYamlObject: itemName = [] itemName.append(currentYamlObject['kind']) diff --git a/tests/kustomize/test_runner.py b/tests/kustomize/test_runner.py index 7acb3728398..773b701e3dc 100644 --- a/tests/kustomize/test_runner.py +++ b/tests/kustomize/test_runner.py @@ -1,5 +1,4 @@ import os -import mock import unittest from pathlib import Path @@ -10,18 +9,6 @@ from tests.kustomize.utils import kustomize_exists -def _setup_test_under_example(): - current_dir = os.path.dirname(os.path.realpath(__file__)) - scan_dir_path = os.path.join(current_dir, "runner", "resources", "example") - # this is the relative path to the directory to scan (what would actually get passed to the -d arg) - dir_rel_path = os.path.relpath(scan_dir_path).replace('\\', '/') - runner = Runner() - runner.templateRendererCommand = "kustomize" - runner.templateRendererCommandOptions = "build" - checks_allowlist = ['CKV_K8S_37'] - return checks_allowlist, dir_rel_path, runner - - class TestRunnerValid(unittest.TestCase): @unittest.skipIf(os.name == "nt" or not kustomize_exists(), "kustomize not installed or Windows OS") def test_runner_honors_enforcement_rules(self): @@ -51,25 +38,17 @@ def test_record_relative_path_with_relative_dir(self): # test whether the record's repo_file_path is correct, relative to the CWD (with a / at the start). # this is just constructing the scan dir as normal - checks_allowlist, dir_rel_path, runner = _setup_test_under_example() - report = runner.run(root_folder=dir_rel_path, external_checks_dir=None, - runner_filter=RunnerFilter(framework=['kustomize'], checks=checks_allowlist)) - - all_checks = report.failed_checks + report.passed_checks - self.assertGreater(len(all_checks), 0) # ensure that the assertions below are going to do something - for record in all_checks: - self.assertIn(record.file_path, record.file_abs_path) - self.assertEqual(record.repo_file_path, f'/{dir_rel_path}{record.file_path}') - assert record.file_path.startswith(('/base', '/overlays')) + current_dir = os.path.dirname(os.path.realpath(__file__)) + scan_dir_path = os.path.join(current_dir, "runner", "resources", "example") - @unittest.skipIf(os.name == "nt" or not kustomize_exists(), "kustomize not installed or Windows OS") - def test_record_relative_path_with_relative_dir_with_origin_annotations(self): - # test whether the record's repo_file_path is correct, relative to the CWD (with a / at the start). + # this is the relative path to the directory to scan (what would actually get passed to the -d arg) + dir_rel_path = os.path.relpath(scan_dir_path).replace('\\', '/') - # this is just constructing the scan dir as normal - with mock.patch.dict(os.environ, {"CHECKOV_ALLOW_KUSTOMIZE_FILE_EDITS": "True"}): - checks_allowlist, dir_rel_path, runner = _setup_test_under_example() - report = runner.run(root_folder=dir_rel_path, external_checks_dir=None, + runner = Runner() + runner.templateRendererCommand = "kustomize" + runner.templateRendererCommandOptions = "build" + checks_allowlist = ['CKV_K8S_37'] + report = runner.run(root_folder=dir_rel_path, external_checks_dir=None, runner_filter=RunnerFilter(framework=['kustomize'], checks=checks_allowlist)) all_checks = report.failed_checks + report.passed_checks @@ -78,8 +57,6 @@ def test_record_relative_path_with_relative_dir_with_origin_annotations(self): self.assertIn(record.file_path, record.file_abs_path) self.assertEqual(record.repo_file_path, f'/{dir_rel_path}{record.file_path}') assert record.file_path.startswith(('/base', '/overlays')) - assert record.caller_file_path == '/base/deployment.yaml' or record.caller_file_path == '/deployment.yaml' - assert record.caller_file_line_range == (2, 24) @unittest.skipIf(os.name == "nt" or not kustomize_exists(), "kustomize not installed or Windows OS") def test_record_relative_path_with_direct_oberlay(self): @@ -130,7 +107,6 @@ def test_record_relative_path_with_direct_prod2_oberlay(self): self.assertNotEqual(record.file_path, record.file_abs_path) self.assertIn(record.file_path, record.file_abs_path) self.assertEqual(record.repo_file_path, f'/{dir_rel_path}{record.file_path}') - @unittest.skipIf(os.name == "nt" or not kustomize_exists(), "kustomize not installed or Windows OS") def test_no_file_type_exists(self): diff --git a/tests/kustomize/test_runner_image_referencer.py b/tests/kustomize/test_runner_image_referencer.py index 416b47395c3..2a322294eef 100644 --- a/tests/kustomize/test_runner_image_referencer.py +++ b/tests/kustomize/test_runner_image_referencer.py @@ -3,7 +3,6 @@ import os from pathlib import Path -import mock import pytest from pytest_mock import MockerFixture @@ -21,16 +20,14 @@ @pytest.mark.skipif(os.name == "nt" or not kustomize_exists(), reason="kustomize not installed or Windows OS") -@pytest.mark.parametrize("allow_kustomize_file_edits, code_lines", [ - (True, "18-34"), - (False, "15-31") -]) -def test_deployment_resources(mocker: MockerFixture, allow_kustomize_file_edits: bool, code_lines: str): +def test_deployment_resources(mocker: MockerFixture): from checkov.common.bridgecrew.platform_integration import bc_integration # given + file_name = "kustomization.yaml" image_name = "wordpress:4.8-apache" - test_folder = RESOURCES_PATH / "image_referencer" + code_lines = "15-31" + test_folder = RESOURCES_PATH / "image_referencer/overlays/prod" runner_filter = RunnerFilter(run_image_referencer=True) bc_integration.bc_source = get_source_type("disabled") @@ -44,12 +41,10 @@ def test_deployment_resources(mocker: MockerFixture, allow_kustomize_file_edits: ) # when - - with mock.patch.dict(os.environ, {"CHECKOV_ALLOW_KUSTOMIZE_FILE_EDITS": str(allow_kustomize_file_edits)}): - runner = Runner() - runner.templateRendererCommand = "kustomize" - runner.templateRendererCommandOptions = "build" - reports = runner.run(root_folder=str(test_folder), runner_filter=runner_filter) + runner = Runner() + runner.templateRendererCommand = "kustomize" + runner.templateRendererCommandOptions = "build" + reports = runner.run(root_folder=str(test_folder), runner_filter=runner_filter) # then assert len(reports) == 2 @@ -57,27 +52,21 @@ def test_deployment_resources(mocker: MockerFixture, allow_kustomize_file_edits: kustomize_report = next(report for report in reports if report.check_type == CheckType.KUSTOMIZE) sca_image_report = next(report for report in reports if report.check_type == CheckType.SCA_IMAGE) - assert len(kustomize_report.resources) == 6 - assert len(kustomize_report.passed_checks) == 136 - assert len(kustomize_report.failed_checks) == 42 + assert len(kustomize_report.resources) == 3 + assert len(kustomize_report.passed_checks) == 68 + assert len(kustomize_report.failed_checks) == 21 assert len(kustomize_report.skipped_checks) == 0 assert len(kustomize_report.parsing_errors) == 0 - if allow_kustomize_file_edits: - for record in kustomize_report.failed_checks: - assert record.caller_file_path in ['/base/deployment.yaml', '/base/service.yaml', '/deployment.yaml', - '/service.yaml'] - - assert len(sca_image_report.resources) == 2 + assert len(sca_image_report.resources) == 1 assert sca_image_report.resources == { - f'base/kustomization.yaml (wordpress:4.8-apache lines:{code_lines} (sha256:2460522297)).go', - f'overlays/prod/kustomization.yaml (wordpress:4.8-apache lines:{code_lines} (sha256:2460522297)).go' + f"{file_name} ({image_name} lines:{code_lines} (sha256:2460522297)).go", } assert len(sca_image_report.passed_checks) == 0 - assert len(sca_image_report.failed_checks) == 6 + assert len(sca_image_report.failed_checks) == 3 assert len(sca_image_report.skipped_checks) == 0 assert len(sca_image_report.parsing_errors) == 0 - assert len(sca_image_report.image_cached_results) == 2 + assert len(sca_image_report.image_cached_results) == 1 assert sca_image_report.image_cached_results[0]["dockerImageName"] == image_name assert (