From 9c33b3f5a9a9342aa676beab05025ac121f2e8b8 Mon Sep 17 00:00:00 2001
From: Pepe Fagoaga
Date: Fri, 28 Feb 2025 15:39:48 +0545
Subject: [PATCH] refactor(stats): Use Finding instead of Check_Report (#7053)

Co-authored-by: pedrooot
---
 prowler/__main__.py               |  26 ++--
 prowler/lib/outputs/outputs.py    |  51 ++++---
 tests/lib/outputs/outputs_test.py | 241 ++++++++++++++++++++----------
 3 files changed, 203 insertions(+), 115 deletions(-)

diff --git a/prowler/__main__.py b/prowler/__main__.py
index f3643deb8a..bd36894506 100644
--- a/prowler/__main__.py
+++ b/prowler/__main__.py
@@ -305,8 +305,20 @@ def prowler():
             print(f"{Style.BRIGHT}{Fore.GREEN}\nNo findings to fix!{Style.RESET_ALL}\n")
         sys.exit()
 
+    # Outputs
+    # TODO: this part is needed since the checks generates a Check_Report_XXX and the output uses Finding
+    # This will be refactored for the outputs generate directly the Finding
+    finding_outputs = []
+    for finding in findings:
+        try:
+            finding_outputs.append(
+                Finding.generate_output(global_provider, finding, output_options)
+            )
+        except Exception:
+            continue
+
     # Extract findings stats
-    stats = extract_findings_statistics(findings)
+    stats = extract_findings_statistics(finding_outputs)
 
     if args.slack:
         # TODO: this should be also in a config file
@@ -329,18 +341,6 @@ def prowler():
             )
             sys.exit(1)
 
-    # Outputs
-    # TODO: this part is needed since the checks generates a Check_Report_XXX and the output uses Finding
-    # This will be refactored for the outputs generate directly the Finding
-    finding_outputs = []
-    for finding in findings:
-        try:
-            finding_outputs.append(
-                Finding.generate_output(global_provider, finding, output_options)
-            )
-        except Exception:
-            continue
-
     generated_outputs = {"regular": [], "compliance": []}
 
     if args.output_formats:
diff --git a/prowler/lib/outputs/outputs.py b/prowler/lib/outputs/outputs.py
index b4699157d7..dfd37429dd 100644
--- a/prowler/lib/outputs/outputs.py
+++ b/prowler/lib/outputs/outputs.py
@@ -1,7 +1,10 @@
 from colorama import Fore, Style
 
 from prowler.config.config import orange_color
+from prowler.lib.check.models import Severity
 from prowler.lib.logger import logger
+from prowler.lib.outputs.common import Status
+from prowler.lib.outputs.finding import Finding
 
 
 def stdout_report(finding, color, verbose, status, fix):
@@ -89,7 +92,7 @@ def set_report_color(status: str, muted: bool = False) -> str:
     return color
 
 
-def extract_findings_statistics(findings: list) -> dict:
+def extract_findings_statistics(findings: list[Finding]) -> dict:
     """
     extract_findings_statistics takes a list of findings and returns the following dict with the aggregated statistics
     {
@@ -121,38 +124,46 @@
     medium_severity_fail = 0
     low_severity_pass = 0
     low_severity_fail = 0
+    informational_severity_pass = 0
+    informational_severity_fail = 0
 
     for finding in findings:
-        # Save the resource_id
-        resources.add(finding.resource_id)
+        resources.add(finding.resource_uid)
 
-        if finding.status == "PASS":
-            if finding.check_metadata.Severity == "critical":
+        if finding.status == Status.PASS:
+            findings_count += 1
+            total_pass += 1
+            if finding.metadata.Severity == Severity.critical:
                 critical_severity_pass += 1
-            if finding.check_metadata.Severity == "high":
+            if finding.metadata.Severity == Severity.high:
                 high_severity_pass += 1
-            if finding.check_metadata.Severity == "medium":
+            if finding.metadata.Severity == Severity.medium:
                 medium_severity_pass += 1
-            if finding.check_metadata.Severity == "low":
+            if finding.metadata.Severity == Severity.low:
                 low_severity_pass += 1
-            total_pass += 1
-            findings_count += 1
+            if finding.metadata.Severity == Severity.informational:
+                informational_severity_pass += 1
+
             if finding.muted is True:
                 muted_pass += 1
 
-        if finding.status == "FAIL":
-            if finding.check_metadata.Severity == "critical":
+        if finding.status == Status.FAIL:
+            findings_count += 1
+            total_fail += 1
+            if finding.metadata.Severity == Severity.critical:
                 critical_severity_fail += 1
-            if finding.check_metadata.Severity == "high":
+            if finding.metadata.Severity == Severity.high:
                 high_severity_fail += 1
-            if finding.check_metadata.Severity == "medium":
+            if finding.metadata.Severity == Severity.medium:
                 medium_severity_fail += 1
-            if finding.check_metadata.Severity == "low":
+            if finding.metadata.Severity == Severity.low:
                 low_severity_fail += 1
-            total_fail += 1
-            findings_count += 1
+            if finding.metadata.Severity == Severity.informational:
+                informational_severity_fail += 1
+
             if finding.muted is True:
                 muted_fail += 1
+
             if not finding.muted and all_fails_are_muted:
                 all_fails_are_muted = False
 
@@ -168,8 +179,10 @@
     stats["total_high_severity_pass"] = high_severity_pass
     stats["total_medium_severity_fail"] = medium_severity_fail
     stats["total_medium_severity_pass"] = medium_severity_pass
-    stats["total_low_severity_fail"] = medium_severity_fail
-    stats["total_low_severity_pass"] = medium_severity_pass
+    stats["total_low_severity_fail"] = low_severity_fail
+    stats["total_low_severity_pass"] = low_severity_pass
+    stats["total_informational_severity_pass"] = informational_severity_pass
+    stats["total_informational_severity_fail"] = informational_severity_fail
     stats["all_fails_are_muted"] = all_fails_are_muted
 
     return stats
diff --git a/tests/lib/outputs/outputs_test.py b/tests/lib/outputs/outputs_test.py
index dbf7d5c8bd..f61f28d1ca 100644
--- a/tests/lib/outputs/outputs_test.py
+++ b/tests/lib/outputs/outputs_test.py
@@ -18,6 +18,7 @@
     unroll_list,
     unroll_tags,
 )
+from tests.lib.outputs.fixtures.fixtures import generate_finding_output
 
 
 class TestOutputs:
@@ -246,65 +247,121 @@ def test_parse_json_tags(self):
         assert parse_json_tags([{}]) == {}
         assert parse_json_tags(None) == {}
 
+
+class TestExtractFindingStats:
     def test_extract_findings_statistics_different_resources(self):
-        finding_1 = mock.MagicMock()
-        finding_1.status = "PASS"
-        finding_1.resource_id = "test_resource_1"
-        finding_2 = mock.MagicMock()
-        finding_2.status = "FAIL"
-        finding_2.resource_id = "test_resource_2"
+        finding_1 = generate_finding_output(
+            status="PASS",
+            resource_uid="test_resource_1",
+            severity="critical",
+            muted=False,
+        )
+        finding_2 = generate_finding_output(
+            status="FAIL",
+            resource_uid="test_resource_2",
+            severity="critical",
+            muted=False,
+        )
+
         findings = [finding_1, finding_2]
 
         stats = extract_findings_statistics(findings)
         assert stats["total_pass"] == 1
-        assert stats["total_fail"] == 1
         assert stats["total_muted_pass"] == 0
+        assert stats["total_fail"] == 1
         assert stats["total_muted_fail"] == 0
         assert stats["resources_count"] == 2
         assert stats["findings_count"] == 2
+        assert stats["total_critical_severity_fail"] == 1
+        assert stats["total_critical_severity_pass"] == 1
+        assert stats["total_high_severity_fail"] == 0
+        assert stats["total_high_severity_pass"] == 0
+        assert stats["total_medium_severity_fail"] == 0
+        assert stats["total_medium_severity_pass"] == 0
+        assert stats["total_low_severity_fail"] == 0
+        assert stats["total_low_severity_pass"] == 0
+        assert stats["total_informational_severity_fail"] == 0
+        assert stats["total_informational_severity_pass"] == 0
+        assert stats["all_fails_are_muted"] is False
 
     def test_extract_findings_statistics_same_resources(self):
-        finding_1 = mock.MagicMock()
-        finding_1.status = "PASS"
-        finding_1.resource_id = "test_resource_1"
-        finding_2 = mock.MagicMock()
-        finding_2.status = "PASS"
-        finding_2.resource_id = "test_resource_1"
+        finding_1 = generate_finding_output(
+            status="PASS",
+            resource_uid="test_resource_1",
+            severity="critical",
+            muted=False,
+        )
+        finding_2 = generate_finding_output(
+            status="PASS",
+            resource_uid="test_resource_1",
+            severity="critical",
+            muted=False,
+        )
+
         findings = [finding_1, finding_2]
 
         stats = extract_findings_statistics(findings)
         assert stats["total_pass"] == 2
-        assert stats["total_fail"] == 0
         assert stats["total_muted_pass"] == 0
+        assert stats["total_fail"] == 0
         assert stats["total_muted_fail"] == 0
         assert stats["resources_count"] == 1
         assert stats["findings_count"] == 2
+        assert stats["total_critical_severity_fail"] == 0
+        assert stats["total_critical_severity_pass"] == 2
+        assert stats["total_high_severity_fail"] == 0
+        assert stats["total_high_severity_pass"] == 0
+        assert stats["total_medium_severity_fail"] == 0
+        assert stats["total_medium_severity_pass"] == 0
+        assert stats["total_low_severity_fail"] == 0
+        assert stats["total_low_severity_pass"] == 0
+        assert stats["total_informational_severity_fail"] == 0
+        assert stats["total_informational_severity_pass"] == 0
+        assert stats["all_fails_are_muted"] is True
 
     def test_extract_findings_statistics_manual_resources(self):
-        finding_1 = mock.MagicMock()
-        finding_1.status = "MANUAL"
-        finding_1.resource_id = "test_resource_1"
-        finding_2 = mock.MagicMock()
-        finding_2.status = "PASS"
-        finding_2.resource_id = "test_resource_1"
+        finding_1 = generate_finding_output(
+            status="MANUAL",
+            resource_uid="test_resource_1",
+            severity="critical",
+            muted=False,
+        )
+        finding_2 = generate_finding_output(
+            status="PASS",
+            resource_uid="test_resource_1",
+            severity="critical",
+            muted=False,
+        )
+
         findings = [finding_1, finding_2]
 
         stats = extract_findings_statistics(findings)
         assert stats["total_pass"] == 1
-        assert stats["total_fail"] == 0
         assert stats["total_muted_pass"] == 0
+        assert stats["total_fail"] == 0
         assert stats["total_muted_fail"] == 0
         assert stats["resources_count"] == 1
         assert stats["findings_count"] == 1
+        assert stats["total_critical_severity_fail"] == 0
+        assert stats["total_critical_severity_pass"] == 1
+        assert stats["total_high_severity_fail"] == 0
+        assert stats["total_high_severity_pass"] == 0
+        assert stats["total_medium_severity_fail"] == 0
+        assert stats["total_medium_severity_pass"] == 0
+        assert stats["total_low_severity_fail"] == 0
+        assert stats["total_low_severity_pass"] == 0
+        assert stats["total_informational_severity_fail"] == 0
+        assert stats["total_informational_severity_pass"] == 0
+        assert stats["all_fails_are_muted"] is True
 
     def test_extract_findings_statistics_no_findings(self):
-        findings = []
-
-        stats = extract_findings_statistics(findings)
+        stats = extract_findings_statistics([])
 
         assert stats["total_pass"] == 0
-        assert stats["total_fail"] == 0
         assert stats["total_muted_pass"] == 0
+        assert stats["total_fail"] == 0
         assert stats["total_muted_fail"] == 0
+        assert stats["resources_count"] == 0
+        assert stats["findings_count"] == 0
         assert stats["total_critical_severity_fail"] == 0
         assert stats["total_critical_severity_pass"] == 0
         assert stats["total_high_severity_fail"] == 0
@@ -313,28 +370,30 @@ def test_extract_findings_statistics_no_findings(self):
         assert stats["total_medium_severity_pass"] == 0
         assert stats["total_low_severity_fail"] == 0
         assert stats["total_low_severity_pass"] == 0
-        assert stats["resources_count"] == 0
-        assert stats["findings_count"] == 0
+        assert stats["total_informational_severity_fail"] == 0
+        assert stats["total_informational_severity_pass"] == 0
+        assert stats["all_fails_are_muted"] is True
 
     def test_extract_findings_statistics_all_fail_are_muted(self):
-        finding_1 = mock.MagicMock()
-        finding_1.status = "FAIL"
-        finding_1.muted = True
-        finding_1.resource_id = "test_resource_1"
-        finding_1.check_metadata.Severity = "medium"
-        finding_1.check_metadata.CheckID = "glue_etl_jobs_amazon_s3_encryption_enabled"
-        finding_2 = mock.MagicMock()
-        finding_2.status = "FAIL"
-        finding_2.muted = True
-        finding_2.resource_id = "test_resource_2"
-        finding_2.check_metadata.Severity = "low"
-        finding_2.check_metadata.CheckID = "lightsail_static_ip_unused"
+        finding_1 = generate_finding_output(
+            status="FAIL",
+            resource_uid="test_resource_1",
+            severity="medium",
+            muted=True,
+        )
+        finding_2 = generate_finding_output(
+            status="FAIL",
+            resource_uid="test_resource_2",
+            severity="low",
+            muted=True,
+        )
+
         findings = [finding_1, finding_2]
 
         stats = extract_findings_statistics(findings)
         assert stats["total_pass"] == 0
-        assert stats["total_fail"] == 2
         assert stats["total_muted_pass"] == 0
+        assert stats["total_fail"] == 2
         assert stats["total_muted_fail"] == 2
         assert stats["resources_count"] == 2
         assert stats["findings_count"] == 2
@@ -346,31 +405,33 @@ def test_extract_findings_statistics_all_fail_are_muted(self):
         assert stats["total_medium_severity_pass"] == 0
         assert stats["total_low_severity_fail"] == 1
         assert stats["total_low_severity_pass"] == 0
-        assert stats["all_fails_are_muted"]
+        assert stats["total_informational_severity_fail"] == 0
+        assert stats["total_informational_severity_pass"] == 0
+        assert stats["all_fails_are_muted"] is True
 
     def test_extract_findings_statistics_all_fail_are_not_muted(self):
-        finding_1 = mock.MagicMock()
-        finding_1.status = "FAIL"
-        finding_1.muted = True
-        finding_1.resource_id = "test_resource_1"
-        finding_1.check_metadata.Severity = "critical"
-        finding_1.check_metadata.CheckID = "rds_instance_certificate_expiration"
-        finding_2 = mock.MagicMock()
-        finding_2.status = "FAIL"
-        finding_2.muted = False
-        finding_2.resource_id = "test_resource_1"
-        finding_2.check_metadata.Severity = "critical"
-        finding_2.check_metadata.CheckID = (
-            "autoscaling_find_secrets_ec2_launch_configuration"
+        finding_1 = generate_finding_output(
+            status="FAIL",
+            resource_uid="test_resource_1",
+            severity="critical",
+            muted=True,
+        )
+        finding_2 = generate_finding_output(
+            status="FAIL",
+            resource_uid="test_resource_1",
+            severity="critical",
+            muted=False,
         )
+
         findings = [finding_1, finding_2]
 
         stats = extract_findings_statistics(findings)
         assert stats["total_pass"] == 0
-        assert stats["total_fail"] == 2
         assert stats["total_muted_pass"] == 0
+        assert stats["total_fail"] == 2
         assert stats["total_muted_fail"] == 1
         assert stats["resources_count"] == 1
+        assert stats["findings_count"] == 2
         assert stats["total_critical_severity_fail"] == 2
         assert stats["total_critical_severity_pass"] == 0
         assert stats["total_high_severity_fail"] == 0
@@ -379,33 +440,33 @@ def test_extract_findings_statistics_all_fail_are_not_muted(self):
         assert stats["total_medium_severity_pass"] == 0
         assert stats["total_low_severity_fail"] == 0
         assert stats["total_low_severity_pass"] == 0
-        assert stats["findings_count"] == 2
-        assert not stats["all_fails_are_muted"]
+        assert stats["total_informational_severity_fail"] == 0
+        assert stats["total_informational_severity_pass"] == 0
+        assert stats["all_fails_are_muted"] is False
 
     def test_extract_findings_statistics_all_passes_are_not_muted(self):
-        finding_1 = mock.MagicMock()
-        finding_1.status = "PASS"
-        finding_1.muted = True
-        finding_1.resource_id = "test_resource_1"
-        finding_1.check_metadata.Severity = "critical"
-        finding_1.check_metadata.CheckID = (
-            "autoscaling_find_secrets_ec2_launch_configuration"
+        finding_1 = generate_finding_output(
+            status="PASS",
+            resource_uid="test_resource_1",
+            severity="critical",
+            muted=True,
+        )
+        finding_2 = generate_finding_output(
+            status="PASS",
+            resource_uid="test_resource_1",
+            severity="high",
+            muted=False,
         )
-        finding_2 = mock.MagicMock()
-        finding_2.status = "PASS"
-        finding_2.muted = False
-        finding_2.resource_id = "test_resource_1"
-        finding_2.check_metadata.Severity = "high"
-        finding_2.check_metadata.CheckID = "acm_certificates_expiration_check"
         findings = [finding_1, finding_2]
 
         stats = extract_findings_statistics(findings)
         assert stats["total_pass"] == 2
-        assert stats["total_fail"] == 0
         assert stats["total_muted_pass"] == 1
+        assert stats["total_fail"] == 0
         assert stats["total_muted_fail"] == 0
         assert stats["resources_count"] == 1
+        assert stats["findings_count"] == 2
         assert stats["total_critical_severity_fail"] == 0
         assert stats["total_critical_severity_pass"] == 1
         assert stats["total_high_severity_fail"] == 0
@@ -414,33 +475,47 @@ def test_extract_findings_statistics_all_passes_are_not_muted(self):
         assert stats["total_medium_severity_pass"] == 0
         assert stats["total_low_severity_fail"] == 0
         assert stats["total_low_severity_pass"] == 0
-        assert stats["findings_count"] == 2
+        assert stats["total_informational_severity_fail"] == 0
+        assert stats["total_informational_severity_pass"] == 0
+        assert stats["all_fails_are_muted"] is True
 
     def test_extract_findings_statistics_all_passes_are_muted(self):
-        finding_1 = mock.MagicMock()
-        finding_1.status = "PASS"
-        finding_1.muted = True
-        finding_1.check_metadata.Severity = "critical"
-        finding_1.check_metadata.CheckID = "rds_instance_certificate_expiration"
-        finding_1.resource_id = "test_resource_1"
-        findings = [finding_1]
+        finding_1 = generate_finding_output(
+            status="PASS",
+            resource_uid="test_resource_1",
+            severity="informational",
+            muted=True,
+        )
+        finding_2 = generate_finding_output(
+            status="FAIL",
+            resource_uid="test_resource_1",
+            severity="informational",
+            muted=True,
+        )
+
+        findings = [finding_1, finding_2]
 
         stats = extract_findings_statistics(findings)
         assert stats["total_pass"] == 1
-        assert stats["total_fail"] == 0
         assert stats["total_muted_pass"] == 1
-        assert stats["total_muted_fail"] == 0
+        assert stats["total_fail"] == 1
+        assert stats["total_muted_fail"] == 1
         assert stats["resources_count"] == 1
+        assert stats["findings_count"] == 2
         assert stats["total_critical_severity_fail"] == 0
-        assert stats["total_critical_severity_pass"] == 1
+        assert stats["total_critical_severity_pass"] == 0
         assert stats["total_high_severity_fail"] == 0
         assert stats["total_high_severity_pass"] == 0
         assert stats["total_medium_severity_fail"] == 0
         assert stats["total_medium_severity_pass"] == 0
         assert stats["total_low_severity_fail"] == 0
         assert stats["total_low_severity_pass"] == 0
-        assert stats["findings_count"] == 1
+        assert stats["total_informational_severity_fail"] == 1
+        assert stats["total_informational_severity_pass"] == 1
+        assert stats["all_fails_are_muted"] is True
 
+
+class TestReport:
     def test_report_with_aws_provider_not_muted_pass(self):
         # Mocking check_findings and provider
         finding_1 = MagicMock()