Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CPLYTM-421 create the validation component from the rules #414

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 152 additions & 146 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,6 @@ module = "ruamel"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "ssg.products"
module = "ssg.*"
ignore_missing_imports = true

43 changes: 43 additions & 0 deletions tests/trestlebot/cli/test_sync_cac_content_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,46 @@ def test_sync_product_name(tmp_repo: Tuple[str, Repo]) -> None:
with open(component_definition, "r", encoding="utf-8") as file:
content = file.read()
assert '"title": "ocp4"' in content


def test_sync_cac_validation_comp(tmp_repo: Tuple[str, Repo]) -> None:
"""Tests sync Cac content product name to OSCAL component title ."""
repo_dir, _ = tmp_repo
repo_path = pathlib.Path(repo_dir)
setup_for_catalog(repo_path, test_cat, "catalog")
setup_for_profile(repo_path, test_prof, "profile")

runner = CliRunner()
result = runner.invoke(
sync_cac_content_cmd,
[
"--product",
test_product,
"--repo-path",
str(repo_path.resolve()),
"--cac-content-root",
cac_content_test_data,
"--cac-profile",
"cac-profile",
"--oscal-profile",
test_prof,
"--committer-email",
"[email protected]",
"--committer-name",
"test name",
"--branch",
"test",
"--dry-run",
"--component-definition-type",
"validation",
],
)
# Check the CLI sync-cac-content is successful
assert result.exit_code == 0
# Check if the component definition is created
component_definition = repo_path.joinpath(test_comp_path)
assert component_definition.exists()
# Check if it populates the product name as the component title
with open(component_definition, "r", encoding="utf-8") as file:
content = file.read()
assert '"type": "validation"' in content
60 changes: 31 additions & 29 deletions trestlebot/cli/commands/sync_cac_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@

"""Module for sync cac content command"""
import logging
import os
import sys
import traceback
from typing import Any, List

import click

from trestlebot.cli.options.common import common_options, git_options, handle_exceptions
from trestlebot.cli.options.common import common_options, git_options
from trestlebot.cli.utils import run_bot
from trestlebot.const import ERROR_EXIT_CODE
from trestlebot.tasks.authored.compdef import AuthoredComponentDefinition
from trestlebot.tasks.base_task import TaskBase
from trestlebot.tasks.sync_cac_content_task import SyncCacContentTask
Expand Down Expand Up @@ -49,41 +53,39 @@
)
@click.option(
"--component-definition-type",
type=click.Choice(["service", "validation"]),
type=click.Choice(["service", "validation", "software"]),
help="Type of component definition. Default: service",
required=False,
default="service",
)
@handle_exceptions
def sync_cac_content_cmd(ctx: click.Context, **kwargs: Any) -> None:
"""Transform CaC content to OSCAL component definition."""
# Steps:
# 1. Check options, logger errors if any and exit.
# 2. Initial product component definition with product name
# 3. Create a new task to run the data transformation.
# 4. Initialize a Trestlebot object and run the task(s).

pre_tasks: List[TaskBase] = []

product = kwargs["product"]
cac_content_root = kwargs["cac_content_root"]
component_definition_type = kwargs.get("component_definition_type", "service")
working_dir = kwargs["repo_path"]

authored_comp: AuthoredComponentDefinition = AuthoredComponentDefinition(
trestle_root=working_dir,
)
authored_comp.create_update_cac_compdef(
comp_type=component_definition_type,
product=product,
cac_content_root=cac_content_root,
working_dir=working_dir,
)

sync_cac_content_task: SyncCacContentTask = SyncCacContentTask(
working_dir=working_dir
)

pre_tasks.append(sync_cac_content_task)
component_definition_type = kwargs["component_definition_type"]
cac_profile = os.path.join(cac_content_root, kwargs["cac_profile"])
oscal_profile = kwargs["oscal_profile"]
working_dir = str(kwargs["repo_path"].resolve())

run_bot(pre_tasks, kwargs)
try:
pre_tasks: List[TaskBase] = []
authored_comp: AuthoredComponentDefinition = AuthoredComponentDefinition(
trestle_root=working_dir,
)
sync_cac_content_task = SyncCacContentTask(
product,
cac_profile,
cac_content_root,
component_definition_type,
oscal_profile,
authored_comp,
)
pre_tasks.append(sync_cac_content_task)
results = run_bot(pre_tasks, kwargs)
logger.debug(f"Trestlebot results: {results}")
except Exception as e:
traceback_str = traceback.format_exc()
logger.error(f"Trestle-bot Error: {str(e)}")
logger.debug(traceback_str)
sys.exit(ERROR_EXIT_CODE)
62 changes: 0 additions & 62 deletions trestlebot/tasks/authored/compdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

"""Trestle Bot functions for component definition authoring"""

import json
import logging
import os
import pathlib
Expand All @@ -15,20 +14,14 @@
from trestle.common.err import TrestleError
from trestle.common.model_utils import ModelUtils
from trestle.core.catalog.catalog_interface import CatalogInterface
from trestle.core.generators import generate_sample_model
from trestle.core.profile_resolver import ProfileResolver
from trestle.core.repository import AgileAuthoring
from trestle.oscal.component import ComponentDefinition, DefinedComponent

from trestlebot.const import RULE_PREFIX, RULES_VIEW_DIR, YAML_EXTENSION
from trestlebot.tasks.authored.base_authored import (
AuthoredObjectBase,
AuthoredObjectException,
)
from trestlebot.transformers.cac_transformer import (
get_component_info,
update_component_definition,
)
from trestlebot.transformers.trestle_rule import (
ComponentInfo,
Control,
Expand Down Expand Up @@ -169,61 +162,6 @@ def create_new_default(
)
rules_view_builder.write_to_yaml(rule_dir)

def create_update_cac_compdef(
self,
comp_type: str,
product: str,
cac_content_root: str,
working_dir: str,
) -> None:
"""Create component definition for cac content

Args:
comp_description: Description of the component
comp_type: Type of the component
product: Product name for the component
cac_content_root: ComplianceAsCode repo path
working_dir: workplace repo path
"""
# Initial component definition fields
component_definition = generate_sample_model(ComponentDefinition)
component_definition.metadata.title = f"Component definition for {product}"
component_definition.metadata.version = "1.0"
component_definition.components = list()
oscal_component = generate_sample_model(DefinedComponent)
product_name, full_name = get_component_info(product, cac_content_root)
oscal_component.title = product_name
oscal_component.description = full_name
oscal_component.type = comp_type

# Create all of the component properties for rules
# This part will be updated in CPLYTM-218
"""
rules: List[RuleInfo] = self.rules_transformer.get_all_rules()
all_rule_properties: List[Property] = self.rules_transformer.transform(rules)
oscal_component.props = none_if_empty(all_rule_properties)
"""
repo_path = pathlib.Path(working_dir)
out_path = repo_path.joinpath(f"{const.MODEL_DIR_COMPDEF}/{product}/")
oname = "component-definition.json"
ofile = out_path / oname
if ofile.exists():
logger.info(f"The component for product {product} exists.")
with open(ofile, "r", encoding="utf-8") as f:
data = json.load(f)
for component in data["component-definition"]["components"]:
if component.get("title") == oscal_component.title:
logger.info("Update the exsisting component definition.")
# Need to update props parts if the rules updated
# Update the version and last modify time
update_component_definition(ofile)
else:
logger.info(f"Creating component definition for product {product}")
out_path.mkdir(exist_ok=True, parents=True)
ofile = out_path / oname
component_definition.components.append(oscal_component)
component_definition.oscal_write(ofile)


class RulesViewBuilder:
"""Write TrestleRule objects to YAML files in rules view."""
Expand Down
147 changes: 131 additions & 16 deletions trestlebot/tasks/sync_cac_content_task.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,150 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2023 Red Hat, Inc.
# Copyright (c) 2024 Red Hat, Inc.


"""Trestle Bot Rule Transform Tasks"""
"""Trestle Bot Sync CaC Content Tasks"""

from typing import Optional
import datetime
import logging
import pathlib
from typing import Any, Dict, List

import trestlebot.const as const
from trestlebot.tasks.base_task import ModelFilter, TaskBase
from trestle.common import const as trestle_const
from trestle.core.generators import generate_sample_model
from trestle.oscal.component import ComponentDefinition, DefinedComponent

from trestlebot import const
from trestlebot.tasks.authored.base_authored import AuthoredObjectBase
from trestlebot.tasks.base_task import TaskBase
from trestlebot.transformers.cac_transformer import (
get_component_info,
get_validation_component_mapping,
)


logger = logging.getLogger(__name__)


class SyncCacContentTask(TaskBase):
"""
Transform rules into OSCAL content.
Sync CaC content to OSCAL component definition task.
"""

def __init__(
self,
working_dir: str,
model_filter: Optional[ModelFilter] = None,
product: str,
cac_profile: str,
cac_content_root: str,
comp_type: str,
oscal_profile: str,
authored_object: AuthoredObjectBase,
) -> None:
"""
Initialize transform task.

Args:
working_dir: Working directory to complete operations in
model_filter: Optional filter to apply to the task to include or exclude models
from processing.
Initialize CaC content sync task.
"""
super().__init__(working_dir, model_filter)
self.product: str = product
self.cac_profile: str = cac_profile
self.cac_content_root: str = cac_content_root
self.comp_type: str = comp_type
self.rules_json_path: str = ""
self.env_yaml: Dict[str, Any] = {}
self.selected: List[str] = []

self._authored_object = authored_object
working_dir = self._authored_object.get_trestle_root()
super().__init__(working_dir, None)

def _create_or_update_compdef(self, comp_type: str = "service") -> None:
"""Create or update a component definition for specified product."""

component_definition = generate_sample_model(ComponentDefinition)
component_definition.metadata.title = f"Component definition for {self.product}"
component_definition.metadata.version = "1.0"
component_definition.components = list()

oscal_component = generate_sample_model(DefinedComponent)
product_name, full_name = get_component_info(
self.product, self.cac_content_root
)
# Collect rules and parameters as the props will be updated in CPLYTM-218
# rules_transformer.add_rules(self.selected)
# rules: List[RuleInfo] = rules_transformer.get_all_rules()
# all_rule_properties: List[Property] = rules_transformer.transform(rules)
# props = none_if_empty(all_rule_properties)
# Assumption of having obtained the rules as follows
props = [
{
"name": "Rule_Id",
"ns": "http://ibm.github.io/compliance-trestle/schemas/oscal/cd",
"value": "file_permissions_kube_apiserver",
"remarks": "rule_set_201",
},
{
"name": "Rule_Description",
"ns": "http://ibm.github.io/compliance-trestle/schemas/oscal/cd",
"value": "Ensure",
"remarks": "rule_set_201",
},
]
oscal_component.type = self.comp_type
if oscal_component.type == "validation":
oscal_component.title = "openscap"
oscal_component.description = "openscap"
oscal_component.props = get_validation_component_mapping(props)
else:
oscal_component.title = product_name
oscal_component.description = full_name
oscal_component.props = props
repo_path = pathlib.Path(self.working_dir)
cd_dir = repo_path.joinpath(f"{trestle_const.MODEL_DIR_COMPDEF}/{self.product}")
cd_json = cd_dir / "component-definition.json"
if cd_json.exists():
logger.info(f"The component definition for product {self.product} exists.")
compdef = ComponentDefinition.oscal_read(cd_json)
components_titles = []
updated = False
for index, component in enumerate(compdef.components):
components_titles.append(component.title)
# If the component exists and the props need to be updated
if component.title == oscal_component.title:
if component.props != oscal_component.props:
logger.info(
f"Start to update the props of the component {component.title}"
)
compdef.components[index].props = oscal_component.props
updated = True
compdef.oscal_write(cd_json)
break
# If the component doesn't exist, append this component
if oscal_component.title not in components_titles:
logger.info(f"{oscal_component.title}")
logger.info(f"Start to append the component {oscal_component.title}")
compdef.components.append(oscal_component)
compdef.oscal_write(cd_json)
updated = True
if updated:
logger.info(f"Update component definition: {cd_json}")
compdef.metadata.version = str(
"{:.1f}".format(float(compdef.metadata.version) + 0.1)
)
compdef.metadata.last_modified = (
datetime.datetime.now(datetime.timezone.utc)
.replace(microsecond=0)
.isoformat()
)
compdef.oscal_write(cd_json)
else:
logger.info(f"Creating component definition for product {self.product}")
cd_dir.mkdir(exist_ok=True, parents=True)
cd_json = cd_dir / "component-definition.json"
component_definition.components.append(oscal_component)
component_definition.oscal_write(cd_json)

def execute(self) -> int:
"""Execute task"""
"""Execute task to create or update product component definition."""

# Collect all product rules selected in product profile
# self._collect_rules()
# Create or update product component definition
self._create_or_update_compdef()
return const.SUCCESS_EXIT_CODE
Loading
Loading