Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dockerfiles #81

Merged
merged 13 commits into from
Apr 16, 2018
Merged
96 changes: 90 additions & 6 deletions colin/checks/abstract/dockerfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,68 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
import re

from ..check_utils import check_label
from ..result import CheckResult
from .abstract_check import AbstractCheck

logger = logging.getLogger(__name__)


def get_instructions_from_dockerfile_parse(dfp, instruction):
"""
Get the list of instruction dictionary for given instruction name.
(Subset of DockerfileParser.structure only for given instruction.)

:param dfp: DockerfileParser
:param instruction: str
:return: list
"""
result = []
for inst in dfp.structure:
if inst["instruction"] == instruction:
result.append(inst)
return result
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you know list comprehensions, I'm just wondering why you don't use it here:

return [inst for inst in dfp.structure if inst["instruction"] == instruction]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neither am I...;-) Thanks!



class DockerfileCheck(AbstractCheck):
pass


class InstructionCheck(AbstractCheck):
class InstructionCheck(DockerfileCheck):

def __init__(self, name, message, description, reference_url, tags, instruction, regex, required):
def __init__(self, name, message, description, reference_url, tags, instruction, value_regex, required):
super().__init__(name, message, description, reference_url, tags)
self.instruction = instruction
self.regex = regex
self.value_regex = value_regex
self.required = required

def check(self, target):
pass
instructions = get_instructions_from_dockerfile_parse(target.instance, self.instruction)
pattern = re.compile(self.value_regex)
logs = []
passed = True
for inst in instructions:
match = bool(pattern.match(inst["value"]))
passed = match == self.required
log = "Value for instruction {} {}mach regex: '{}'.".format(inst["content"],
"" if match else "does not ",
self.value_regex)
logs.append(log)
logger.debug(log)

return CheckResult(ok=passed,
severity=self.severity,
description=self.description,
message=self.message,
reference_url=self.reference_url,
check_name=self.name,
logs=logs)

class InstructionCountCheck(AbstractCheck):

class InstructionCountCheck(DockerfileCheck):

def __init__(self, name, message, description, reference_url, tags, instruction, min_count=None, max_count=None):
super().__init__(name, message, description, reference_url, tags)
Expand All @@ -42,4 +83,47 @@ def __init__(self, name, message, description, reference_url, tags, instruction,
self.max_count = max_count

def check(self, target):
pass
count = len(get_instructions_from_dockerfile_parse(target.instance, self.instruction))

log = "Found {} occurrences of the {} instruction. Needed: min {} | max {}".format(count,
self.instruction,
self.min_count,
self.max_count)
logger.debug(log)
passed = True
if self.min_count:
passed = passed and self.min_count <= count
if self.max_count:
passed = passed and count <= self.max_count

return CheckResult(ok=passed,
severity=self.severity,
description=self.description,
message=self.message,
reference_url=self.reference_url,
check_name=self.name,
logs=[log])


class DockerfileLabelCheck(DockerfileCheck):

def __init__(self, name, message, description, reference_url, tags, label, required, value_regex=None):
super().__init__(name, message, description, reference_url, tags)
self.label = label
self.required = required
self.value_regex = value_regex

def check(self, target):
labels = target.instance.labels
passed = check_label(label=self.label,
required=self.required,
value_regex=self.value_regex,
labels=labels)

return CheckResult(ok=passed,
severity=self.severity,
description=self.description,
message=self.message,
reference_url=self.reference_url,
check_name=self.name,
logs=[])
30 changes: 10 additions & 20 deletions colin/checks/abstract/labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import re

from ..check_utils import get_labels_from_target, check_label
from .dockerfile import DockerfileCheck
from .containers import ContainerCheck
from .images import ImageCheck
from ..result import CheckResult


class LabelCheck(ContainerCheck, ImageCheck):
class LabelCheck(ContainerCheck, ImageCheck, DockerfileCheck):

def __init__(self, name, message, description, reference_url, tags, label, required, value_regex=None):
super().__init__(name, message, description, reference_url, tags)
Expand All @@ -30,20 +29,11 @@ def __init__(self, name, message, description, reference_url, tags, label, requi
self.value_regex = value_regex

def check(self, target):
labels = target.instance.get_metadata()["Config"]["Labels"]
present = labels is not None and self.label in labels

if present:
if self.required and not self.value_regex:
passed = True
elif self.value_regex:
pattern = re.compile(self.value_regex)
passed = bool(pattern.match(labels[self.label]))
else:
passed = False

else:
passed = not self.required
labels = get_labels_from_target(target=target)
passed = check_label(label=self.label,
required=self.required,
value_regex=self.value_regex,
labels=labels)

return CheckResult(ok=passed,
severity=self.severity,
Expand All @@ -54,15 +44,15 @@ def check(self, target):
logs=[])


class DeprecatedLabelCheck(ContainerCheck, ImageCheck):
class DeprecatedLabelCheck(ContainerCheck, ImageCheck, DockerfileCheck):

def __init__(self, name, message, description, reference_url, tags, old_label, new_label):
super().__init__(name, message, description, reference_url, tags)
self.old_label = old_label
self.new_label = new_label

def check(self, target):
labels = target.instance.get_metadata()["Config"]["Labels"]
labels = get_labels_from_target(target=target)
old_present = labels is not None and self.old_label in labels

passed = (not old_present) or (self.new_label in labels)
Expand Down
41 changes: 41 additions & 0 deletions colin/checks/check_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import re

from ..core.target import TargetType


def check_label(label, required, value_regex, labels):
"""
Check if the label is required and match the regex

:param label: str
:param required: bool (if the presence means pass or not)
:param value_regex: str
:param labels: [str]
:return: bool (required==True: True if the label is present and match the regex if specified)
(required==False: True if the label is not present)
"""
present = labels is not None and label in labels

if present:
if required and not value_regex:
return True
elif value_regex:
pattern = re.compile(value_regex)
return bool(pattern.match(labels[label]))
else:
return False

else:
return not required


def get_labels_from_target(target):
"""
Get list of labels from the target instance.

:param target: instance of the Target
:return: [str]
"""
if target.target_type == TargetType.DOCKERFILE:
return target.instance.labels
return target.instance.get_metadata()["Config"]["Labels"]
14 changes: 14 additions & 0 deletions colin/checks/dockerfile/label_maintainer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from colin.checks.abstract.dockerfile import DockerfileLabelCheck


class DockerfileLabelMaintainerCheck(DockerfileLabelCheck):

def __init__(self):
super().__init__(name="maintainer_label_required",
message="Label 'maintainer' has to be specified.",
description="The name and email of the maintainer (usually the submitter).",
reference_url="https://fedoraproject.org/wiki/Container:Guidelines#LABELS",
tags=["maintainer", "label", "required"],
label="maintainer",
required=True,
value_regex=None)
4 changes: 2 additions & 2 deletions colin/core/colin.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def run(target, group=None, severity=None, tags=None, ruleset_name=None, ruleset
"""
Runs the sanity checks for the target.

:param target: str or Image/Container (name of the container or image or Image/Container instance from conu,
dockerfile will be added in the future)
:param target: str or Image/Container (name of the container/image or Image/Container instance from conu
or path to dockerfile)
:param group: str (name of the folder with group of checks, if None, all of them will be checked.)
:param severity: str (if not None, only those checks will be run -- optional x required x warn ...)
:param tags: list of str (if not None, the checks will be filtered by tags.)
Expand Down
104 changes: 58 additions & 46 deletions colin/core/ruleset/ruleset.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
import logging
import os

import six
from six import iteritems

from ..exceptions import ColinRulesetException
from ..constant import RULESET_DIRECTORY, JSON
from ..loader import load_check_implementation
from ..target import is_compatible
from ...checks.abstract.containers import ContainerCheck
from ...checks.abstract.dockerfile import DockerfileCheck
from ...checks.abstract.images import ImageCheck
from ..target import TargetType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,19 +74,21 @@ def get_checks(self, target_type, group=None, severity=None, tags=None):
:param tags: list of str
:return: list of check instances
"""
check_files = self._get_check_files(group=group,
severity=severity)
groups = {}
for (group, check_files) in iteritems(check_files):
checks = []
for severity, check_file in check_files:
for g in self._get_check_groups(group):
logger.debug("Getting checks for group '{}'.".format(g))
check_files = []
for sev, rules in iteritems(self.ruleset_dict[g]):

check_classes = load_check_implementation(path=check_file, severity=severity)
for check_class in check_classes:
if is_compatible(target_type, check_class, severity, tags):
checks.append(check_class)
if severity and severity != sev:
continue

groups[group] = checks
check_files += get_checks_from_rules(rules=rules,
group=g,
target_type=target_type,
severity=sev,
tags=tags)
groups[g] = check_files
return groups

@staticmethod
Expand All @@ -96,23 +102,6 @@ def get_check_file(group, name):
"""
return os.path.join(get_checks_path(), group, name + ".py")

@staticmethod
def get_check_files(group, names, severity):
"""
Get the check files from given group with given names.

:param severity: str
:param group: str
:param names: list of str
:return: list of str (paths)
"""
check_files = []
for f in names:
check_file = Ruleset.get_check_file(group=group,
name=f)
check_files.append((severity, check_file))
return check_files

def _get_check_groups(self, group=None):
"""
Get check group to validate
Expand All @@ -131,25 +120,48 @@ def _get_check_groups(self, group=None):
logger.debug("Found groups: {}.".format(check_groups))
return check_groups

def _get_check_files(self, group=None, severity=None):
"""
Get file names with checks filtered by group and severity.

:param group: str (if None, all groups will be used)
:param severity: str (if None, all severities will be used)
:return: list of str (absolute paths)
"""
groups = {}
for g in self._get_check_groups(group):
logger.debug("Getting checks for group '{}'.".format(g))
check_files = []
for sev, files in iteritems(self.ruleset_dict[g]):
if (not severity) or severity == sev:
check_files += Ruleset.get_check_files(group=g,
names=files,
severity=sev)
groups[g] = check_files
return groups
def is_compatible(target_type, check_instance):
if not target_type:
return True
return (target_type == TargetType.DOCKERFILE and isinstance(check_instance, DockerfileCheck)) \
or (target_type == TargetType.CONTAINER and isinstance(check_instance, ContainerCheck)) \
or (target_type == TargetType.CONTAINER_IMAGE and isinstance(check_instance, ImageCheck))


def get_checks_from_rules(rules, group, target_type, severity, tags):
rule_items = []
for rule in rules:

if isinstance(rule, six.string_types):
rule_items.append(rule)
elif isinstance(rule, dict):
if target_type and target_type.name not in rule["type"]:
continue

rule_items += rule["checks"]

check_instances = []
for r in rule_items:
logger.debug("Loading check instance for {}".format(r))
check_instances += load_check_implementation(path=Ruleset.get_check_file(group, r),
severity=severity)
result = []
for check_instance in check_instances:
if not is_compatible(target_type=target_type, check_instance=check_instance):
logger.debug(
"Check {} not compatible with the target type: {}".format(check_instance.name, target_type.name))
continue

if tags:
for t in tags:
if t not in check_instance.tags:
logger.debug("Check not passed the tag control: {}".format(r))
continue
result.append(check_instance)
logger.debug("Check instance {} added.".format(check_instance.name))

return result


def get_checks_path():
Expand Down
Loading