From f20fb22818f0149f05173f022ccb8a5d468583bd Mon Sep 17 00:00:00 2001 From: Shoshi Wolpe <158214649+shoshiGit@users.noreply.github.com> Date: Thu, 4 Jul 2024 11:40:30 +0300 Subject: [PATCH] feat(arm): AzureSearchSLAIndex (#6530) initial commit --- .../checks/resource/AzureSearchSLAIndex.py | 38 +++++++++++++++++ .../example_AzureSearchSLAIndex/fail.json | 17 ++++++++ .../example_AzureSearchSLAIndex/fail2.json | 17 ++++++++ .../example_AzureSearchSLAIndex/pass.json | 19 +++++++++ .../resource/test_AzureSearchSLAIndex.py | 42 +++++++++++++++++++ 5 files changed, 133 insertions(+) create mode 100644 checkov/arm/checks/resource/AzureSearchSLAIndex.py create mode 100644 tests/arm/checks/resource/example_AzureSearchSLAIndex/fail.json create mode 100644 tests/arm/checks/resource/example_AzureSearchSLAIndex/fail2.json create mode 100644 tests/arm/checks/resource/example_AzureSearchSLAIndex/pass.json create mode 100644 tests/arm/checks/resource/test_AzureSearchSLAIndex.py diff --git a/checkov/arm/checks/resource/AzureSearchSLAIndex.py b/checkov/arm/checks/resource/AzureSearchSLAIndex.py new file mode 100644 index 00000000000..c13eda686f4 --- /dev/null +++ b/checkov/arm/checks/resource/AzureSearchSLAIndex.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from typing import Any, Dict + +from checkov.common.models.enums import CheckCategories, CheckResult +from checkov.arm.base_resource_check import BaseResourceCheck + + +class AzureSearchSLAIndex(BaseResourceCheck): + def __init__(self) -> None: + # Cognitive Search services support indexing and querying. Indexing is the process of loading content into + # the service to make it searchable. Querying is the process where a client searches for content + # by sending queries to the index. + # Cognitive Search supports a configurable number of replicas. Having multiple replicas allows queries and + # index updates to load balance across multiple replicas. + # + # To receive a Service Level Agreement (SLA) for Search index updates a minimum of 3 replicas is required. + name = "Ensure that Azure Cognitive Search maintains SLA for index updates" + id = "CKV_AZURE_208" + supported_resources = ["Microsoft.Search/searchServices", ] + categories = [CheckCategories.GENERAL_SECURITY, ] + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def scan_resource_conf(self, conf: Dict[str, Any]) -> CheckResult: + properties = conf.get("properties", {}) + if not isinstance(properties, dict): + return CheckResult.FAILED + replica_count = properties.get("replicaCount") + if replica_count and isinstance(replica_count, int): + if replica_count >= 3: + return CheckResult.PASSED + else: + return CheckResult.FAILED + else: + return CheckResult.FAILED + + +check = AzureSearchSLAIndex() diff --git a/tests/arm/checks/resource/example_AzureSearchSLAIndex/fail.json b/tests/arm/checks/resource/example_AzureSearchSLAIndex/fail.json new file mode 100644 index 00000000000..3f8e6849fd4 --- /dev/null +++ b/tests/arm/checks/resource/example_AzureSearchSLAIndex/fail.json @@ -0,0 +1,17 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "resources": [ + { + "type": "Microsoft.Search/searchServices", + "apiVersion": "2020-08-01-Preview", + "name": "fail", + "sku": { + "name": "standard" + }, + "properties": { + "publicNetworkAccess": "Enabled" + } + } + ] +} \ No newline at end of file diff --git a/tests/arm/checks/resource/example_AzureSearchSLAIndex/fail2.json b/tests/arm/checks/resource/example_AzureSearchSLAIndex/fail2.json new file mode 100644 index 00000000000..791f340331b --- /dev/null +++ b/tests/arm/checks/resource/example_AzureSearchSLAIndex/fail2.json @@ -0,0 +1,17 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "resources": [ + { + "type": "Microsoft.Search/searchServices", + "apiVersion": "2020-08-01", + "name": "fail2", + "sku": { + "name": "standard" + }, + "properties": { + "replicaCount": 2 + } + } + ] +} \ No newline at end of file diff --git a/tests/arm/checks/resource/example_AzureSearchSLAIndex/pass.json b/tests/arm/checks/resource/example_AzureSearchSLAIndex/pass.json new file mode 100644 index 00000000000..97efe485f93 --- /dev/null +++ b/tests/arm/checks/resource/example_AzureSearchSLAIndex/pass.json @@ -0,0 +1,19 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "resources": [ + { + "type": "Microsoft.Search/searchServices", + "apiVersion": "2024-03-01-preview", + "name": "pass", + "location": "string", + "sku": { + "name": "standard" + }, + "properties": { + "publicNetworkAccess": "Disabled", + "replicaCount": 3 + } + } + ] +} \ No newline at end of file diff --git a/tests/arm/checks/resource/test_AzureSearchSLAIndex.py b/tests/arm/checks/resource/test_AzureSearchSLAIndex.py new file mode 100644 index 00000000000..daed8bc0956 --- /dev/null +++ b/tests/arm/checks/resource/test_AzureSearchSLAIndex.py @@ -0,0 +1,42 @@ +import os +import unittest + +from checkov.runner_filter import RunnerFilter +from checkov.arm.runner import Runner +from checkov.arm.checks.resource.AzureSearchSLAIndex import check + + +class TestAzureSearchSLAIndex(unittest.TestCase): + + def test(self): + runner = Runner() + current_dir = os.path.dirname(os.path.realpath(__file__)) + + test_files_dir = os.path.join(current_dir, "example_AzureSearchSLAIndex") + report = runner.run(root_folder=test_files_dir, + runner_filter=RunnerFilter(checks=[check.id])) + summary = report.get_summary() + + passing_resources = { + 'Microsoft.Search/searchServices.pass' + } + failing_resources = { + 'Microsoft.Search/searchServices.fail', + 'Microsoft.Search/searchServices.fail2' + } + skipped_resources = {} + + passed_check_resources = set([c.resource for c in report.passed_checks]) + failed_check_resources = set([c.resource for c in report.failed_checks]) + + self.assertEqual(summary['passed'], len(passing_resources)) + self.assertEqual(summary['failed'], len(failing_resources)) + self.assertEqual(summary['skipped'], len(skipped_resources)) + self.assertEqual(summary['parsing_errors'], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == '__main__': + unittest.main()