From fe20e49c9664a7a1052bf6fdc3570708aec0be00 Mon Sep 17 00:00:00 2001 From: Keshav Priyadarshi Date: Tue, 30 Aug 2022 00:10:48 +0530 Subject: [PATCH] adopt new CLI Signed-off-by: Keshav Priyadarshi --- vulntotal/vulntotal_cli.py | 382 +++++++++++++++++++++++++++++++++++++ 1 file changed, 382 insertions(+) create mode 100755 vulntotal/vulntotal_cli.py diff --git a/vulntotal/vulntotal_cli.py b/vulntotal/vulntotal_cli.py new file mode 100755 index 000000000..667b706df --- /dev/null +++ b/vulntotal/vulntotal_cli.py @@ -0,0 +1,382 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# http://nexb.com and https://github.com/nexB/vulnerablecode/ +# The VulnTotal software is licensed under the Apache License version 2.0. +# Data generated with VulnTotal require an acknowledgment. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# +# When you publish or redistribute any data created with VulnTotal or any VulnTotal +# derivative work, you must accompany this data with the following acknowledgment: +# +# Generated with VulnTotal and provided on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from +# VulnTotal should be considered or used as legal advice. Consult an Attorney +# for any legal advice. +# VulnTotal is a free software code scanning tool from nexB Inc. and others. +# Visit https://github.com/nexB/vulnerablecode/ for support and download. + +import concurrent.futures +import json +import pydoc +import sys + +import click +import yaml +from packageurl import PackageURL +from texttable import Texttable + +from vulntotal.datasources import DATASOURCE_REGISTRY +from vulntotal.validator import VendorData + + +@click.command() +@click.option( + "-l", + "--list", + "list_source", + is_flag=True, + multiple=False, + required=False, + help="Lists all the available DataSources.", +) +@click.option( + "-e", + "--enable", + "enable", + hidden=True, + multiple=True, + type=click.Choice(DATASOURCE_REGISTRY.keys()), + required=False, + help="Enable these datasource/s only.", +) +@click.option( + "-d", + "--disable", + "disable", + hidden=True, + multiple=True, + type=click.Choice(DATASOURCE_REGISTRY.keys()), + required=False, + help="Disable these datasource/s.", +) +@click.option( + "--ecosystem", + "ecosystem", + hidden=True, + is_flag=True, + required=False, + help="Lists ecosystem supported by active DataSources", +) +@click.option( + "--raw", + "raw_output", + is_flag=True, + hidden=True, + multiple=False, + required=False, + help="List of all the raw response from DataSources.", +) +@click.option( + "--no-threading", + "no_threading", + is_flag=True, + hidden=True, + multiple=False, + required=False, + help="Run DataSources sequentially.", +) +@click.option( + "-p", + "--pagination", + "pagination", + is_flag=True, + hidden=True, + multiple=False, + required=False, + help="Enable default pagination.", +) +@click.option( + "--json", + "json_output", + type=click.File("w"), + required=False, + metavar="FILE", + help="Write output as pretty-printed JSON to FILE. ", +) +@click.option( + "--yaml", + "yaml_output", + type=click.File("w"), + required=False, + metavar="FILE", + help="Write output as YAML to FILE. ", +) +@click.option( + "--no-group", + "no_group", + is_flag=True, + hidden=True, + multiple=False, + required=False, + help="Don't group by CVE.", +) +@click.argument("purl", required=False) +@click.help_option("-h", "--help") +def handler( + purl, + list_source, + enable, + disable, + ecosystem, + raw_output, + no_threading, + pagination, + json_output, + yaml_output, + no_group, +): + """ + Runs the PURL through all the available datasources and group vulnerability by CVEs. + Use the special '-' file name to print JSON or YAML results on screen/stdout. + """ + active_datasource = ( + get_enabled_datasource(enable) + if enable + else (get_undisabled_datasource(disable) if disable else DATASOURCE_REGISTRY) + ) + + if list_source: + list_datasources() + + elif not active_datasource: + click.echo("No datasources available!", err=True) + + elif ecosystem: + list_supported_ecosystem(active_datasource) + + elif raw_output: + if purl: + get_raw_response(purl, active_datasource) + + elif json_output: + write_json_output(purl, active_datasource, json_output, no_threading) + + elif yaml_output: + write_yaml_output(purl, active_datasource, yaml_output, no_threading) + + elif no_group: + prettyprint(purl, active_datasource, pagination, no_threading) + + elif purl: + prettyprint_group_by_cve(purl, active_datasource, pagination, no_threading) + + +def get_valid_datasources(datasources): + valid_datasources = {} + unknown_datasources = [] + for datasource in datasources: + key = datasource.lower() + try: + valid_datasources[key] = DATASOURCE_REGISTRY[key] + except KeyError: + unknown_datasources.append(key) + if unknown_datasources: + raise CommandError(f"Unknown datasource: {unknown_datasources}") + return valid_datasources + + +def get_undisabled_datasource(datasources): + disabled = get_valid_datasources(datasources) + return {key: value for key, value in DATASOURCE_REGISTRY.items() if key not in disabled} + + +def get_enabled_datasource(datasources): + return get_valid_datasources(datasources) + + +def list_datasources(): + datasources = [x.upper() for x in list(DATASOURCE_REGISTRY)] + click.echo("Currently supported datasources:") + click.echo("\n".join(sorted(datasources))) + + +def list_supported_ecosystem(datasources): + ecosystems = [] + for key, datasource in datasources.items(): + vendor_supported_ecosystem = datasource.supported_ecosystem() + ecosystems.extend([x.upper() for x in vendor_supported_ecosystem.keys()]) + + active_datasource = [x.upper() for x in datasources.keys()] + click.echo("Active DataSources: %s\n" % ", ".join(sorted(active_datasource))) + click.echo("Ecosystem supported by active datasources") + click.echo("\n".join(sorted(set(ecosystems)))) + + +def formatted_row(datasource, advisory): + aliases = "\n".join(advisory.aliases) + affected = " ".join(advisory.affected_versions) + fixed = " ".join(advisory.fixed_versions) + return [datasource.upper(), aliases, affected, fixed] + + +def get_raw_response(purl, datasources): + all_raw_responses = {} + for key, datasource in datasources.items(): + vendor = datasource() + vendor_advisories = list(vendor.datasource_advisory(PackageURL.from_string(purl))) + all_raw_responses[key] = vendor.raw_dump + click.echo(json.dumps(all_raw_responses, indent=2)) + + +def run_datasources(purl, datasources, no_threading=False): + vulnerabilities = {} + if not no_threading: + with concurrent.futures.ThreadPoolExecutor(max_workers=len(datasources)) as executor: + future_to_advisory = { + executor.submit( + datasource().datasource_advisory, PackageURL.from_string(purl) + ): datasource + for key, datasource in datasources.items() + } + for future in concurrent.futures.as_completed(future_to_advisory): + vendor = future_to_advisory[future].__name__[:-10].lower() + try: + vendor_advisories = future.result() + vulnerabilities[vendor] = [] + if vendor_advisories: + vulnerabilities[vendor].extend([advisory for advisory in vendor_advisories]) + except Exception as exc: + click.echo("%s generated an exception: %s" % (vendor, exc)) + else: + for key, datasource in datasources.items(): + vendor_advisories = datasource().datasource_advisory(PackageURL.from_string(purl)) + vulnerabilities[key] = [] + if vendor_advisories: + vulnerabilities[key].extend([advisory for advisory in vendor_advisories]) + + return vulnerabilities + + +class VendorDataEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, VendorData): + return obj.to_dict() + return json.JSONEncoder.default(self, obj) + + +def write_json_output(purl, datasources, json_output, no_threading): + vulnerabilities = run_datasources(purl, datasources, no_threading) + return json.dump(vulnerabilities, json_output, cls=VendorDataEncoder, indent=2) + + +def noop(self, *args, **kw): + pass + + +yaml.emitter.Emitter.process_tag = noop + + +def write_yaml_output(purl, datasources, yaml_output, no_threading): + vulnerabilities = run_datasources(purl, datasources, no_threading) + return yaml.dump(vulnerabilities, yaml_output, default_flow_style=False, indent=2) + + +def prettyprint(purl, datasources, pagination, no_threading): + vulnerabilities = run_datasources(purl, datasources, no_threading) + if not vulnerabilities: + return + + active_datasource = ", ".join(sorted([x.upper() for x in datasources.keys()])) + metadata = f"PURL: {purl}\nActive DataSources: {active_datasource}\n\n" + + table = Texttable() + table.set_cols_dtype(["t", "t", "t", "t"]) + table.set_cols_align(["c", "l", "l", "l"]) + table.set_cols_valign(["t", "t", "a", "t"]) + table.header(["DATASOURCE", "ALIASES", "AFFECTED", "FIXED"]) + + for datasource, advisories in vulnerabilities.items(): + if not advisories: + table.add_row([datasource.upper(), "", "", ""]) + continue + + for advisory in advisories: + table.add_row(formatted_row(datasource, advisory)) + + pydoc.pager(metadata + table.draw()) if pagination else click.echo(metadata + table.draw()) + + +def group_by_cve(vulnerabilities): + grouped_by_cve = {} + nocve = [] + noadvisory = [] + for datasource, advisories in vulnerabilities.items(): + if not advisories: + noadvisory.append([datasource.upper(), "", "", ""]) + + for advisory in advisories: + cve = next((x for x in advisory.aliases if x.startswith("CVE")), None) + if not cve: + nocve.append(formatted_row(datasource, advisory)) + continue + if cve not in grouped_by_cve: + grouped_by_cve[cve] = [] + grouped_by_cve[cve].append(formatted_row(datasource, advisory)) + grouped_by_cve["NOCVE"] = nocve + grouped_by_cve["NOADVISORY"] = noadvisory + return grouped_by_cve + + +def prettyprint_group_by_cve(purl, datasources, pagination, no_threading): + vulnerabilities = run_datasources(purl, datasources, no_threading) + if not vulnerabilities: + return + grouped_by_cve = group_by_cve(vulnerabilities) + + active_datasource = ", ".join(sorted([x.upper() for x in datasources.keys()])) + metadata = f"PURL: {purl}\nActive DataSources: {active_datasource}\n\n" + + table = Texttable() + table.set_cols_dtype(["a", "a", "a", "a", "a"]) + table.set_cols_align(["l", "l", "l", "l", "l"]) + table.set_cols_valign(["t", "t", "t", "a", "t"]) + table.header(["CVE", "DATASOURCE", "ALIASES", "AFFECTED", "FIXED"]) + + for cve, advisories in grouped_by_cve.items(): + for count, advisory in enumerate(advisories): + table.add_row([cve] + advisory) + + pydoc.pager(metadata + table.draw()) if pagination else click.echo(metadata + table.draw()) + + +if __name__ == "__main__": + handler() + +""" +Advanced Usage: vulntotal_cli.py [OPTIONS] [PURL] + + Runs the PURL through all the available datasources and group vulnerability + by CVEs. Use the special '-' file name to print JSON or YAML results on + screen/stdout. + +Options: + -l, --list Lists all the available DataSources. + --json FILE Write output as pretty-printed JSON to FILE. + --yaml FILE Write output as YAML to FILE. + -e, --enable Enable these datasource/s only. + -d, --disable Disable these datasource/s. + --ecosystem Lists ecosystem supported by active DataSources + --raw List of all the raw response from DataSources. + --no-threading Run DataSources sequentially. + -p, --pagination Enable default pagination. + --no-group Don't group by CVE. + -h, --help Show this message and exit. +"""