Skip to content

Commit

Permalink
feat(athena): add new check athena_workgroup_logging_enabled (#5468)
Browse files Browse the repository at this point in the history
  • Loading branch information
puchy22 authored Oct 18, 2024
1 parent 50cb79e commit d997ebb
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 39 deletions.
75 changes: 36 additions & 39 deletions prowler/providers/aws/services/athena/athena_service.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
from typing import Optional

from pydantic import BaseModel
from pydantic import BaseModel, Field

from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService


################## Athena
class Athena(AWSService):
def __init__(self, provider):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.workgroups = {}
self.__threading_call__(self._list_workgroups)
self._get_workgroups()
self.__threading_call__(self._get_workgroups, self.workgroups.values())
self._list_query_executions()
self._list_tags_for_resource()

Expand Down Expand Up @@ -44,45 +43,42 @@ def _list_workgroups(self, regional_client):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _get_workgroups(self):
def _get_workgroups(self, workgroup):
logger.info("Athena - Getting WorkGroups...")
try:
for workgroup in self.workgroups.values():
try:
wg = self.regional_clients[workgroup.region].get_work_group(
WorkGroup=workgroup.name
)
wg = self.regional_clients[workgroup.region].get_work_group(
WorkGroup=workgroup.name
)

wg_configuration = wg.get("WorkGroup").get("Configuration")
self.workgroups[workgroup.arn].enforce_workgroup_configuration = (
wg_configuration.get("EnforceWorkGroupConfiguration", False)
)
wg_configuration = wg.get("WorkGroup").get("Configuration")
self.workgroups[workgroup.arn].enforce_workgroup_configuration = (
wg_configuration.get("EnforceWorkGroupConfiguration", False)
)

# We include an empty EncryptionConfiguration to handle if the workgroup does not have encryption configured
encryption = (
wg_configuration.get(
"ResultConfiguration",
{"EncryptionConfiguration": {}},
)
.get(
"EncryptionConfiguration",
{"EncryptionOption": ""},
)
.get("EncryptionOption")
)
# We include an empty EncryptionConfiguration to handle if the workgroup does not have encryption configured
encryption = (
wg_configuration.get(
"ResultConfiguration",
{"EncryptionConfiguration": {}},
)
.get(
"EncryptionConfiguration",
{"EncryptionOption": ""},
)
.get("EncryptionOption")
)

if encryption in ["SSE_S3", "SSE_KMS", "CSE_KMS"]:
encryption_configuration = EncryptionConfiguration(
encryption_option=encryption, encrypted=True
)
self.workgroups[workgroup.arn].encryption_configuration = (
encryption_configuration
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
if encryption in ["SSE_S3", "SSE_KMS", "CSE_KMS"]:
encryption_configuration = EncryptionConfiguration(
encryption_option=encryption, encrypted=True
)
self.workgroups[workgroup.arn].encryption_configuration = (
encryption_configuration
)

workgroup.cloudwatch_logging = wg_configuration.get(
"PublishCloudWatchMetricsEnabled", False
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
Expand Down Expand Up @@ -117,10 +113,10 @@ def _list_tags_for_resource(self):
regional_client = self.regional_clients[workgroup.region]
workgroup.tags = regional_client.list_tags_for_resource(
ResourceARN=workgroup.arn
)["Tags"]
).get("Tags", [])
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{workgroup.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
Expand All @@ -143,4 +139,5 @@ class WorkGroup(BaseModel):
enforce_workgroup_configuration: bool = False
queries: bool = False
region: str
tags: Optional[list] = []
cloudwatch_logging: bool = False
tags: Optional[list] = Field(default_factory=list)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "athena_workgroup_logging_enabled",
"CheckTitle": "Ensure that logging is enabled for Amazon Athena workgroups to capture query activity.",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices/Logging"
],
"ServiceName": "athena",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:athena:region:account-id:workgroup/resource-id",
"Severity": "medium",
"ResourceType": "AwsAthenaWorkGroup",
"Description": "Enabling logging for a workgroup provides valuable insights into query activity, including user actions, query execution details, and potential security events.",
"Risk": "Without logging enabled, it can be difficult to track and investigate potential security incidents or unauthorized access to Athena data. This can lead to data breaches, compliance violations, and increased security risks.",
"RelatedUrl": "https://docs.aws.amazon.com/athena/latest/ug/security-logging-monitoring.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/athena-controls.html#athena-4",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable logging for your Athena workgroups to capture query activity and enhance security monitoring. Configure the output location for logs in a secure S3 bucket and ensure appropriate encryption is applied.",
"Url": "https://docs.aws.amazon.com/athena/latest/ug/athena-cloudwatch-metrics-enable.html"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import List

from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.athena.athena_client import athena_client


class athena_workgroup_logging_enabled(Check):
"""Check if there are Athena workgroups with logging disabled."""

def execute(self) -> List[Check_Report_AWS]:
"""Execute the Athena workgroup logging enabled check.
Iterates over all Athena workgroups and checks if is publishing logs to CloudWatch.
Returns:
List of reports object with the findings of each workgroup.
"""
findings = []
for workgroup in athena_client.workgroups.values():
# Only check for enabled and used workgroups (has recent queries)
if (
workgroup.state == "ENABLED" and workgroup.queries
) or athena_client.provider.scan_unused_services:
report = Check_Report_AWS(self.metadata())
report.resource_id = workgroup.name
report.resource_arn = workgroup.arn
report.region = workgroup.region
report.resource_tags = workgroup.tags
report.status = "PASS"
report.status_extended = (
f"Athena WorkGroup {workgroup.name} has CloudWatch logging enabled."
)

if not workgroup.cloudwatch_logging:
report.status = "FAIL"
report.status_extended = f"Athena WorkGroup {workgroup.name} does not have CloudWatch logging enabled."

findings.append(report)

return findings
2 changes: 2 additions & 0 deletions tests/providers/aws/services/athena/athena_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def mock_make_api_call(self, operation_name, kwarg):
},
},
"EnforceWorkGroupConfiguration": True,
"PublishCloudWatchMetricsEnabled": True,
},
}
}
Expand Down Expand Up @@ -110,3 +111,4 @@ def test_get_workgroupsencrypted(self):
== "SSE_S3"
)
assert athena.workgroups[workgroup_arn].enforce_workgroup_configuration is True
assert athena.workgroups[workgroup_arn].cloudwatch_logging
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from unittest.mock import patch

from boto3 import client
from moto import mock_aws

from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)

ATHENA_PRIMARY_WORKGROUP = "primary"
ATHENA_PRIMARY_WORKGROUP_ARN = f"arn:aws:athena:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:workgroup/{ATHENA_PRIMARY_WORKGROUP}"


class Test_athena_workgroup_logging_enabled:
@mock_aws
def test_primary_workgroup_logging_disabled(self):
from prowler.providers.aws.services.athena.athena_service import Athena

aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])

with patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), patch(
"prowler.providers.aws.services.athena.athena_workgroup_logging_enabled.athena_workgroup_logging_enabled.athena_client",
new=Athena(aws_provider),
):
from prowler.providers.aws.services.athena.athena_workgroup_logging_enabled.athena_workgroup_logging_enabled import (
athena_workgroup_logging_enabled,
)

check = athena_workgroup_logging_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Athena WorkGroup {ATHENA_PRIMARY_WORKGROUP} does not have CloudWatch logging enabled."
)
assert result[0].resource_id == ATHENA_PRIMARY_WORKGROUP
assert result[0].resource_arn == ATHENA_PRIMARY_WORKGROUP_ARN
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == []

@mock_aws
def test_primary_workgroup_logging_disabled_ignoring(self):
from prowler.providers.aws.services.athena.athena_service import Athena

aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
aws_provider._scan_unused_services = False

with patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), patch(
"prowler.providers.aws.services.athena.athena_workgroup_logging_enabled.athena_workgroup_logging_enabled.athena_client",
new=Athena(aws_provider),
):
from prowler.providers.aws.services.athena.athena_workgroup_logging_enabled.athena_workgroup_logging_enabled import (
athena_workgroup_logging_enabled,
)

check = athena_workgroup_logging_enabled()
result = check.execute()

assert len(result) == 0

@mock_aws
def test_primary_workgroup_logging_enabled(self):
athena_client = client("athena", region_name=AWS_REGION_EU_WEST_1)

# Delete and recreate the primary workgroup with logging enabled
athena_client.delete_work_group(WorkGroup=ATHENA_PRIMARY_WORKGROUP)

athena_client.create_work_group(
Name=ATHENA_PRIMARY_WORKGROUP,
Configuration={
"ResultConfiguration": {
"OutputLocation": f"s3://aws-athena-query-results-{AWS_ACCOUNT_NUMBER}-{AWS_REGION_EU_WEST_1}/",
"EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
},
"EnforceWorkGroupConfiguration": False,
"PublishCloudWatchMetricsEnabled": True,
"BytesScannedCutoffPerQuery": 100000000,
"RequesterPaysEnabled": False,
"EngineVersion": {
"SelectedEngineVersion": "Athena engine version 2",
"EffectiveEngineVersion": "Athena engine version 2",
},
},
Description="Primary WorkGroup",
)

from prowler.providers.aws.services.athena.athena_service import Athena

aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])

with patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), patch(
"prowler.providers.aws.services.athena.athena_workgroup_logging_enabled.athena_workgroup_logging_enabled.athena_client",
new=Athena(aws_provider),
):
from prowler.providers.aws.services.athena.athena_workgroup_logging_enabled.athena_workgroup_logging_enabled import (
athena_workgroup_logging_enabled,
)

check = athena_workgroup_logging_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Athena WorkGroup {ATHENA_PRIMARY_WORKGROUP} has CloudWatch logging enabled."
)
assert result[0].resource_id == ATHENA_PRIMARY_WORKGROUP
assert result[0].resource_arn == ATHENA_PRIMARY_WORKGROUP_ARN
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == []

0 comments on commit d997ebb

Please sign in to comment.