Skip to content

Commit

Permalink
fix(terraform): Don't pass existed resources in non_exists resource c…
Browse files Browse the repository at this point in the history
…hecks (#6653)

* Don't pass existed resources in non_exists resource checks
  • Loading branch information
inbalavital authored Aug 15, 2024
1 parent dd4786d commit a78d2b9
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ def __init__(self, resource_types: list[str]) -> None:
super().__init__(SolverType.RESOURCE)
self.resource_types = resource_types
self.vertices: list[dict[str, Any]] = []
self._passed_vertices: list[dict[str, Any]] = []
self._failed_vertices: list[dict[str, Any]] = []
self._unknown_vertices: list[dict[str, Any]] = []

@abstractmethod
def get_operation(self, resource_type: str) -> bool:
Expand All @@ -31,38 +34,30 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Callable[..., bool]:
# not needed
return lambda: True

@abstractmethod
def _handle_result(self, result: bool, data: dict[str, str]) -> None:
raise NotImplementedError()

def run(
self, graph_connector: LibraryGraph
self, graph_connector: LibraryGraph
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
executer = ThreadPoolExecutor()
jobs = []
passed_vertices: list[dict[str, Any]] = []
failed_vertices: list[dict[str, Any]] = []
unknown_vertices: list[dict[str, Any]] = []

if isinstance(graph_connector, DiGraph):
for _, data in graph_connector.nodes(data=True):
jobs.append(executer.submit(self._process_node, data, passed_vertices, failed_vertices, unknown_vertices))
jobs.append(executer.submit(self._process_node, data))

concurrent.futures.wait(jobs)
return passed_vertices, failed_vertices, unknown_vertices
return self._passed_vertices, self._failed_vertices, self._unknown_vertices

for _, data in graph_connector.nodes():
result = self.get_operation(resource_type=data.get(CustomAttributes.RESOURCE_TYPE))
if result:
passed_vertices.append(data)
else:
failed_vertices.append(data)
self._handle_result(result, data)

return passed_vertices, failed_vertices, []
return self._passed_vertices, self._failed_vertices, self._unknown_vertices

def _process_node(self, data: dict[str, str], passed_vartices: list[dict[str, Any]],
failed_vertices: list[dict[str, Any]], unknown_vertices: list[dict[str, Any]]) -> None:
def _process_node(self, data: dict[str, str]) -> None:
result = self.get_operation(data.get(CustomAttributes.RESOURCE_TYPE)) # type:ignore[arg-type]
# A None indicate for UNKNOWN result - the vertex shouldn't be added to the passed or the failed vertices
if result is None:
unknown_vertices.append(data)
elif result:
passed_vartices.append(data)
else:
failed_vertices.append(data)
self._handle_result(result, data)
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations


from checkov.common.graph.checks_infra.enums import Operators
from checkov.common.checks_infra.solvers.resource_solvers.base_resource_solver import BaseResourceSolver

Expand All @@ -10,3 +9,11 @@ class ExistsResourcerSolver(BaseResourceSolver):

def get_operation(self, resource_type: str | None) -> bool:
return resource_type in self.resource_types

def _handle_result(self, result: bool, data: dict[str, str]) -> None:
# The exists operator means that all resources that are not in the allowlist should fail,
# and existed resources that are in the allowlist should pass, as they are the only resources allowed
if result:
self._passed_vertices.append(data)
else:
self._failed_vertices.append(data)
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

from typing import Any

from checkov.common.checks_infra.solvers.resource_solvers.exists_resource_solver import ExistsResourcerSolver
from checkov.common.graph.checks_infra.enums import Operators
Expand All @@ -9,5 +8,13 @@
class NotExistsResourcerSolver(ExistsResourcerSolver):
operator = Operators.NOT_EXISTS # noqa: CCE003 # a static attribute

def get_operation(self, *args: Any, **kwargs: Any) -> bool:
return not super().get_operation(*args, **kwargs)
def get_operation(self, resource_type: str | None) -> bool:
return not super().get_operation(resource_type)

def _handle_result(self, result: bool, data: dict[str, str]) -> None:
# The not_exists operator means that all resources that are not in the denylist should be ignored,
# and existed resources that are in the denylist should fail
if result:
self._unknown_vertices.append(data)
else:
self._failed_vertices.append(data)
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


@parameterized_class([{"graph_framework": "NETWORKX"}, {"graph_framework": "IGRAPH"}])
class NotExistsSolver(TestBaseSolver):
class TestNotExistsSolver(TestBaseSolver):
def setUp(self):
self.checks_dir = str(TEST_DIRNAME)
super().setUp()
Expand All @@ -17,17 +17,12 @@ def test_deny_list(self):
# given
root_folder = TEST_DIRNAME.parents[2] / "resources/encryption_test"
check_id = "ResourceDenyList"
should_pass = [
"aws_rds_cluster.rds_cluster_encrypted",
"aws_rds_cluster.rds_cluster_unencrypted",
"aws_neptune_cluster.encrypted_neptune",
"aws_neptune_cluster.unencrypted_neptune",
]

should_fail = [
"aws_s3_bucket.encrypted_bucket",
"aws_s3_bucket.unencrypted_bucket",
]
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}}
expected_results = {check_id: {"should_fail": should_fail}}

# when/then
self.run_test(root_folder=str(root_folder), expected_results=expected_results, check_id=check_id)
6 changes: 4 additions & 2 deletions tests/terraform/graph/checks_infra/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ def test_unrendered_variable_source(self):
def verify_report(report, expected_results):
for check_id in expected_results:
found = False
for resource in expected_results[check_id]['should_pass']:
should_pass_checks = expected_results[check_id].get('should_pass', [])
for resource in should_pass_checks:
for record in report.passed_checks:
if record.check_id == check_id and record.resource == resource:
found = True
break
if not found:
return f"expected resource {resource} to pass in check {check_id}"
found = False
for resource in expected_results[check_id]['should_fail']:
should_fail_checks = expected_results[check_id].get('should_fail', [])
for resource in should_fail_checks:
for record in report.failed_checks:
if record.check_id == check_id and record.resource == resource:
found = True
Expand Down

0 comments on commit a78d2b9

Please sign in to comment.