Skip to content

Commit

Permalink
Merge pull request #142 from Skyscanner/load_rules_config_file
Browse files Browse the repository at this point in the history
  • Loading branch information
ignaciobolonio authored Jan 21, 2021
2 parents 3643210 + e50bd3f commit b54da97
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 6 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Changelog
All notable changes to this project will be documented in this file.

## [0.23.0] - 2021-01-19
## [0.23.0] - 2021-01-20
### Breaking changes
- Rule config files using filters must now use `ingress_obj` and not `ingress`.
### Additions
- Rules using IP Address Ranges now export both `ingress_obj` and `ingress_ip` filter fields.
- Add support to load an external rules configuration file

## [0.22.0] - 2020-12-11
### Breaking changes
Expand Down
16 changes: 13 additions & 3 deletions cfripper/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ def setup_logging(level: str) -> None:
logging.basicConfig(level=LOGGING_LEVELS[level], format="%(message)s")


def init_cfripper() -> Tuple[Config, RuleProcessor]:
def init_cfripper(rules_config_file: Optional[str]) -> Tuple[Config, RuleProcessor]:
config = Config(rules=DEFAULT_RULES.keys())
if rules_config_file:
config.load_rules_config_file(rules_config_file)
rule_processor = RuleProcessor(*[DEFAULT_RULES.get(rule)(config) for rule in config.rules])
return config, rule_processor

Expand Down Expand Up @@ -82,15 +84,20 @@ def output_handling(template_name: str, result: str, output_format: str, output_


def process_template(
template, resolve: bool, resolve_parameters: Optional[Dict], output_folder: Optional[str], output_format: str
template,
resolve: bool,
resolve_parameters: Optional[Dict],
output_folder: Optional[str],
output_format: str,
rules_config_file: Optional[str],
) -> None:
logging.info(f"Analysing {template.name}...")

cfmodel = get_cfmodel(template)
if resolve:
cfmodel = cfmodel.resolve(resolve_parameters)

config, rule_processor = init_cfripper()
config, rule_processor = init_cfripper(rules_config_file)

result = analyse_template(cfmodel, rule_processor, config)

Expand Down Expand Up @@ -138,6 +145,9 @@ def process_template(
help="Logging level",
show_default=True,
)
@click.option(
"--rules-config-file", type=click.File("r"), help="Loads rules configuration file (type: [.py, .pyc])",
)
def cli(templates, logging_level, resolve_parameters, **kwargs):
"""Analyse AWS Cloudformation templates passed by parameter."""
try:
Expand Down
35 changes: 34 additions & 1 deletion cfripper/config/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import importlib
import logging
import os
import re
from typing import List
import sys
from typing import Dict, List

from pydantic import BaseModel

from .rule_config import RuleConfig
from .whitelist import AWS_ELASTICACHE_BACKUP_CANONICAL_IDS, AWS_ELB_LOGS_ACCOUNT_IDS
from .whitelist import rule_to_action_whitelist as default_rule_to_action_whitelist
from .whitelist import rule_to_resource_whitelist as default_rule_to_resource_whitelist
from .whitelist import stack_whitelist as default_stack_whitelist

logger = logging.getLogger(__file__)


class Config:
DEFAULT_ALLOWED_WORLD_OPEN_PORTS = [80, 443]
Expand Down Expand Up @@ -159,3 +167,28 @@ def get_whitelisted_rules(self) -> List[str]:
whitelisted_rules += v

return whitelisted_rules

def load_rules_config_file(self, filename: str):
if not os.path.exists(filename):
raise RuntimeError(f"{filename} doesn't exist")

try:
ext = os.path.splitext(filename)[1]
module_name = "__rules_config__"
if ext not in [".py", ".pyc"]:
raise RuntimeError("Configuration file should have a valid Python extension.")
spec = importlib.util.spec_from_file_location(module_name, filename)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
rules_config = vars(module).get("RULES_CONFIG")
# Validate rules_config format
RulesConfigMapping(__root__=rules_config)
self.rules_config = rules_config
except Exception:
logger.exception(f"Failed to read config file: {filename}")
raise


class RulesConfigMapping(BaseModel):
__root__: Dict[str, RuleConfig]
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# This file is autogenerated by pip-compile
# To update, run:
#
# make freeze-upgrade
# make freeze
#
boto3==1.16.25 # via cfripper (setup.py)
botocore==1.19.25 # via boto3, s3transfer
Expand Down
27 changes: 27 additions & 0 deletions tests/config/test_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from pydantic import ValidationError

from cfripper.config.config import Config

Expand Down Expand Up @@ -159,3 +160,29 @@ def test_stack_to_action_whitelist_stack_without_resources(mock_rule_to_action_w
rule_to_action_whitelist=mock_rule_to_action_whitelist,
)
assert config.get_whitelisted_actions("SecurityGroupOpenToWorldRule") == []


def test_load_rules_config_file_success(test_files_location):
mock_rules = ["RuleThatUsesResourceWhitelists", "SecurityGroupOpenToWorldRule"]
config = Config(stack_name="test_stack", rules=mock_rules, stack_whitelist={})
config.load_rules_config_file(filename=f"{test_files_location}/config/rules_config_CrossAccountTrustRule.py")
rule_config = config.get_rule_config("CrossAccountTrustRule")
assert not rule_config.risk_value
assert not rule_config.rule_mode
assert len(rule_config.filters) == 1


def test_load_rules_config_file_no_file(test_files_location):
mock_rules = ["RuleThatUsesResourceWhitelists", "SecurityGroupOpenToWorldRule"]
config = Config(stack_name="test_stack", rules=mock_rules, stack_whitelist={})

with pytest.raises(RuntimeError):
config.load_rules_config_file(filename=f"{test_files_location}/config/non_existing_file.py")


def test_load_rules_config_file_invalid_file(test_files_location):
mock_rules = ["RuleThatUsesResourceWhitelists", "SecurityGroupOpenToWorldRule"]
config = Config(stack_name="test_stack", rules=mock_rules, stack_whitelist={})

with pytest.raises(ValidationError):
config.load_rules_config_file(filename=f"{test_files_location}/config/rules_config_invalid.py")
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from pathlib import Path

import pytest

Expand All @@ -8,3 +9,8 @@ def default_session_fixture(request):
"""Logging disabled when running tests."""

logging.disable(logging.CRITICAL)


@pytest.fixture
def test_files_location() -> Path:
return Path(__file__).parent / "test_files"
13 changes: 13 additions & 0 deletions tests/rules/test_CrossAccountTrustRule.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,19 @@ def test_filter_works_as_expected(template_two_roles_dict, expected_result_two_r
assert result.failed_rules[0] == expected_result_two_roles[-1]


def test_filter_works_as_expected_with_rules_config_file(
template_two_roles_dict, expected_result_two_roles, test_files_location
):
config = Config(rules=["CrossAccountTrustRule"], aws_account_id="123456789", stack_name="mockstack",)
config.load_rules_config_file(filename=f"{test_files_location}/config/rules_config_CrossAccountTrustRule.py")
rules = [DEFAULT_RULES.get(rule)(config) for rule in config.rules]
processor = RuleProcessor(*rules)
result = processor.process_cf_template(template_two_roles_dict, config)

assert not result.valid
assert result.failed_rules[0] == expected_result_two_roles[-1]


def test_whitelisted_stacks_do_not_report_anything(template_two_roles_dict):
mock_stack_whitelist = {"mockstack": ["CrossAccountTrustRule"]}
mock_config = Config(
Expand Down
19 changes: 19 additions & 0 deletions tests/test_files/config/rules_config_CrossAccountTrustRule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from cfripper.config.filter import Filter
from cfripper.config.rule_config import RuleConfig
from cfripper.model.enums import RuleMode

RULES_CONFIG = {
"CrossAccountTrustRule": RuleConfig(
filters=[
Filter(
rule_mode=RuleMode.WHITELISTED,
eval={
"and": [
{"eq": [{"ref": "config.stack_name"}, "mockstack"]},
{"eq": [{"ref": "logical_id"}, "RootRoleOne"]},
]
},
)
],
)
}
20 changes: 20 additions & 0 deletions tests/test_files/config/rules_config_invalid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from cfripper.config.filter import Filter
from cfripper.config.rule_config import RuleConfig
from cfripper.model.enums import RuleMode

# RULES_CONFIG is here a list of RuleConfig instead of a dict with the rule names as keys
RULES_CONFIG = [
RuleConfig(
filters=[
Filter(
rule_mode=RuleMode.WHITELISTED,
eval={
"and": [
{"eq": [{"ref": "config.stack_name"}, "mockstack"]},
{"eq": [{"ref": "logical_id"}, "RootRoleOne"]},
]
},
)
],
)
]

0 comments on commit b54da97

Please sign in to comment.