diff --git a/.gitignore b/.gitignore index 894a44c..a127971 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,6 @@ venv.bak/ # mypy .mypy_cache/ + +# vscode +.vscode/ diff --git a/centrifuge_cli/main.py b/centrifuge_cli/main.py index 3bc6f3d..e00ab78 100644 --- a/centrifuge_cli/main.py +++ b/centrifuge_cli/main.py @@ -393,9 +393,10 @@ def binary_hardening(cli): @report.command(name='check-policy') @click.option('--policy-yaml', metavar='FILE', type=click.Path(), help='Centrifuge policy yaml file.', required=True) +@click.option('--report-template', metavar='FILE', type=click.Path(), help='Policy report template file.', required=False) @click.pass_context @pass_cli -def check_policy(cli, ctx, policy_yaml): +def check_policy(cli, ctx, policy_yaml, report_template): outfmt = cli.outfmt cli.outfmt = 'json' cli.echo_enabled = False @@ -406,19 +407,24 @@ def check_policy(cli, ctx, policy_yaml): guardian_json = json.loads(ctx.invoke(guardian)) code_summary_json = json.loads(ctx.invoke(code_summary)) passhash_json = json.loads(ctx.invoke(passhash)) + info_json = json.loads(ctx.invoke(info)) policy_obj = CentrifugePolicyCheck(certificates_json, private_keys_json, binary_hardening_json, guardian_json, code_summary_json, - passhash_json) + passhash_json, + info_json) policy_obj.check_rules(policy_yaml) - result = policy_obj.generate_csv() - if outfmt == 'json': + if report_template: + result = policy_obj.generate_report(report_template) + elif outfmt == 'json': result = policy_obj.generate_json() + else: + result = policy_obj.generate_csv() cli.echo_enabled = True cli.outfmt = outfmt diff --git a/centrifuge_cli/policy.py b/centrifuge_cli/policy.py index df946f4..79447be 100755 --- a/centrifuge_cli/policy.py +++ b/centrifuge_cli/policy.py @@ -8,38 +8,46 @@ import time import yaml import click +import datetime +import chevron CSV_HEADER = ["Policy Name", "Compliant"] POLICY_DETAIL_MAPPING = { "certificates": { "name": "Expired Certificates", "method": "checkCertificateRule", - "status": "Fail" + "status": "Fail", + "reasons": [] }, "privateKeys": { "name": "Private Keys", "method": "checkPrivateKeyRule", - "status": "Fail" + "status": "Fail", + "reasons": [] }, "passwordHashes": { "name": "Weak Password Algorithms", "method": "checkPasswordHashRule", - "status": "Fail" + "status": "Fail", + "reasons": [] }, "code": { "name": "Code Flaws", "method": "checkCodeFlawsRule", - "status": "Fail" + "status": "Fail", + "reasons": [] }, "guardian": { "name": "CVE Threshold", "method": "checkGuardianRule", - "status": "Fail" + "status": "Fail", + "reasons": [] }, "binaryHardening": { "name": "Binary Hardness", "method": "checkBinaryHardeningRule", - "status": "Fail" + "status": "Fail", + "reasons": [] } } POLICIES = [ @@ -63,14 +71,15 @@ def __init__(self, guardian_json, code_summary_json, passhash_json, - verbose=False): + info_json): self.certificates_json = certificates_json self.private_keys_json = private_keys_json self.binary_hardening_json = binary_hardening_json self.guardian_json = guardian_json self.code_summary_json = code_summary_json self.passhash_json = passhash_json - self.verbose = verbose + self.info_json = info_json + self.verbose = False # set to True to generate debug logging def verboseprint(self, *args): """ @@ -81,69 +90,65 @@ def verboseprint(self, *args): def match_regex_against_path(self, exception_regex, path_list): """ - Method to match the regex with list of paths. Paths are unix system path. + Method to match the list of regexes with list of paths. Paths are unix system path. """ - exception_in_path = [] + exceptions_in_path = [] if exception_regex: for regex in exception_regex: if regex: - exception_in_path = list( - filter(lambda path: re.search(regex, path), - list(map(lambda path: path, path_list))) - ) - return exception_in_path + exceptions = list( + filter(lambda path: re.search(regex, path), path_list)) + exceptions_in_path = exceptions_in_path + exceptions + return exceptions_in_path def checkCertificateRule(self, value): - count = 0 - - self.verboseprint("Checking Certificate Rule..") + self.verboseprint("Checking Certificate Rule...") + rule_passed = True + reasons = [] json_data = self.certificates_json if not value.get('expired', {}).get('allowed'): if json_data.get("count") > 0: # Remove exceptions from results if any for cert in json_data.get("results"): exceptions = value.get("exceptions", []) - exception_in_path = self.match_regex_against_path(exceptions, cert["paths"]) - if exception_in_path: - for path_ex in exception_in_path: + exceptions_in_path = self.match_regex_against_path(exceptions, cert["paths"]) + if exceptions_in_path: + for path_ex in exceptions_in_path: + self.verboseprint(f'...skipping certificate found at {path_ex}') cert["paths"].remove(path_ex) if cert["paths"]: # Check if certificate is expired. cert_expiry_timestamp = int(cert["validityEnd"][:-3]) if int(time.time()) > cert_expiry_timestamp: - count += 1 - else: - raise RuntimeError(f'Certificate {cert["rflid"]} is exception') - if count > 0: - return "Fail" - else: - return "Pass" - else: - return "Pass" + rule_passed = False + for path in cert["paths"]: + reason = f'certificate expired at {path}' + reasons.append(reason) + self.verboseprint(f'...failing: {reason}') + return rule_passed, reasons def checkPrivateKeyRule(self, value): - count = 0 - + self.verboseprint("Checking Private Key Rule...") + rule_passed = True + reasons = [] json_data = self.private_keys_json if not value.get('allowed'): if json_data.get("count") > 0: # Remove exceptions from results if any for pk in json_data.get("results"): exceptions = value.get("exceptions", []) - exception_in_path = self.match_regex_against_path(exceptions, pk["paths"]) - if exception_in_path: - for path_ex in exception_in_path: + exceptions_in_path = self.match_regex_against_path(exceptions, pk["paths"]) + if exceptions_in_path: + for path_ex in exceptions_in_path: + self.verboseprint(f'...skipping key found at {path_ex}') pk["paths"].remove(path_ex) - if not pk["paths"]: - count += 1 - if len(json_data["results"]) != count: - return "Fail" - else: - return "Pass" - else: - return "Pass" - else: - return "Pass" + if pk["paths"]: + rule_passed = False + for path in pk["paths"]: + reason = f'private key found at {path}' + reasons.append(reason) + self.verboseprint(f'...failing: {reason}') + return rule_passed, reasons def check_files_for_binary_hardening(self, obj, value): boolean_feature = ("nx", "canary", "pie", "stripped") @@ -159,80 +164,103 @@ def check_files_for_binary_hardening(self, obj, value): return True def checkBinaryHardeningRule(self, value): - count = 0 + self.verboseprint("Checking Binary Hardening Rule...") + rule_passed = True + reasons = [] + includes = value.get("include", []) json_data = self.binary_hardening_json if json_data.get("count") > 0: for obj in json_data.get("results"): - exceptions = value.get("include", []) - path_included = self.match_regex_against_path(exceptions, obj["paths"]) - if path_included: - rule_passed = self.check_files_for_binary_hardening(obj, value) - if not rule_passed: - count += 1 - else: - # Check each and every file against binary hardening - rule_passed = self.check_files_for_binary_hardening(obj, value) - if not rule_passed: - count += 1 - if count > 0: - return "Fail" - else: - return "Pass" - else: - return "Pass" + if includes: + path_included = self.match_regex_against_path(includes, obj["paths"]) + if not path_included: + continue + if not self.check_files_for_binary_hardening(obj, value): + reason = f'invalid binary hardening settings at {obj["paths"][0]}' + reasons.append(reason) + self.verboseprint(f'...failing: {reason}') + rule_passed = False + return rule_passed, reasons def checkGuardianRule(self, value): - count = 0 + self.verboseprint("Checking Guardian Rule...") + rule_passed = True + reasons = [] json_data = self.guardian_json + cvssScoreThreshold = value.get("cvssScoreThreshold", 10.0) + cveAgeThreshold = value.get("cveAgeThreshold", 0) + if cveAgeThreshold > 0 and cveAgeThreshold < 1999: + current_year = int(datetime.datetime.now().year) + cveAgeThreshold = current_year - cveAgeThreshold if json_data["count"] > 0: for guardian_obj in json_data.get("results"): - if float(guardian_obj["severity"]) > value["cvssScoreThreshold"]: - count += 1 - if count > 0: - return "Fail" - else: - return "Pass" - else: - return "Pass" + if value.get("exceptions", []): + matched_path = list( + filter(lambda regex: re.search(regex, guardian_obj["path"]), value.get("exceptions")) + ) + if matched_path: + self.verboseprint(f'...skipping exception {guardian_obj["path"]}') + continue + if float(guardian_obj["severity"]) > cvssScoreThreshold: + reason = f'cvssScoreThreshold {guardian_obj["severity"]} for {guardian_obj["component"]} at {guardian_obj["path"]}' + reasons.append(reason) + self.verboseprint(f'...failing: {reason}') + rule_passed = False + cveAge = int(guardian_obj["name"][4:8]) + if cveAge <= cveAgeThreshold: + reason = f'cveAgeThreshold {guardian_obj["name"]} for {guardian_obj["component"]} at {guardian_obj["path"]}' + reasons.append(reason) + self.verboseprint(f'...failing: {reason}') + rule_passed = False + return rule_passed, reasons def checkCodeFlawsRule(self, value): - count = 0 + self.verboseprint("Checking Code Flaws Rule...") + rule_passed = True + reasons = [] json_data = self.code_summary_json - if not value.get("flaws", {}).get("allowed"): + allowed = value.get("flaws", {}).get("allowed") + critical = value.get("flaws", {}).get("allowCritical") + if not allowed or not critical: if json_data.get("count") > 0: for code_obj in json_data.get("results"): if value.get("exceptions", []): matched_path = list( filter(lambda regex: re.search(regex, code_obj["path"]), value.get("exceptions")) ) - # Check total flaws if code path is specified in YAML file - if matched_path and code_obj["totalFlaws"] > 0: - count += 1 - else: - if code_obj["totalFlaws"] > 0: - count += 1 - if count > 0: - return "Fail" - else: - return "Pass" - else: - return "Pass" - else: - return "Pass" + if matched_path: + self.verboseprint(f'...skipping exception {code_obj["path"]}') + continue + if not allowed and code_obj["totalFlaws"] > 0: + reason = f'{code_obj["totalFlaws"]} total flaws in {code_obj["path"]}' + reasons.append(reason) + self.verboseprint(f'...failing: {reason}') + rule_passed = False + elif not critical and code_obj["emulatedFunctionCount"] > 0: + reason = f'{code_obj["emulatedFunctionCount"]} critical flaws in {code_obj["path"]}' + reasons.append(reason) + self.verboseprint(f'...failing: {reason}') + rule_passed = False + return rule_passed, reasons def checkPasswordHashRule(self, value): - count = 0 + self.verboseprint("Checking Password Hash Rule...") + rule_passed = True + reasons = [] json_data = self.passhash_json if json_data.get("count") > 0: + if not value.get("allowUserAccounts", True): + reason = f'allowUserAccounts - identified {json_data.get("count")} accounts' + reasons.append(reason) + self.verboseprint(f'...failing: {reason}') + rule_passed = False for hash_obj in json_data.get("results"): if hash_obj.get("algorithm") in value.get("weakAlgorithms", []): - count += 1 - if count > 0: - return "Fail" - else: - return "Pass" - else: - return "Pass" + reason = f'weakAlgorithm {hash_obj["algorithm"]} for user {hash_obj["username"]}' + reasons.append(reason) + self.verboseprint(f'...failing: {reason}') + rule_passed = False + return rule_passed, reasons def call_policy_method(self, policy_name, ruledef): policy_method = getattr(self, POLICY_DETAIL_MAPPING.get(policy_name).get("method")) @@ -243,57 +271,64 @@ def checkRule(self, name, ruledef): Method which calls the respective rule checking function for rule name and populates the compliant of policy. """ - if name == 'privateKeys': - policy_status = self.call_policy_method(name, ruledef) - POLICY_DETAIL_MAPPING.get(name).update({"status": policy_status}) - elif name == 'certificates': - policy_status = self.call_policy_method(name, ruledef) - POLICY_DETAIL_MAPPING.get(name).update({"status": policy_status}) - elif name == 'passwordHashes': - policy_status = self.call_policy_method(name, ruledef) - POLICY_DETAIL_MAPPING.get(name).update({"status": policy_status}) - elif name == 'code': - policy_status = self.call_policy_method(name, ruledef) - POLICY_DETAIL_MAPPING.get(name).update({"status": policy_status}) - elif name == 'guardian': - policy_status = self.call_policy_method(name, ruledef) - POLICY_DETAIL_MAPPING.get(name).update({"status": policy_status}) - elif name == 'binaryHardening': - policy_status = self.call_policy_method(name, ruledef) + if name in POLICY_DETAIL_MAPPING: + rule_passed, reasons = self.call_policy_method(name, ruledef) + policy_status = "Pass" if rule_passed else "Fail" POLICY_DETAIL_MAPPING.get(name).update({"status": policy_status}) + POLICY_DETAIL_MAPPING.get(name).update({"reasons": reasons}) else: raise RuntimeError(f'Invalid Rule name {name}') def generate_csv(self): output = io.StringIO() - writer = csv.DictWriter(output, fieldnames=CSV_HEADER) + field_names = CSV_HEADER + field_names.append("Reasons") + writer = csv.DictWriter(output, fieldnames=field_names) writer.writeheader() - for policy, policy_detail in POLICY_DETAIL_MAPPING.items(): - writer.writerow({"Policy Name": policy_detail.get("name"), "Compliant": policy_detail.get("status")}) + for _, policy_detail in POLICY_DETAIL_MAPPING.items(): + row_data = {"Policy Name": policy_detail.get("name"), "Compliant": policy_detail.get("status")} + row_data.update({"Reasons": policy_detail.get("reasons")}) + writer.writerow(row_data) return output.getvalue() - def generate_json(self): + def build_json(self): final_result_dict = { "finalResult": "Pass", "results": [] } + final_result_dict.update({ + "info": { + "vendor": self.info_json["vendor"], + "device": self.info_json["device"], + "version": self.info_json["version"] + } + }) for policy, policy_detail in POLICY_DETAIL_MAPPING.items(): if policy_detail.get("status") != "Pass": final_result_dict["finalResult"] = "Fail" - final_result_dict.get("results").append( - { - "rule": policy, - "name": policy_detail.get("name"), - "compliant": policy_detail.get("status") - } - ) - return json.dumps(final_result_dict, indent=2, sort_keys=True) + final_result = { + "rule": policy, + "name": policy_detail.get("name"), + "compliant": policy_detail.get("status") + } + final_result.update({"reasons": policy_detail.get("reasons")}) + final_result_dict.get("results").append(final_result) + return final_result_dict + + def generate_json(self): + json_results = self.build_json() + return json.dumps(json_results, indent=2, sort_keys=True) + + def generate_report(self, report_template): + json_results = self.build_json() + with open(report_template, 'r') as f: + return chevron.render(f, json_results) def check_rules(self, config_file): with open(config_file, 'r') as stream: try: res = yaml.safe_load(stream) - for i, rule in enumerate(POLICIES): + for _, rule in enumerate(POLICY_DETAIL_MAPPING): if rule in res['rules']: self.checkRule(rule, res['rules'][rule]) else: diff --git a/docs/POLICY.md b/docs/POLICY.md index 48d699b..9371dc2 100644 --- a/docs/POLICY.md +++ b/docs/POLICY.md @@ -7,6 +7,7 @@ This command line tool takes a policy file that defines the policy rules that yo ## Prerequisites Before you begin, ensure you have met the following requirements: + * You have python 3 installed (tested with python 3.7) * You have an active Centrifuge account with a valid API authtoken * You have a completed Centrifuge report that you wish to check @@ -22,13 +23,19 @@ pip3 install centrifuge-cli ## Using centrifuge-policy-check.py ``` +# Command options: +--policy-yaml : location of policy rules file (see below) +--report-template : location of mustache-based template for compliance report + # Check your policy against Centrifuge report 1234 and output CSV format (default) centrifuge report --ufid= check-policy --policy-yaml my-policy.yml # Check your policy against Centrifuge report 1234 and output json format centrifuge --outfmt json report --ufid= check-policy --policy-yaml my-policy.yml -``` +# Check your policy against Centrifuge report 1234 and output full html compliance report using example template +centrifuge report --ufid= check-policy --policy-yaml my-policy.yml --report-template example-policy-report.mustache > compliance_report.htm +``` ## Policy Rule Definition @@ -46,7 +53,7 @@ Current policy schema version: `1.0` Refer to example rule definition files included in this repository for more examples. -#### Rule exceptions +### Rule exceptions You may only want to apply a rule to a certain set of files rather than all the files in the firmware image, or apply the rule to all files except a certain set of files you wish to ignore. Many (if not all) of the rules can be defined in this way for maximum flexibility. @@ -101,8 +108,13 @@ Firmware images containing any private keys will fail this policy check. ### Rule: Weak Password Hashes Firmware images containing any password hashes with the algorithms defined below will fail. +Set `allowUserAccounts: false` to fail if any user accounts are present, regardless of hash algorithm. ``` passwordHashes: + # whether defined user accounts are allowed or not + allowUserAccounts: true + + # any hashes with the following algorithms will fail the policy check weakAlgorithms: - des - md5 @@ -111,24 +123,39 @@ Firmware images containing any password hashes with the algorithms defined below ### Rule: Code Flaws Firmware images containing any high risk executables with code flaws will fail this policy check. -Set `allowed: false` to check against all files (except those omitted in the `exceptions` modifier. -Set `allowed: true` along with `exceptions` in order to only check a specific list of files for code flaws. +Set `allowed: false` to check against all files (except those omitted in the `exceptions` modifier). +Set `allowCritical: false` to check for only critical (emulated) flaws (except those omitted in the `exceptions` modifier) ``` code: flaws: + # allow any potential flaws allowed: true - # the policy check will fail if any of the following files contain code flaws + + # allow critical (emulated) flaws + allowCritical: false + + # optional list of files that are omitted from this rule exceptions: - /usr/local/bin/* - /opt/vendor/* ``` -### Rule: CVE Threshold +### Rule: Guardian -Firmware images containing any CVEs with a CVSS rating at or above the given threshold will fail the policy check. +Firmware images containing any CVEs with a CVSS rating at or above the given threshold or age will fail the policy check. +Use exceptions to exclude specific files from the policy check ``` guardian: - cvssScoreThreshold: 9.0 + # any CVEs found at or above this threshold will cause the policy check to fail + cvssScoreThreshold: 7.0 + + # any CVEs from this year or older will cause the policy check to fail + # either put year (i.e., 2017) or # years (i.e., 2 would fail 2018 or earlier CVEs in 2020) + cveAgeThreshold: 2 + + # optional list of files that are omitted from this rule + exceptions: + - cpio-root/bin/busybox ``` ### Rule: Binary Hardness diff --git a/docs/example-policy-lenient.yml b/docs/example-policy-lenient.yml index b4e81ef..84f84e9 100644 --- a/docs/example-policy-lenient.yml +++ b/docs/example-policy-lenient.yml @@ -4,6 +4,7 @@ rules: expired: allowed: true passwordHashes: + allowUserAccounts: true weakAlgorithms: [des, md5] privateKeys: allowed: true diff --git a/docs/example-policy-report.mustache b/docs/example-policy-report.mustache new file mode 100644 index 0000000..9989788 --- /dev/null +++ b/docs/example-policy-report.mustache @@ -0,0 +1,25 @@ + +

Centrifuge Policy Report

+Vendor: {{info.vendor}}
+Device: {{info.device}}
+Version: {{info.version}}
+ +

Overall: {{finalResult}}

+ +
    {{#results}} +
  • {{name}} : {{compliant}}

    +
      + {{#reasons}} +
    • {{.}}
    • + {{/reasons}} +
      +
    +
  • +{{/results}}
\ No newline at end of file diff --git a/docs/example-policy.yml b/docs/example-policy.yml index 79c9769..de09247 100644 --- a/docs/example-policy.yml +++ b/docs/example-policy.yml @@ -24,6 +24,9 @@ rules: - /usr/* passwordHashes: + # whether defined user accounts are allowed or not + allowUserAccounts: false + # any hashes with the following algorithms will fail the policy check weakAlgorithms: - des @@ -31,14 +34,27 @@ rules: code: flaws: - allowed: false + # allow any potential flaws + allowed: true + + # allow critical (emulated) flaws + allowCritical: false + # optional list of files that are omitted from this rule exceptions: - - /opt/vendor/* + - /opt/vendor/* guardian: # any CVEs found at or above this threshold will cause the policy check to fail - cvssScoreThreshold: 9.0 + cvssScoreThreshold: 7.0 + + # any CVEs from this year or older will cause the policy check to fail + # either put year (i.e., 2017) or # years (i.e., 2 would fail 2018 or earlier CVEs in 2020) + cveAgeThreshold: 2 + + # optional list of files that are omitted from this rule + exceptions: + - cpio-root/bin/busybox binaryHardening: # a list of hardening features required for ELF binaries @@ -50,5 +66,6 @@ rules: # optional whitelist of binaries to check for above features. # if omitted, all binaries will be checked include: + - /lib/modules/* - /opt/vendor/* - /root/* diff --git a/pyproject.toml b/pyproject.toml index f93552b..cef8a50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ pandas = "^0.25.1" dateparser = "^0.7.2" toml = "^0.10.0" pyyaml = "^5.3.1" +chevron = "^0.13.1" [tool.poetry.dev-dependencies] pytest = "^3.0"