diff --git a/.github/workflows/pythonapp.yml b/.github/workflows/pythonapp.yml
index cf850dca80..6acf7200b0 100644
--- a/.github/workflows/pythonapp.yml
+++ b/.github/workflows/pythonapp.yml
@@ -147,6 +147,7 @@ jobs:
           test/test_extractor.py
           test/test_condensed_downloads.py
           test/test_package_list_parser.py
+          test/test_merge.py
 
   long_tests:
     name: Long tests on python3.8
@@ -214,6 +215,7 @@ jobs:
           test/test_util.py
           test/test_condensed_downloads.py
           test/test_package_list_parser.py
+          test/test_merge.py
       - name: Run Synchronous test
         run: >
           pytest -v --cov --cov-append --cov-report=xml
@@ -281,6 +283,7 @@ jobs:
           test/test_util.py
           test/test_condensed_downloads.py
           test/test_package_list_parser.py
+          test/test_merge.py
       - name: Run Synchronous test
         run: >
           pytest -v
diff --git a/.gitignore b/.gitignore
index 13d12ad514..5b7881df10 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,6 @@ doc/_build
 test/downloads/
 cve_bin_tool_requirements.csv
 !test/condensed-downloads/*.tar.gz
+intermediate*.json
+
+
diff --git a/cve_bin_tool/cli.py b/cve_bin_tool/cli.py
index 296fca0c39..ee0a511ce4 100755
--- a/cve_bin_tool/cli.py
+++ b/cve_bin_tool/cli.py
@@ -36,6 +36,7 @@
 )
 from cve_bin_tool.input_engine import InputEngine, TriageData
 from cve_bin_tool.log import LOGGER
+from cve_bin_tool.merge import MergeReports
 from cve_bin_tool.output_engine import OutputEngine
 from cve_bin_tool.package_list_parser import PackageListParser
 from cve_bin_tool.util import ProductInfo
@@ -143,6 +144,19 @@
         choices=["low", "medium", "high", "critical"],
         help="minimum CVE severity to report (default: low)",
     )
+    output_group.add_argument(
+        "-a",
+        "--append",
+        nargs="?",
+        const=True,
+        help="save output as an intermediate report in JSON format",
+    )
+    output_group.add_argument(
+        "-t",
+        "--tag",
+        action="store",
+        help="add a unique tag to differentiate between multiple intermediate reports",
+    )
     parser.add_argument("-V", "--version", action="version", version=VERSION)
     parser.add_argument(
         "-u",
@@ -162,6 +176,12 @@
         action="store_true",
         help="skips checking for a new version",
     )
+    parser.add_argument(
+        "-m",
+        "--merge",
+        action=StringToListAction,
+        help="comma-separated paths to intermediate reports to merge",
+    )
 
     checker_group = parser.add_argument_group("Checkers")
     checker_group.add_argument(
@@ -197,6 +217,9 @@
         "output_file": "",
         "html_theme": "",
         "package_list": "",
+        "append": False,
+        "tag": "",
+        "merge": None,
     }
 
     with ErrorHandler(mode=ErrorMode.NoTrace):
@@ -233,6 +256,17 @@
         **********************************************
         """
         LOGGER.warning(warning_nolinux)
 
+    if args["merge"]:
+        LOGGER.info(
+            "You can use -f --format and -o --output-file for saving merged intermediate reports in a file"
+        )
+        merged_cves = MergeReports(merge_files=args["merge"])
+        if args["input_file"]:
+            LOGGER.warning(
+                "Ignoring -i --input-file while merging intermediate reports"
+            )
+        args["input_file"] = merged_cves.merge_reports()
+
     # Creates a Object for OutputEngine
     # Database update related settings
     # Connect to the database
@@ -276,6 +310,12 @@
             "Please specify a directory to scan or an input file required"
         )
 
+    # Output validation
+    if not args["append"] and args["tag"]:
+        LOGGER.warning(
+            "Please specify -a --append to generate an intermediate report while using -t --tag"
+        )
+
     if args["directory"] and not os.path.exists(args["directory"]):
         parser.print_usage()
         with ErrorHandler(logger=LOGGER, mode=ErrorMode.NoTrace):
@@ -374,10 +414,15 @@ def main(argv=None):
         filename=args["output_file"],
         themes_dir=args["html_theme"],
         time_of_last_update=cvedb_orig.time_of_last_update,
+        tag=args["tag"],
         products_with_cve=cve_scanner.products_with_cve,
         products_without_cve=cve_scanner.products_without_cve,
         total_files=total_files,
+        append=args["append"],
     )
+    if args["merge"] and args["input_file"]:
+        # remove the merged json from .cache
+        os.remove(args["input_file"])
 
     if not args["quiet"]:
         output.output_file(args["format"])
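
Note on the flags above: because "-a/--append" is declared with nargs="?" and const=True, its parsed value is True when the flag is passed bare, the given path when a value follows it, and the configured default (False) when it is absent. A minimal, self-contained argparse sketch of that behavior (illustration only, not cve-bin-tool code):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-a", "--append", nargs="?", const=True, default=False)

    print(parser.parse_args([]).append)                    # False: no intermediate report
    print(parser.parse_args(["-a"]).append)                # True: auto-generated filename
    print(parser.parse_args(["-a", "inter.json"]).append)  # "inter.json": explicit path

OutputEngine.output_file() later branches on exactly these three shapes (see the output_engine changes below).
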
diff --git a/cve_bin_tool/error_handler.py b/cve_bin_tool/error_handler.py
index fa3cfb5be1..b7a2ffc81b 100644
--- a/cve_bin_tool/error_handler.py
+++ b/cve_bin_tool/error_handler.py
@@ -40,6 +40,10 @@ class InvalidJsonError(Exception):
     """Given File is an Invalid JSON"""
 
 
+class InvalidIntermediateJsonError(Exception):
+    """Given intermediate file is not in a valid format"""
+
+
 class EmptyCache(Exception):
     """
     Raised when NVD is opened when verify=False and there are no files in the
@@ -177,4 +181,5 @@ def __exit__(self, exc_type, exc_val, exc_tb):
     CVEDataMissing: -15,
     InvalidCheckerError: -16,
     NVDRateLimit: -17,
+    InvalidIntermediateJsonError: -18,
 }
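
Note: the hunk above adds the new exception to the module's exception-to-exit-code table; the table's variable name lies outside this hunk, so the sketch below uses a hypothetical stand-in just to show the lookup pattern:

    from cve_bin_tool.error_handler import InvalidIntermediateJsonError

    # EXIT_CODES is a hypothetical name; the real mapping is the dict extended above.
    EXIT_CODES = {InvalidIntermediateJsonError: -18}

    def exit_code_for(exc: BaseException) -> int:
        # Falling back to -1 for unmapped exceptions is an assumption, not library behavior.
        return EXIT_CODES.get(type(exc), -1)
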
diff --git a/cve_bin_tool/merge.py b/cve_bin_tool/merge.py
new file mode 100644
index 0000000000..2df910e0a6
--- /dev/null
+++ b/cve_bin_tool/merge.py
@@ -0,0 +1,159 @@
+# Copyright (C) 2021 Intel Corporation
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import json
+import os
+from datetime import datetime
+from logging import Logger
+from typing import Dict, List
+
+from .cvedb import DISK_LOCATION_DEFAULT
+from .error_handler import (
+    ErrorHandler,
+    ErrorMode,
+    InvalidIntermediateJsonError,
+    InvalidJsonError,
+    MissingFieldsError,
+)
+from .log import LOGGER
+from .util import DirWalk
+
+REQUIRED_INTERMEDIATE_METADATA = {
+    "scanned_dir",
+    "total_files",
+    "products_without_cve",
+    "products_with_cve",
+    "tag",
+    "timestamp",
+}
+
+
+class MergeReports:
+    def __init__(
+        self,
+        merge_files: List[str],
+        logger: Logger = None,
+        error_mode=ErrorMode.TruncTrace,
+        cache_dir=DISK_LOCATION_DEFAULT,
+    ):
+        self.logger = logger or LOGGER.getChild(self.__class__.__name__)
+        self.merge_files = merge_files
+        self.all_cve_data = []
+        self.file_stack = []
+        self.error_mode = error_mode
+        self.total_inter_files = 0
+        self.total_files = 0
+        self.products_with_cve = 0
+        self.products_without_cve = 0
+        self.cache_dir = cache_dir
+
+        self.walker = DirWalk(
+            pattern=";".join(
+                file_path if file_path.endswith(".json") else file_path + "*.json"
+                for file_path in self.merge_files
+            ),
+            yield_files=True,
+        ).walk
+
+    def recursive_scan(self, merge_files):
+        """Recursively scan for JSON files in the given directory/glob paths"""
+        for intermediate_path in merge_files:
+            if os.path.isdir(intermediate_path):
+                for filepath in self.walker([intermediate_path]):
+                    self.file_stack.append(filepath)
+                    yield filepath
+                    self.file_stack.pop()
+            elif os.path.isfile(intermediate_path) and not os.path.islink(
+                intermediate_path
+            ):
+                self.file_stack.append(intermediate_path)
+                yield intermediate_path
+                self.file_stack.pop()
+
+    def scan_intermediate_file(self, filename):
+        """Read an intermediate JSON file and verify no required fields are missing"""
+        self.logger.info(f"Loading file: {filename}")
+
+        missing_fields = set()
+        with open(filename) as json_file:
+            # json.load parses directly from the file handle
+            inter_data = json.load(json_file)
+
+        if not inter_data or not isinstance(inter_data, dict):
+            with ErrorHandler(mode=self.error_mode):
+                raise InvalidJsonError(filename)
+
+        required_fields = {"metadata", "report"}
+        missing_fields = required_fields - set(inter_data.keys())
+
+        if missing_fields == set():
+            if isinstance(inter_data["metadata"], dict):
+                missing_fields = REQUIRED_INTERMEDIATE_METADATA - set(
+                    inter_data["metadata"].keys()
+                )
+                if missing_fields == set():
+                    if isinstance(inter_data["report"], list):
+                        self.logger.info(
+                            f"Adding data from {os.path.basename(filename)} with timestamp {inter_data['metadata']['timestamp']}"
+                        )
+                        self.total_inter_files += 1
+                        return inter_data
+
+        if missing_fields != set():
+            with ErrorHandler(mode=self.error_mode):
+                raise MissingFieldsError(f"{missing_fields} are required fields")
+
+        with ErrorHandler(mode=self.error_mode):
+            raise InvalidIntermediateJsonError(filename)
+
+    def merge_reports(self):
+        """Merge valid intermediate dictionaries"""
+
+        for inter_file in self.recursive_scan(self.merge_files):
+            # Collect every intermediate report that passes validation
+            self.all_cve_data.append(self.scan_intermediate_file(inter_file))
+
+        if self.all_cve_data:
+            self.all_cve_data = self.remove_intermediate_duplicates()
+            merged_file_path = self.save_merged_intermediate()
+            return merged_file_path
+
+        self.logger.error("No valid intermediate reports found!")
+        return ""
+
+    def save_merged_intermediate(self):
+        """Save a temporary merged report in .cache/cve-bin-tool"""
+
+        if not os.path.isdir(self.cache_dir):
+            os.makedirs(self.cache_dir)
+
+        now = datetime.now().strftime("%Y-%m-%d.%H-%M-%S")
+        filename = os.path.join(self.cache_dir, f"merged-{now}.json")
+        with open(filename, "w") as f:
+            json.dump(self.all_cve_data, f, indent="    ")
+
+        return filename
+
+    def remove_intermediate_duplicates(self) -> List[Dict[str, str]]:
+        """Return a list of dictionaries in the same format as the cve-bin-tool JSON output"""
+
+        output = {}
+        for inter_data in self.all_cve_data:
+            self.products_with_cve += inter_data["metadata"]["products_with_cve"]
+            self.products_without_cve += inter_data["metadata"]["products_without_cve"]
+            for cve in inter_data["report"]:
+                if cve["cve_number"] != "UNKNOWN":
+                    if cve["cve_number"] not in output:
+                        output[cve["cve_number"]] = cve
+                        self.total_files += len(cve["paths"].split(","))
+                    else:
+                        path_list = output[cve["cve_number"]]["paths"].split(",")
+                        self.total_files -= len(path_list)
+                        path_list.extend(cve["paths"].split(","))
+                        # remove duplicate paths (if any)
+                        path_list = list(set(path_list))
+                        self.total_files += len(path_list)
+                        # store back as a comma-separated string to match "paths" elsewhere
+                        output[cve["cve_number"]]["paths"] = ",".join(path_list)
+
+        return list(output.values())
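
Note: MergeReports can be driven directly from Python as well as via -m on the command line. A short usage sketch, assuming two intermediate reports already exist under reports/ (the paths are illustrative):

    from cve_bin_tool.merge import MergeReports

    # Entries may be .json files or directory/prefix paths; non-.json entries
    # get "*.json" appended to the DirWalk pattern (see __init__ above).
    merged = MergeReports(merge_files=["reports/backend.json", "reports/frontend.json"])

    # Validates each report, de-duplicates CVE entries across them, and writes
    # merged-<timestamp>.json into the cache directory, returning its path.
    merged_path = merged.merge_reports()
    print(merged_path)
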
diff --git a/cve_bin_tool/output_engine/__init__.py b/cve_bin_tool/output_engine/__init__.py
index 1329ecc199..acf6795d9e 100644
--- a/cve_bin_tool/output_engine/__init__.py
+++ b/cve_bin_tool/output_engine/__init__.py
@@ -6,7 +6,7 @@
 import os
 import time
 from logging import Logger
-from typing import IO, Dict
+from typing import IO, Dict, Union
 
 from ..cve_scanner import CVEData
 from ..cvedb import CVEDB
@@ -17,7 +17,12 @@
 from . import pdfbuilder
 from .console import output_console
 from .html import output_html
-from .util import add_extension_if_not, format_output, generate_filename
+from .util import (
+    add_extension_if_not,
+    format_output,
+    generate_filename,
+    intermediate_output,
+)
 
 
 def output_json(all_cve_data: Dict[ProductInfo, CVEData], outfile: IO):
@@ -26,6 +31,29 @@ def output_json(all_cve_data: Dict[ProductInfo, CVEData], outfile: IO):
     json.dump(formatted_output, outfile, indent="    ")
 
 
+def save_intermediate(
+    all_cve_data: Dict[ProductInfo, CVEData],
+    filename: str,
+    tag: str,
+    scanned_dir: str,
+    products_with_cve: int,
+    products_without_cve: int,
+    total_files: int,
+):
+    """Save the intermediate report"""
+
+    inter_output = intermediate_output(
+        all_cve_data,
+        tag,
+        scanned_dir,
+        products_with_cve,
+        products_without_cve,
+        total_files,
+    )
+    with open(filename, "w") as f:
+        json.dump(inter_output, f, indent="    ")
+
+
 def output_csv(all_cve_data: Dict[ProductInfo, CVEData], outfile):
     """Output a CSV of CVEs"""
     formatted_output = format_output(all_cve_data)
@@ -159,10 +187,12 @@ def __init__(
         filename: str,
         themes_dir: str,
         time_of_last_update,
+        tag: str,
         logger: Logger = None,
         products_with_cve: int = 0,
         products_without_cve: int = 0,
         total_files: int = 0,
+        append: Union[str, bool] = False,
     ):
         self.logger = logger or LOGGER.getChild(self.__class__.__name__)
         self.all_cve_data = all_cve_data
@@ -173,6 +203,8 @@ def __init__(
         self.total_files = total_files
         self.themes_dir = themes_dir
         self.time_of_last_update = time_of_last_update
+        self.append = append
+        self.tag = tag
 
     def output_cves(self, outfile, output_type="console"):
         """Output a list of CVEs
@@ -200,9 +232,34 @@ def output_cves(self, outfile, output_type="console"):
         else:  # console, or anything else that is unrecognised
             output_console(self.all_cve_data, self.time_of_last_update)
 
+        if isinstance(self.append, str):
+            save_intermediate(
+                self.all_cve_data,
+                self.append,
+                self.tag,
+                self.scanned_dir,
+                self.products_with_cve,
+                self.products_without_cve,
+                self.total_files,
+            )
+            self.logger.info(f"Output stored at {self.append}")
+
     def output_file(self, output_type="console"):
         """Generate a file for list of CVE"""
 
+        if self.append:
+            if isinstance(self.append, str):
+                self.append = self.check_dir_path(
+                    self.append, output_type="json", prefix="intermediate"
+                )
+                self.append = add_extension_if_not(self.append, "json")
+                self.append = self.check_file_path(
+                    self.append, output_type="json", prefix="intermediate"
+                )
+            else:
+                # file path for intermediate report not given
+                self.append = generate_filename("json", "intermediate")
+
         if output_type == "console":
             # short circuit file opening logic if we are actually
             # just writing to stdout
@@ -216,15 +273,7 @@ def output_file(self, output_type="console"):
         # check and add if the filename doesn't contain extension
         self.filename = add_extension_if_not(self.filename, output_type)
 
-        # check if the file already exists
-        if os.path.isfile(self.filename):
-            self.logger.warning(
-                f"Failed to write at '{self.filename}'. File already exists"
-            )
-            self.logger.info(
-                "Generating a new filename with Default Naming Convention"
-            )
-            self.filename = generate_filename(output_type)
+        self.filename = self.check_file_path(self.filename, output_type)
 
         # try opening that file
         with ErrorHandler(mode=ErrorMode.Ignore) as e:
@@ -250,3 +299,25 @@ def output_file(self, output_type="console"):
             mode = "wb"
         with open(self.filename, mode) as f:
             self.output_cves(f, output_type)
+
+    def check_file_path(self, filepath: str, output_type: str, prefix: str = "output"):
+        # check if the file already exists
+        if os.path.isfile(filepath):
+            self.logger.warning(f"Failed to write at '{filepath}'. File already exists")
+            self.logger.info("Generating a new filename with Default Naming Convention")
+            filepath = generate_filename(output_type, prefix)
+
+        return filepath
+
+    def check_dir_path(
+        self, filepath: str, output_type: str, prefix: str = "intermediate"
+    ):
+        if os.path.isdir(filepath):
+            self.logger.info(
+                f"Generating a new filename with Default Naming Convention in directory path {filepath}"
+            )
+            filename = os.path.basename(generate_filename(output_type, prefix))
+            filepath = os.path.join(filepath, filename)
+
+        return filepath
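
Note: output_file() above normalizes the three possible shapes of self.append before any report is written. A simplified sketch of that decision (the real method additionally routes string paths through check_dir_path() and check_file_path() for directory and collision handling):

    from cve_bin_tool.output_engine.util import add_extension_if_not, generate_filename

    def resolve_append(append):
        if append is True:
            # bare -a: pick a default name such as intermediate.cve-bin-tool.<ts>.json
            return generate_filename("json", "intermediate")
        if isinstance(append, str):
            # -a PATH: honour the user's path, adding .json if it is missing
            return add_extension_if_not(append, "json")
        return None  # append is False: no intermediate report
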
diff --git a/cve_bin_tool/output_engine/util.py b/cve_bin_tool/output_engine/util.py
index 471f816115..1ef039bf44 100644
--- a/cve_bin_tool/output_engine/util.py
+++ b/cve_bin_tool/output_engine/util.py
@@ -8,12 +8,12 @@
 import os
 from collections import defaultdict
 from datetime import datetime
-from typing import DefaultDict, Dict, List
+from typing import DefaultDict, Dict, List, Union
 
 from ..util import CVE, CVEData, ProductInfo, Remarks
 
 
-def generate_filename(extension: str) -> str:
+def generate_filename(extension: str, prefix: str = "output") -> str:
     """
     summary: Generate a unique filename with extension provided.
 
     Function use inbuilt datetime function to generate unique filename.
@@ -25,9 +25,11 @@ def generate_filename(extension: str) -> str:
         str: unique generated filename
     """
     now = datetime.now().strftime("%Y-%m-%d.%H-%M-%S")
+
     filename = os.path.abspath(
-        os.path.join(os.getcwd(), f"output.cve-bin-tool.{now}.{extension}")
+        os.path.join(os.getcwd(), f"{prefix}.cve-bin-tool.{now}.{extension}")
     )
+
     return filename
@@ -74,6 +76,53 @@ def format_output(all_cve_data: Dict[ProductInfo, CVEData]) -> List[Dict[str, st
     return formatted_output
 
 
+def intermediate_output(
+    all_cve_data: Dict[ProductInfo, CVEData],
+    tag: str,
+    scanned_dir: str,
+    products_with_cve: int,
+    products_without_cve: int,
+    total_files: int,
+) -> Dict[str, Union[Dict[str, Union[str, int]], List[Dict[str, str]]]]:
+    """
+    summary: Generate an intermediate output dictionary with a metadata section
+    and the formatted report.
+
+    Returns:
+        formatted_output: Dict[str, Union[Dict[str, Union[str, int]], List[Dict[str, str]]]]
+        - example: {
+            "metadata": {
+                "timestamp": "2021-03-24.11-07-55",
+                "tag": "backend",
+                "scanned_dir": "/home/project/binaries",
+                "products_with_cve": 139,
+                "products_without_cve": 2,
+                "total_files": 49
+            },
+            "report": [
+                {
+                    "vendor": "haxx",
+                    "product": "curl",
+                    "version": "1.2.1",
+                    "cve_number": "CVE-1234-1234",
+                    "severity": "LOW"
+                },
+                ...
+            ]
+        }
+    """
+
+    return {
+        "metadata": {
+            "timestamp": datetime.now().strftime("%Y-%m-%d.%H-%M-%S"),
+            "tag": tag,
+            "scanned_dir": scanned_dir,
+            "products_with_cve": products_with_cve,
+            "products_without_cve": products_without_cve,
+            "total_files": total_files,
+        },
+        "report": format_output(all_cve_data),
+    }
+
+
 def add_extension_if_not(filename: str, output_type: str) -> str:
     """
     summary: Checks if the filename ends with the extension and if not
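
Note: with the new prefix parameter, intermediate reports reuse the existing naming convention. For example:

    from cve_bin_tool.output_engine.util import generate_filename

    # e.g. /current/working/dir/intermediate.cve-bin-tool.2021-06-18.20-47-14.json
    print(generate_filename("json", "intermediate"))
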
diff --git a/test/json/bad_intermediate.json b/test/json/bad_intermediate.json
new file mode 100644
index 0000000000..961465c120
--- /dev/null
+++ b/test/json/bad_intermediate.json
@@ -0,0 +1,83 @@
+{
+    "user_info": {
+        "timestamp": "2021-06-18.20-47-14",
+        "tag": "backend",
+        "scanned_dir": "/home/",
+        "products_with_cve": 7,
+        "products_without_cve": 0,
+        "total_files": 7
+    },
+    "intermediate": [{
+            "vendor": "libjpeg-turbo",
+            "product": "libjpeg-turbo",
+            "version": "2.0.1",
+            "remarks": 3,
+            "comments": "High priority need to resolve fast",
+            "cve_number": "CVE-2018-19664",
+            "severity": "CRITICAL"
+        },
+        {
+            "vendor": "libjpeg-turbo",
+            "product": "libjpeg-turbo",
+            "version": "2.0.1",
+            "remarks": 2,
+            "comments": "Need to mitigate cves of this product",
+            "cve_number": "",
+            "severity": "HIGH"
+        },
+        {
+            "vendor": "haxx",
+            "product": "curl",
+            "version": "7.59.0",
+            "remarks": 1,
+            "comments": "",
+            "cve_number": "",
+            "severity": ""
+        },
+        {
+            "vendor": "haxx",
+            "product": "libcurl",
+            "version": "7.59.0",
+            "remarks": "",
+            "comments": "",
+            "cve_number": "",
+            "severity": ""
+        },
+        {
+            "vendor": "mit",
+            "product": "kerberos_5",
+            "version": "5-1.15.1",
+            "remarks": "3",
+            "comments": "",
+            "cve_number": "",
+            "severity": ""
+        },
+        {
+            "vendor": "mit",
+            "product": "kerberos",
+            "version": "1.15.1",
+            "remarks": "",
+            "comments": "",
+            "cve_number": "",
+            "severity": ""
+        },
+        {
+            "vendor": "sun",
+            "product": "sunos",
+            "version": "5.4",
+            "remarks": "4",
+            "comments": "",
+            "cve_number": "",
+            "severity": ""
+        },
+        {
+            "vendor": "ssh",
+            "product": "ssh2",
+            "version": "2.0",
+            "remarks": "Mitigated",
+            "comments": "",
+            "cve_number": "",
+            "severity": ""
+        }
+    ]
+}
\ No newline at end of file
diff --git a/test/json/bad_metadata.json b/test/json/bad_metadata.json
new file mode 100644
index 0000000000..585fe08ab9
--- /dev/null
+++ b/test/json/bad_metadata.json
@@ -0,0 +1,88 @@
+{
+    "metadata": {
+        "nope": "0",
+        "nope2": "1",
+        "nope3": "2"
+    },
+    "report": [{
+            "vendor": "libjpeg-turbo",
+            "product": "libjpeg-turbo",
+            "version": "2.0.1",
+            "remarks": 3,
+            "comments": "High priority need to resolve fast",
+            "cve_number": "CVE-2018-19664",
+            "severity": "CRITICAL",
+            "paths": ""
+        },
+        {
+            "vendor": "libjpeg-turbo",
+            "product": "libjpeg-turbo",
+            "version": "2.0.1",
+            "remarks": 2,
+            "comments": "Need to mitigate cves of this product",
+            "cve_number": "",
+            "severity": "HIGH",
+            "paths": ""
+        },
+        {
+            "vendor": "haxx",
+            "product": "curl",
+            "version": "7.59.0",
+            "remarks": 1,
+            "comments": "",
+            "cve_number": "",
+            "severity": "",
+            "paths": ""
+        },
+        {
+            "vendor": "haxx",
+            "product": "libcurl",
+            "version": "7.59.0",
+            "remarks": "",
+            "comments": "",
+            "cve_number": "",
+            "severity": "",
+            "paths": ""
+        },
+        {
+            "vendor": "mit",
+            "product": "kerberos_5",
+            "version": "5-1.15.1",
+            "remarks": "3",
+            "comments": "",
+            "cve_number": "",
+            "severity": "",
+            "paths": ""
+        },
+        {
+            "vendor": "mit",
+            "product": "kerberos",
+            "version": "1.15.1",
+            "remarks": "",
+            "comments": "",
+            "cve_number": "",
+            "severity": "",
+            "paths": ""
+        },
+        {
+            "vendor": "sun",
+            "product": "sunos",
+            "version": "5.4",
+            "remarks": "4",
+            "comments": "",
+            "cve_number": "",
+            "severity": "",
+            "paths": ""
+        },
+        {
+            "vendor": "ssh",
+            "product": "ssh2",
+            "version": "2.0",
+            "remarks": "Mitigated",
+            "comments": "",
+            "cve_number": "",
+            "severity": "",
+            "paths": ""
+        }
+    ]
+}
\ No newline at end of file
diff --git a/test/json/test_intermediate.json b/test/json/test_intermediate.json
new file mode 100644
index 0000000000..9a4f5e8df2
--- /dev/null
+++ b/test/json/test_intermediate.json
@@ -0,0 +1,48 @@
+{
+    "metadata": {
+        "timestamp": "2021-06-18.20-47-14",
+        "tag": "backend",
+        "scanned_dir": "/home/",
+        "products_with_cve": 7,
+        "products_without_cve": 0,
+        "total_files": 7
+    },
+    "report": [{
+            "vendor": "libjpeg-turbo",
+            "product": "libjpeg-turbo",
+            "version": "2.0.1",
+            "cve_number": "CVE-2018-19664",
+            "severity": "CRITICAL",
+            "score": "6.5",
+            "cvss_version": "3",
+            "paths": "",
+            "remarks": "Confirmed",
+            "comments": "High priority need to resolve fast"
+        },
+        {
+            "vendor": "libjpeg-turbo",
+            "product": "libjpeg-turbo",
+            "version": "2.0.1",
+            "cve_number": "CVE-2018-20330",
+            "severity": "HIGH",
+            "score": "8.8",
+            "cvss_version": "3",
+            "paths": "",
+            "remarks": "Unexplored",
+            "comments": "Need to mitigate cves of this product"
+        },
+        {
+            "vendor": "libjpeg-turbo",
+            "product": "libjpeg-turbo",
+            "version": "2.0.1",
+            "cve_number": "CVE-2020-17541",
+            "severity": "HIGH",
+            "score": "8.8",
+            "cvss_version": "3",
+            "paths": "",
+            "remarks": "Unexplored",
+            "comments": "Need to mitigate cves of this product"
+        }
+    ]
+
+}
\ No newline at end of file
"paths": "" + }, + { + "vendor": "sun", + "product": "sunos", + "version": "5.4", + "remarks": "4", + "comments": "", + "cve_number": "", + "severity": "", + "paths": "" + }, + { + "vendor": "ssh", + "product": "ssh2", + "version": "2.0", + "remarks": "Mitigated", + "comments": "", + "cve_number": "", + "severity": "", + "paths": "" + } + ] +} \ No newline at end of file diff --git a/test/json/test_intermediate.json b/test/json/test_intermediate.json new file mode 100644 index 0000000000..9a4f5e8df2 --- /dev/null +++ b/test/json/test_intermediate.json @@ -0,0 +1,48 @@ +{ + "metadata": { + "timestamp": "2021-06-18.20-47-14", + "tag": "backend", + "scanned_dir": "/home/", + "products_with_cve": 7, + "products_without_cve": 0, + "total_files": 7 + }, + "report": [{ + "vendor": "libjpeg-turbo", + "product": "libjpeg-turbo", + "version": "2.0.1", + "cve_number": "CVE-2018-19664", + "severity": "CRITICAL", + "score": "6.5", + "cvss_version": "3", + "paths": "", + "remarks": "Confirmed", + "comments": "High priority need to resolve fast" + }, + { + "vendor": "libjpeg-turbo", + "product": "libjpeg-turbo", + "version": "2.0.1", + "cve_number": "CVE-2018-20330", + "severity": "HIGH", + "score": "8.8", + "cvss_version": "3", + "paths": "", + "remarks": "Unexplored", + "comments": "Need to mitigate cves of this product" + }, + { + "vendor": "libjpeg-turbo", + "product": "libjpeg-turbo", + "version": "2.0.1", + "cve_number": "CVE-2020-17541", + "severity": "HIGH", + "score": "8.8", + "cvss_version": "3", + "paths": "", + "remarks": "Unexplored", + "comments": "Need to mitigate cves of this product" + } + ] + +} \ No newline at end of file diff --git a/test/test_merge.py b/test/test_merge.py new file mode 100644 index 0000000000..83d54774d1 --- /dev/null +++ b/test/test_merge.py @@ -0,0 +1,128 @@ +# Copyright (C) 2021 Intel Corporation +# SPDX-License-Identifier: GPL-3.0-or-later + +import os +import re +import shutil +import tempfile + +import pytest + +from cve_bin_tool.error_handler import ErrorMode +from cve_bin_tool.input_engine import InputEngine +from cve_bin_tool.merge import ( + REQUIRED_INTERMEDIATE_METADATA, + InvalidJsonError, + MergeReports, + MissingFieldsError, +) +from cve_bin_tool.util import ProductInfo, Remarks + + +class TestMergeReports: + + INTERMEDIATE_PATH = os.path.join(os.path.abspath(os.path.dirname(__file__)), "json") + MERGED_TRIAGE_PATH = os.path.join( + os.path.abspath(os.path.dirname(__file__)), "json" + ) + + @classmethod + def setup_class(cls): + cls.merged_cache = None + + @classmethod + def teardown_class(cls): + if cls.merged_cache: + shutil.rmtree(cls.merged_cache) + + MERGED_TRIAGE_DATA = { + ProductInfo(vendor="libjpeg-turbo", product="libjpeg-turbo", version="2.0.1"): { + "CVE-2018-19664": { + "remarks": Remarks.Confirmed, + "comments": "High priority need to resolve fast", + "severity": "CRITICAL", + }, + "paths": {""}, + "CVE-2018-20330": { + "remarks": Remarks.Unexplored, + "comments": "Need to mitigate cves of this product", + "severity": "HIGH", + }, + "CVE-2020-17541": { + "remarks": Remarks.Unexplored, + "comments": "Need to mitigate cves of this product", + "severity": "HIGH", + }, + } + } + MISSING_FIELD_REGEX = re.compile(r"({.+}) are required fields") + + @pytest.mark.parametrize( + "filepaths, exception", + (([os.path.join(INTERMEDIATE_PATH, "bad.json")], InvalidJsonError),), + ) + def test_invalid_file(self, filepaths, exception): + merged_cves = MergeReports( + merge_files=filepaths, error_mode=ErrorMode.FullTrace + ) + with 
diff --git a/test/test_output_engine.py b/test/test_output_engine.py
index 089d7eda11..588ae314fc 100644
--- a/test/test_output_engine.py
+++ b/test/test_output_engine.py
@@ -99,6 +99,7 @@ def setUp(self) -> None:
             filename="",
             themes_dir="",
             time_of_last_update=datetime.today(),
+            tag="",
         )
 
         self.mock_file = tempfile.NamedTemporaryFile("w+", encoding="utf-8")
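
Note: end to end, the round trip exercised by test_valid_merge looks like this, using the fixture added in this patch:

    from cve_bin_tool.error_handler import ErrorMode
    from cve_bin_tool.input_engine import InputEngine
    from cve_bin_tool.merge import MergeReports

    merged = MergeReports(
        merge_files=["test/json/test_intermediate.json"],
        error_mode=ErrorMode.FullTrace,
    )
    merged_path = merged.merge_reports()

    # The merged report is valid InputEngine input, i.e. the same data cve-bin-tool
    # consumes for -i/--input-file (cli.py above does this internally for -m).
    triage_data = dict(InputEngine(merged_path, error_mode=ErrorMode.FullTrace).parse_input())
    print(f"{len(triage_data)} products with triage data")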