forked from intel/cve-bin-tool
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add intermediate reports in cve binary tool to merge outputs from dif…
…ferent sources (intel#1169) * add --append flag in cli * refactor -a -append code * Take optional intermediate report path argument * add --tag paramter for intermediate report * add tag arg in test_output * add merge argument for intermediate reports * refactor code and autoremove temporary generated merged * reformat cli.py * update prefix for intermediate reports * Add tests for MergeReports
- Loading branch information
1 parent
1099187
commit 63f1c51
Showing
12 changed files
with
697 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,3 +17,6 @@ doc/_build | |
test/downloads/ | ||
cve_bin_tool_requirements.csv | ||
!test/condensed-downloads/*.tar.gz | ||
intermediate*.json | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
# Copyright (C) 2021 Intel Corporation | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
import json | ||
import os | ||
from datetime import datetime | ||
from logging import Logger | ||
from typing import Dict, List | ||
|
||
from .cvedb import DISK_LOCATION_DEFAULT | ||
from .error_handler import ( | ||
ErrorHandler, | ||
ErrorMode, | ||
InvalidIntermediateJsonError, | ||
InvalidJsonError, | ||
MissingFieldsError, | ||
) | ||
from .log import LOGGER | ||
from .util import DirWalk | ||
|
||
# Metadata keys that every intermediate report must carry; a report missing
# any of these is rejected by MergeReports.scan_intermediate_file.
REQUIRED_INTERMEDIATE_METADATA = {
    "tag",
    "timestamp",
    "scanned_dir",
    "total_files",
    "products_with_cve",
    "products_without_cve",
}
|
||
|
||
class MergeReports:
    """Merge multiple intermediate cve-bin-tool JSON reports into one.

    Walks the given file/directory/glob arguments, validates each
    intermediate report (required top-level keys and metadata fields),
    de-duplicates CVE entries across reports, and writes a single merged
    report into the cache directory.
    """

    def __init__(
        self,
        merge_files: List[str],
        logger: Logger = None,
        error_mode=ErrorMode.TruncTrace,
        cache_dir=DISK_LOCATION_DEFAULT,
    ):
        self.logger = logger or LOGGER.getChild(self.__class__.__name__)
        self.merge_files = merge_files
        self.all_cve_data = []  # validated intermediate report dicts
        self.file_stack = []  # paths currently being processed (for error context)
        self.error_mode = error_mode
        self.total_inter_files = 0  # number of valid intermediate files loaded
        self.total_files = 0  # number of distinct scanned file paths after merge
        self.products_with_cve = 0
        self.products_without_cve = 0
        self.cache_dir = cache_dir

        # Arguments ending in ".json" are matched exactly; anything else is
        # treated as a prefix pattern (e.g. "intermediate" -> "intermediate*.json").
        self.walker = DirWalk(
            pattern=";".join(
                file_path if file_path.endswith(".json") else file_path + "*.json"
                for file_path in self.merge_files
            ),
            yield_files=True,
        ).walk

    def recursive_scan(self, merge_files):
        """Yield every intermediate json path found in files/directories.

        Directories are walked recursively with self.walker; plain files are
        yielded directly. Symlinked files are skipped to avoid duplicates and
        link loops.
        """
        for intermediate_path in merge_files:
            if os.path.isdir(intermediate_path):
                for filepath in self.walker([intermediate_path]):
                    self.file_stack.append(filepath)
                    yield filepath
                    self.file_stack.pop()
            elif os.path.isfile(intermediate_path) and not os.path.islink(
                intermediate_path
            ):
                self.file_stack.append(intermediate_path)
                yield intermediate_path
                self.file_stack.pop()

    def scan_intermediate_file(self, filename):
        """Load one intermediate json file and validate its structure.

        Returns the parsed dict on success. Raises (via ErrorHandler)
        InvalidJsonError for non-dict/empty content, MissingFieldsError for
        absent required keys, and InvalidIntermediateJsonError for wrong
        field types.
        """
        # BUG FIX: the f-string had lost its placeholder and logged a constant.
        self.logger.info(f"Loading file: {filename}")

        missing_fields = set()
        # Don't shadow the file handle with the decoded data.
        with open(filename) as json_file:
            inter_data = json.loads(json_file.read())

        if not inter_data or not isinstance(inter_data, dict):
            with ErrorHandler(mode=self.error_mode):
                raise InvalidJsonError(filename)

        required_fields = {"metadata", "report"}
        missing_fields = required_fields - set(inter_data.keys())

        if missing_fields == set():
            if isinstance(inter_data["metadata"], dict):
                missing_fields = set(REQUIRED_INTERMEDIATE_METADATA) - set(
                    inter_data["metadata"].keys()
                )
                if missing_fields == set():
                    if isinstance(inter_data["report"], list):
                        self.logger.info(
                            f"Adding data from {os.path.basename(filename)} with timestamp {inter_data['metadata']['timestamp']}"
                        )
                        self.total_inter_files += 1
                        return inter_data

        if missing_fields != set():
            with ErrorHandler(mode=self.error_mode):
                raise MissingFieldsError(f"{missing_fields} are required fields")

        # Required keys present but with the wrong types (e.g. "report" not a list).
        with ErrorHandler(mode=self.error_mode):
            raise InvalidIntermediateJsonError(filename)

    def merge_reports(self):
        """Merge all valid intermediate reports and save the result.

        Returns the path of the merged report file, or "" when no valid
        intermediate report was found.
        """
        for inter_file in self.recursive_scan(self.merge_files):
            self.all_cve_data.append(self.scan_intermediate_file(inter_file))

        if self.all_cve_data:
            # Remove duplicate paths from cve entries before saving.
            self.all_cve_data = self.remove_intermediate_duplicates()
            merged_file_path = self.save_merged_intermediate()
            return merged_file_path

        self.logger.error("No valid Intermediate reports found!")
        return ""

    def save_merged_intermediate(self):
        """Save a temporary merged report in the cache dir; return its path."""
        # exist_ok avoids the isdir/makedirs race when run concurrently.
        os.makedirs(self.cache_dir, exist_ok=True)

        now = datetime.now().strftime("%Y-%m-%d.%H-%M-%S")
        filename = os.path.join(self.cache_dir, f"merged-{now}.json")
        with open(filename, "w") as f:
            json.dump(self.all_cve_data, f, indent="    ")

        return filename

    def remove_intermediate_duplicates(self) -> List[Dict[str, str]]:
        """Return de-duplicated cve entries in cve-bin-tool json output format.

        Entries with cve_number "UNKNOWN" are dropped. For duplicate CVE
        numbers the comma-separated "paths" strings are unioned, and
        total_files tracks the count of distinct paths.
        """
        output = {}
        for inter_data in self.all_cve_data:
            self.products_with_cve += inter_data["metadata"]["products_with_cve"]
            self.products_without_cve += inter_data["metadata"]["products_without_cve"]
            for cve in inter_data["report"]:
                if cve["cve_number"] != "UNKNOWN":
                    if cve["cve_number"] not in output:
                        output[cve["cve_number"]] = cve
                        self.total_files += len(cve["paths"].split(","))
                    else:
                        path_list = output[cve["cve_number"]]["paths"].split(",")
                        self.total_files -= len(path_list)
                        path_list.extend(cve["paths"].split(","))
                        # remove duplicate paths (if any)
                        path_list = list(set(path_list))
                        self.total_files += len(path_list)
                        # BUG FIX: was written to key "path" as a list, so the
                        # merged result was never read back on later duplicates
                        # and the output schema was broken. Keep "paths" as the
                        # comma-separated string used everywhere else.
                        output[cve["cve_number"]]["paths"] = ",".join(path_list)

        return list(output.values())
Oops, something went wrong.