From 947fceb8ea48259b823b0300502d52b1928ca0ab Mon Sep 17 00:00:00 2001 From: Keshav Priyadarshi Date: Thu, 16 Mar 2023 17:25:57 +0530 Subject: [PATCH] Support advisory comparison across different DataSources - Add debug flag --vers to display equivalent normalized versions for corresponding native ranges. - Add debug flag --no-compare to run the CLI without comparison. - Auto-adjust text table width based on the terminal width. Signed-off-by: Keshav Priyadarshi --- vulntotal/vulntotal_cli.py | 341 +++++++++++++++++++++++++++++++++---- 1 file changed, 306 insertions(+), 35 deletions(-) diff --git a/vulntotal/vulntotal_cli.py b/vulntotal/vulntotal_cli.py index 4106b0f98..f820e7b2c 100755 --- a/vulntotal/vulntotal_cli.py +++ b/vulntotal/vulntotal_cli.py @@ -11,6 +11,8 @@ import concurrent.futures import json +import math +import os import pydoc import click @@ -19,9 +21,12 @@ import yaml from packageurl import PackageURL from texttable import Texttable +from univers.normalized_range import NormalizedVersionRanges +from vulnerabilities.package_managers import VERSION_API_CLASSES_BY_PACKAGE_TYPE from vulntotal.datasources import DATASOURCE_REGISTRY from vulntotal.validator import VendorData +from vulntotal.vulntotal_utils import get_item @click.command() @@ -42,8 +47,6 @@ metavar="FILE", help="Write output as YAML to FILE. Use '-' to print on screen.", ) - -# hidden debug options @click.option( "-l", "--list", @@ -53,6 +56,8 @@ required=False, help="List available datasources.", ) + +# hidden debug options @click.option( "-e", "--enable", @@ -88,7 +93,7 @@ hidden=True, multiple=False, required=False, - help="Report the raw responses from each datasource. Used for debugging. Used for debugging.", + help="Report the raw responses from each datasource. Used for debugging.", ) @click.option( "--no-threading", @@ -118,6 +123,24 @@ required=False, help="Do not group output by vulnerability/CVE. Used for debugging.", ) +@click.option( + "--vers", + "vers", + is_flag=True, + hidden=True, + multiple=False, + required=False, + help="Show normalized vers. Used for debugging.", +) +@click.option( + "--no-compare", + "no_compare", + is_flag=True, + hidden=True, + multiple=False, + required=False, + help="Do not compare datasource output. Used for debugging.", +) @click.help_option("-h", "--help") def handler( purl, @@ -131,6 +154,8 @@ def handler( json_output, yaml_output, no_group, + vers, + no_compare, ): """ Search all the available vulnerabilities databases for the package-url PURL. @@ -155,16 +180,16 @@ def handler( get_raw_response(purl, active_datasource) elif json_output: - write_json_output(purl, active_datasource, json_output, no_threading) + write_json_output(purl, active_datasource, json_output, no_threading, no_group, no_compare) elif yaml_output: - write_yaml_output(purl, active_datasource, yaml_output, no_threading) + write_yaml_output(purl, active_datasource, yaml_output, no_threading, no_group, no_compare) elif no_group: prettyprint(purl, active_datasource, pagination, no_threading) elif purl: - prettyprint_group_by_cve(purl, active_datasource, pagination, no_threading) + prettyprint_group_by_cve(purl, active_datasource, pagination, no_threading, vers, no_compare) def get_valid_datasources(datasources): @@ -209,6 +234,9 @@ def list_supported_ecosystem(datasources): def formatted_row(datasource, advisory): + if not advisory: + return [datasource.upper(), "", "", ""] + aliases = "\n".join(advisory.aliases) affected = " ".join(advisory.affected_versions) fixed = " ".join(advisory.fixed_versions) @@ -253,16 +281,24 @@ def run_datasources(purl, datasources, no_threading=False): return vulnerabilities -class VendorDataEncoder(json.JSONEncoder): +class VulntotalEncoder(json.JSONEncoder): def default(self, obj): - if isinstance(obj, VendorData): + if isinstance(obj, VendorData) or isinstance(obj, NormalizedVersionRanges): return obj.to_dict() return json.JSONEncoder.default(self, obj) -def write_json_output(purl, datasources, json_output, no_threading): +def write_json_output(purl, datasources, json_output, no_threading, no_group, no_compare): + results = {"purl": purl, "datasources": list(datasources.keys())} + vulnerabilities = run_datasources(purl, datasources, no_threading) - return json.dump(vulnerabilities, json_output, cls=VendorDataEncoder, indent=2) + if no_group: + results.update(vulnerabilities) + else: + grouped_by_cve = group_by_cve(vulnerabilities, PackageURL.from_string(purl), no_compare) + results.update(grouped_by_cve) + + return json.dump(results, json_output, cls=VulntotalEncoder, indent=2) def noop(self, *args, **kw): @@ -272,9 +308,38 @@ def noop(self, *args, **kw): yaml.emitter.Emitter.process_tag = noop -def write_yaml_output(purl, datasources, yaml_output, no_threading): +def write_yaml_output(purl, datasources, yaml_output, no_threading, no_group, no_compare): + results = {"purl": purl, "datasources": list(datasources.keys())} + vulnerabilities = run_datasources(purl, datasources, no_threading) - return yaml.dump(vulnerabilities, yaml_output, default_flow_style=False, indent=2) + if no_group: + results.update(vulnerabilities) + else: + grouped_by_cve = group_by_cve(vulnerabilities, PackageURL.from_string(purl), no_compare) + serialize_normalized_range(grouped_by_cve, no_compare) + results.update(grouped_by_cve) + + return yaml.dump(results, yaml_output, default_flow_style=False, indent=2, sort_keys=False) + + +def serialize_normalized_range(grouped_by_cve, no_compare): + if no_compare: + return + for cve, value in grouped_by_cve.items(): + if cve in ("NOCVE", "NOADVISORY"): + continue + for datasource, resources in value.items(): + for resource in resources: + affected_versions = resource.get("normalized_affected_versions") + fixed_versions = resource.get("normalized_fixed_versions") + if isinstance(affected_versions, NormalizedVersionRanges): + resource["normalized_affected_versions"] = [ + str(vers) for vers in affected_versions.version_ranges + ] + if isinstance(fixed_versions, NormalizedVersionRanges): + resource["normalized_fixed_versions"] = [ + str(vers) for vers in fixed_versions.version_ranges + ] def prettyprint(purl, datasources, pagination, no_threading): @@ -285,11 +350,7 @@ def prettyprint(purl, datasources, pagination, no_threading): active_datasources = ", ".join(sorted([x.upper() for x in datasources.keys()])) metadata = f"PURL: {purl}\nActive datasources: {active_datasources}\n\n" - table = Texttable() - table.set_cols_dtype(["t", "t", "t", "t"]) - table.set_cols_align(["c", "l", "l", "l"]) - table.set_cols_valign(["t", "t", "a", "t"]) - table.header(["DATASOURCE", "ALIASES", "AFFECTED", "FIXED"]) + table = get_texttable(no_group=True) for datasource, advisories in vulnerabilities.items(): if not advisories: @@ -302,47 +363,255 @@ def prettyprint(purl, datasources, pagination, no_threading): pydoc.pager(metadata + table.draw()) if pagination else click.echo(metadata + table.draw()) -def group_by_cve(vulnerabilities): +NORMALIZED_VERSION_RANGE_BY_DATASOURCE = { + "deps": NormalizedVersionRanges.from_discrete, + "github": NormalizedVersionRanges.from_github, + "gitlab": NormalizedVersionRanges.from_gitlab, + "oss_index": None, + "osv": NormalizedVersionRanges.from_discrete, + "snyk": NormalizedVersionRanges.from_snyk, + "vulnerablecode": NormalizedVersionRanges.from_discrete, +} + + +def group_by_cve(vulnerabilities, purl, no_compare): grouped_by_cve = {} - nocve = [] - noadvisory = [] + nocve = {} + noadvisory = {} for datasource, advisories in vulnerabilities.items(): if not advisories: - noadvisory.append([datasource.upper(), "", "", ""]) - + if datasource not in noadvisory: + noadvisory[datasource] = [] + noadvisory[datasource].append( + { + "advisory": None, + } + ) for advisory in advisories: cve = next((x for x in advisory.aliases if x.startswith("CVE")), None) if not cve: - nocve.append(formatted_row(datasource, advisory)) + if datasource not in nocve: + nocve[datasource] = [] + nocve[datasource].append( + { + "advisory": advisory, + } + ) continue if cve not in grouped_by_cve: - grouped_by_cve[cve] = [] - grouped_by_cve[cve].append(formatted_row(datasource, advisory)) + grouped_by_cve[cve] = {} + + if datasource not in grouped_by_cve[cve]: + grouped_by_cve[cve][datasource] = [] + grouped_by_cve[cve][datasource].append( + { + "advisory": advisory, + } + ) grouped_by_cve["NOCVE"] = nocve grouped_by_cve["NOADVISORY"] = noadvisory + if not no_compare: + normalize_version_ranges(grouped_by_cve, purl) + compare(grouped_by_cve) return grouped_by_cve -def prettyprint_group_by_cve(purl, datasources, pagination, no_threading): +def normalize_version_ranges(grouped_by_cve, purl): + package_versions = get_all_versions(purl) + + for cve, value in grouped_by_cve.items(): + if cve in ("NOCVE", "NOADVISORY"): + continue + for datasource, resources in value.items(): + for resource in resources: + advisory = resource["advisory"] + normalized_affected_versions = [] + normalized_fixed_versions = [] + datasource_normalizer = NORMALIZED_VERSION_RANGE_BY_DATASOURCE.get(datasource) + if datasource_normalizer and advisory.affected_versions: + try: + normalized_affected_versions = datasource_normalizer( + advisory.affected_versions, purl.type, package_versions + ) + except Exception as err: + normalized_affected_versions = [err] + + if advisory.fixed_versions: + try: + normalized_fixed_versions = NormalizedVersionRanges.from_discrete( + advisory.fixed_versions, purl.type, package_versions + ) + except Exception as err: + normalized_fixed_versions = [err] + + resource["normalized_affected_versions"] = normalized_affected_versions + resource["normalized_fixed_versions"] = normalized_fixed_versions + + +def compare(grouped_by_cve): + for cve, value in grouped_by_cve.items(): + if cve in ("NOCVE", "NOADVISORY"): + continue + sources = list(value.keys()) + board = {source: {} for source in sources} + """ + A typical board after comparison may look like this. + + board = { + "github":{ + "snyk": 0, + "gitlab": 1, + "deps": 0, + "vulnerablecode": 1, + "osv": 1, + "oss_index": 1, + }, + "snyk":{ + "github": 0, + "gitlab": 1, + "deps": 0, + "vulnerablecode": 1, + "osv": 1, + "oss_index": 1, + }, + ... + } + """ + for datasource, resources in value.items(): + normalized_affected_versions_a = get_item(resources, 0, "normalized_affected_versions") + normalized_fixed_versions_a = get_item(resources, 0, "normalized_fixed_versions") + if normalized_fixed_versions_a and normalized_affected_versions_a: + for source in sources: + if ( + source == datasource + or source in board[datasource] + or datasource in board[source] + ): + continue + normalized_affected_versions_b = get_item( + value, source, 0, "normalized_affected_versions" + ) + normalized_fixed_versions_b = get_item( + value, source, 0, "normalized_fixed_versions" + ) + board[datasource][source] = 0 + board[source][datasource] = 0 + if ( + normalized_fixed_versions_a == normalized_fixed_versions_b + and normalized_affected_versions_a == normalized_affected_versions_b + ): + board[datasource][source] = 1 + board[source][datasource] = 1 + + maximum = max([sum(list(table.values())) for table in board.values()]) + datasource_count = len(sources) + for datasource, table in board.items(): + if maximum == 0: + # NA if only one advisory else TC aka `Total Collision`. + value[datasource][0]["score"] = "TC" if datasource_count > 1 else "NA" + continue + value[datasource][0]["score"] = (sum(list(table.values())) / maximum) * 100 + + +def prettyprint_group_by_cve(purl, datasources, pagination, no_threading, vers, no_compare): vulnerabilities = run_datasources(purl, datasources, no_threading) if not vulnerabilities: return - grouped_by_cve = group_by_cve(vulnerabilities) + grouped_by_cve = group_by_cve(vulnerabilities, PackageURL.from_string(purl), no_compare) active_datasource = ", ".join(sorted([x.upper() for x in datasources.keys()])) metadata = f"PURL: {purl}\nActive DataSources: {active_datasource}\n\n" + table = get_texttable(no_compare=no_compare) + + for cve, value in grouped_by_cve.items(): + for datasource, resources in value.items(): + row = [cve] + formatted_row(datasource, resources[0].get("advisory")) + if not no_compare: + row.append(resources[0].get("score", "NA")) + + table.add_row(row) + + if not no_compare and vers and "score" in resources[0]: + na_affected = get_item(resources, 0, "normalized_affected_versions") + na_fixed = get_item(resources, 0, "normalized_fixed_versions") + na_affected = ( + na_affected.version_ranges + if isinstance(na_affected, NormalizedVersionRanges) + else na_affected + ) + na_fixed = ( + na_fixed.version_ranges + if isinstance(na_fixed, NormalizedVersionRanges) + else na_fixed + ) + na_affected = "\n".join([str(i) for i in na_affected]) + na_fixed = "\n".join([str(i) for i in na_fixed]) + table.add_row(["", "", "", na_affected, na_fixed, ""]) + + pydoc.pager(metadata + table.draw()) if pagination else click.echo(metadata + table.draw()) + + +def strip_leading_v(version): + if version.startswith("v"): + return version[1:] + return version + + +def get_texttable(no_group=False, no_compare=False): + quantum = 100 / 125 + terminal_width = os.get_terminal_size().columns + line_factor = terminal_width / 100 + + column_5x = math.floor(5 * quantum * line_factor) + column_15x = math.floor(15 * quantum * line_factor) + column_20x = math.floor(20 * quantum * line_factor) + table = Texttable() - table.set_cols_dtype(["a", "a", "a", "a", "a"]) - table.set_cols_align(["l", "l", "l", "l", "l"]) - table.set_cols_valign(["t", "t", "t", "a", "t"]) - table.header(["CVE", "DATASOURCE", "ALIASES", "AFFECTED", "FIXED"]) - for cve, advisories in grouped_by_cve.items(): - for count, advisory in enumerate(advisories): - table.add_row([cve] + advisory) + if no_group: + table.set_cols_dtype(["t", "t", "t", "t"]) + table.set_cols_align(["c", "l", "l", "l"]) + table.set_cols_valign(["t", "t", "a", "t"]) + table.set_cols_width([column_20x, column_20x, column_20x, column_20x]) + table.header(["DATASOURCE", "ALIASES", "AFFECTED", "FIXED"]) + return table + + if no_compare: + table.set_cols_dtype(["a", "a", "a", "a", "a"]) + table.set_cols_align(["l", "l", "l", "l", "l"]) + table.set_cols_valign(["t", "t", "t", "a", "t"]) + table.set_cols_width([column_20x, column_15x, column_20x, column_20x, column_20x]) + table.header(["CVE", "DATASOURCE", "ALIASES", "AFFECTED", "FIXED"]) + return table + + table.set_cols_dtype(["a", "a", "a", "a", "a", "a"]) + table.set_cols_align(["l", "l", "l", "l", "l", "l"]) + table.set_cols_valign(["t", "t", "t", "a", "t", "t"]) + table.set_cols_width([column_20x, column_15x, column_20x, column_20x, column_20x, column_5x]) + table.header(["CVE", "DATASOURCE", "ALIASES", "AFFECTED", "FIXED", "SCORE"]) + + return table + + +def get_all_versions(purl: PackageURL): + if purl.type not in VERSION_API_CLASSES_BY_PACKAGE_TYPE: + return - pydoc.pager(metadata + table.draw()) if pagination else click.echo(metadata + table.draw()) + versionAPI = None + package_name = None + + if purl.type == "maven": + package_name = f"{purl.namespace}:{purl.name}" + if purl.type in ("composer", "golang", "github"): + package_name = f"{purl.namespace}/{purl.name}" + if purl.type in ("nuget", "pypi", "gem", "npm", "hex", "deb", "cargo"): + package_name = purl.name + + versionAPI = VERSION_API_CLASSES_BY_PACKAGE_TYPE.get(purl.type)() + all_versions = versionAPI.fetch(package_name) + + return [strip_leading_v(package_version.value) for package_version in all_versions] if __name__ == "__main__": @@ -366,5 +635,7 @@ def prettyprint_group_by_cve(purl, datasources, pagination, no_threading): --no-threading Run DataSources sequentially. -p, --pagination Enable default pagination. --no-group Don't group by CVE. + --vers Show normalized vers. + --no-compare Do not compare datasource output. -h, --help Show this message and exit. """