Skip to content

Commit 83b22b9

Browse files
authored
feat: new json format for output (#3980)
Signed-off-by: [email protected] <[email protected]>
1 parent 0881252 commit 83b22b9

File tree

5 files changed

+370
-23
lines changed

5 files changed

+370
-23
lines changed

cve_bin_tool/cli.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ def main(argv=None):
248248
note: don't use spaces between comma (',') and the output formats.
249249
"""
250250
),
251-
metavar="{csv,json,console,html,pdf}",
251+
metavar="{csv,json,json2,console,html,pdf}",
252252
default="console",
253253
)
254254
output_group.add_argument(
@@ -565,7 +565,8 @@ def main(argv=None):
565565
configs = conf.parse_config()
566566

567567
args = ChainMap(args, configs, defaults)
568-
if args["generate_config"] != "":
568+
organized_arguments = {}
569+
if args["format"] == "json2" or args["generate_config"] != "":
569570
store = parser.parse_args(argv[1:])
570571
arg_groups = {}
571572
for grp in parser._action_groups:
@@ -574,7 +575,6 @@ def main(argv=None):
574575
}
575576
arg_groups[grp.title] = argparse.Namespace(**grp_dict)
576577

577-
organized_arguments = {}
578578
for group_title, group_args in arg_groups.items():
579579
group_title = group_title.replace(" ", "_")
580580
organized_arguments[group_title] = {}
@@ -883,7 +883,7 @@ def main(argv=None):
883883

884884
output_formats = set(args["format"].split(","))
885885
output_formats = [output_format.strip() for output_format in output_formats]
886-
extensions = ["csv", "json", "console", "html", "pdf"]
886+
extensions = ["csv", "json", "console", "html", "pdf", "json2"]
887887
for output_format in output_formats:
888888
if output_format not in extensions:
889889
LOGGER.error(
@@ -1180,6 +1180,7 @@ def main(argv=None):
11801180
affected_versions=args["affected_versions"],
11811181
exploits=args["exploits"],
11821182
metrics=metrics,
1183+
organized_arguements=organized_arguments,
11831184
detailed=args["detailed"],
11841185
vex_filename=args["vex_output"],
11851186
vex_type=args["vex_type"],

cve_bin_tool/output_engine/__init__.py

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,18 @@
1111
from datetime import datetime
1212
from logging import Logger
1313
from pathlib import Path
14-
from typing import IO
1514

1615
from cve_bin_tool.cve_scanner import CVEData
1716
from cve_bin_tool.cvedb import CVEDB
1817
from cve_bin_tool.error_handler import ErrorHandler, ErrorMode
1918
from cve_bin_tool.log import LOGGER
2019
from cve_bin_tool.output_engine.console import output_console
2120
from cve_bin_tool.output_engine.html import output_html
21+
from cve_bin_tool.output_engine.json_output import output_json, output_json2
2222
from cve_bin_tool.output_engine.util import (
23+
ProductInfo,
24+
Remarks,
25+
VersionInfo,
2326
add_extension_if_not,
2427
format_output,
2528
format_path,
@@ -29,26 +32,10 @@
2932
intermediate_output,
3033
)
3134
from cve_bin_tool.sbom_manager.generate import SBOMGenerate
32-
from cve_bin_tool.util import ProductInfo, Remarks, VersionInfo
3335
from cve_bin_tool.version import VERSION
3436
from cve_bin_tool.vex_manager.generate import VEXGenerate
3537

3638

37-
def output_json(
38-
all_cve_data: dict[ProductInfo, CVEData],
39-
all_cve_version_info: dict[str, VersionInfo] | None,
40-
outfile: IO,
41-
detailed: bool = False,
42-
affected_versions: int = 0,
43-
metrics: bool = False,
44-
):
45-
"""Output a JSON of CVEs"""
46-
formatted_output = format_output(
47-
all_cve_data, all_cve_version_info, detailed, affected_versions, metrics
48-
)
49-
json.dump(formatted_output, outfile, indent=" ")
50-
51-
5239
def save_intermediate(
5340
all_cve_data: dict[ProductInfo, CVEData],
5441
filename: str,
@@ -684,6 +671,7 @@ def __init__(
684671
vex_type: str = "",
685672
vex_product_info: dict[str, str] = {},
686673
offline: bool = False,
674+
organized_arguements: dict = None,
687675
):
688676
"""Constructor for OutputEngine class."""
689677
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
@@ -711,6 +699,7 @@ def __init__(
711699
self.sbom_format = sbom_format
712700
self.sbom_root = sbom_root
713701
self.offline = offline
702+
self.organized_arguements = organized_arguements
714703
self.sbom_packages = {}
715704
self.vex_type = vex_type
716705
self.vex_product_info = vex_product_info
@@ -730,6 +719,18 @@ def output_cves(self, outfile, output_type="console"):
730719
self.affected_versions,
731720
self.metrics,
732721
)
722+
elif output_type == "json2":
723+
output_json2(
724+
self.all_cve_data,
725+
self.all_cve_version_info,
726+
self.time_of_last_update,
727+
outfile,
728+
self.affected_versions,
729+
self.organized_arguements,
730+
self.detailed,
731+
self.exploits,
732+
self.metrics,
733+
)
733734
elif output_type == "csv":
734735
output_csv(
735736
self.all_cve_data,
@@ -868,7 +869,9 @@ def output_file(self, output_type="console"):
868869
"Switching Back to Default Naming Convention"
869870
)
870871
self.filename = generate_filename(output_type)
871-
872+
# if extension is set to .json2 due to current code logic make it .json
873+
if self.filename.endswith(".json2"):
874+
self.filename = self.filename[:-1]
872875
# Log the filename generated
873876
self.logger.info(f"{output_type.upper()} report stored at {self.filename}")
874877

@@ -883,6 +886,8 @@ def output_file(self, output_type="console"):
883886
def check_file_path(self, filepath: str, output_type: str, prefix: str = "output"):
884887
"""Generate a new filename if file already exists."""
885888
# check if the file already exists
889+
if filepath.endswith(".json2"):
890+
filepath = filepath[:-1]
886891
if Path(filepath).is_file():
887892
self.logger.warning(f"Failed to write at '{filepath}'. File already exists")
888893
self.logger.info("Generating a new filename with Default Naming Convention")
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# Copyright (C) 2024 Intel Corporation
2+
# SPDX-License-Identifier: GPL-3.0-or-later
3+
4+
from __future__ import annotations
5+
6+
import json
7+
from datetime import datetime
8+
from typing import IO
9+
10+
from cve_bin_tool.cvedb import CVEDB
11+
from cve_bin_tool.util import CVEData, ProductInfo, VersionInfo
12+
from cve_bin_tool.version import VERSION
13+
14+
from .util import format_output, get_cve_summary
15+
16+
17+
def vulnerabilities_builder(
18+
all_cve_data, exploits, all_cve_version_info, detailed, affected_versions, metrics
19+
):
20+
"""
21+
Builds a dictionary of vulnerabilities based on the provided inputs.
22+
"""
23+
vulnerabilities = {}
24+
vulnerabilities["summary"] = get_cve_summary(all_cve_data, exploits)
25+
vulnerability_reports = []
26+
source_entries_map = {}
27+
formatted_cve_data = format_output(
28+
all_cve_data, all_cve_version_info, detailed, affected_versions, metrics
29+
)
30+
for cve_entry in formatted_cve_data:
31+
source = cve_entry["source"]
32+
if source not in source_entries_map:
33+
source_entries_map[source] = [cve_entry]
34+
else:
35+
source_entries_map[source].append(cve_entry)
36+
37+
for source, entries in source_entries_map.items():
38+
report = {"datasource": source, "entries": entries}
39+
vulnerability_reports.append(report)
40+
vulnerabilities["report"] = vulnerability_reports
41+
return vulnerabilities
42+
43+
44+
def db_entries_count():
45+
"""
46+
Retrieves the count of CVE entries from the database grouped by data source.
47+
48+
Returns:
49+
dict: A dictionary containing the count of CVE entries for each data source.
50+
"""
51+
instance = CVEDB()
52+
cursor = instance.db_open_and_get_cursor()
53+
cve_entries_check = "SELECT data_source, COUNT(*) as number FROM cve_severity GROUP BY data_source ORDER BY number DESC"
54+
cursor.execute(cve_entries_check)
55+
data_entries = {}
56+
rows = cursor.fetchall()
57+
for row in rows:
58+
source = row[0]
59+
entries = row[1]
60+
data_entries[source] = entries
61+
instance.db_close()
62+
return data_entries
63+
64+
65+
def metadata_builder(organized_parameters):
66+
"""
67+
Builds metadata dictionary based on the organized parameters.
68+
"""
69+
metadata = {}
70+
metadata["tool"] = {"name": "cve-bin-tool", "version": f"{VERSION}"}
71+
metadata["generation_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
72+
parameter = {}
73+
for key, value in organized_parameters.items():
74+
parameter_values = {}
75+
for k, v in value.items():
76+
val = v["arg_value"]
77+
parameter_values[k] = val
78+
if parameter_values:
79+
parameter[key.lower()] = parameter_values
80+
metadata["parameter"] = parameter
81+
return metadata
82+
83+
84+
def output_json(
85+
all_cve_data: dict[ProductInfo, CVEData],
86+
all_cve_version_info: dict[str, VersionInfo],
87+
outfile: IO,
88+
detailed: bool = False,
89+
affected_versions: int = 0,
90+
metrics: bool = False,
91+
):
92+
"""Output a JSON of CVEs"""
93+
formatted_output = format_output(
94+
all_cve_data, all_cve_version_info, detailed, affected_versions, metrics
95+
)
96+
json.dump(formatted_output, outfile, indent=2)
97+
98+
99+
def output_json2(
100+
all_cve_data: dict[ProductInfo, CVEData],
101+
all_cve_version_info: dict[str, VersionInfo],
102+
time_of_last_update: datetime,
103+
outfile: IO,
104+
affected_versions: int,
105+
organized_parameters: dict,
106+
detailed: bool = False,
107+
exploits: bool = False,
108+
metrics: bool = False,
109+
):
110+
"""Output a JSON of CVEs in JSON2 format"""
111+
output = {}
112+
output["$schema"] = ""
113+
output["metadata"] = metadata_builder(organized_parameters)
114+
output["database_info"] = {
115+
"last_updated": time_of_last_update.strftime("%Y-%m-%d %H:%M:%S"),
116+
"total_entries": db_entries_count(),
117+
}
118+
output["vulnerabilities"] = vulnerabilities_builder(
119+
all_cve_data,
120+
exploits,
121+
all_cve_version_info,
122+
detailed,
123+
affected_versions,
124+
metrics,
125+
)
126+
json.dump(output, outfile, indent=2)

0 commit comments

Comments
 (0)