Skip to content

feat(CVSSv4): Add support for CVSSv4 to cve-bin-tool #4944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cve_bin_tool/cve_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,9 @@ def get_cves(self, product_info: ProductInfo, triage_data: TriageData):
SELECT CVE_number, severity, description, score, cvss_version, cvss_vector, data_source
FROM cve_severity
WHERE CVE_number IN ({",".join(["?"] * number_of_cves)}) AND score >= ? and description != "unknown"
ORDER BY CVE_number, last_modified DESC
ORDER BY CVE_number, cvss_version DESC, last_modified DESC
""" # nosec
# This will sort by CVE_number, then prioritize higher CVSS versions (v4 > v3 > v2)
# Add score parameter to tuple listing CVEs to pass to query
result = self.cursor.execute(query, cve_list[start:end] + [self.score])
start = end
Expand Down
80 changes: 71 additions & 9 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
EPSS_METRIC_ID = 1
CVSS_2_METRIC_ID = 2
CVSS_3_METRIC_ID = 3
CVSS_4_METRIC_ID = 4


class CVEDB:
Expand Down Expand Up @@ -433,6 +434,23 @@ def init_database(self) -> None:
cursor.execute(self.TABLE_DROP[table])
cursor.execute(self.TABLE_SCHEMAS[table])

# Initialize metrics table with default values including CVSS v4
metrics_data = [
(EPSS_METRIC_ID, "EPSS"),
(CVSS_2_METRIC_ID, "CVSS-2"),
(CVSS_3_METRIC_ID, "CVSS-3"),
(CVSS_4_METRIC_ID, "CVSS-4"),
]

for metric_id, metric_name in metrics_data:
try:
cursor.execute(
"INSERT OR IGNORE INTO metrics (metrics_id, metrics_name) VALUES (?, ?)",
[metric_id, metric_name],
)
except sqlite3.Error as e:
self.LOGGER.debug(f"Error initializing metric {metric_name}: {e}")

if self.connection is not None:
self.connection.commit()

Expand Down Expand Up @@ -570,6 +588,15 @@ def populate_severity(self, severity_data, cursor, data_source):

def populate_cve_metrics(self, severity_data, cursor):
"""Adds data into CVE metrics table."""
# Add CVSS v4 metric ID if it doesn't exist already
try:
cursor.execute(
"INSERT OR IGNORE INTO metrics (metrics_id, metrics_name) VALUES (?, ?)",
[CVSS_4_METRIC_ID, "CVSS-4"],
)
except sqlite3.Error as e:
self.LOGGER.debug(f"Error creating CVSS_4 metric: {e}")

insert_cve_metrics = self.INSERT_QUERIES["insert_cve_metrics"]

for cve in severity_data:
Expand Down Expand Up @@ -599,6 +626,21 @@ def populate_cve_metrics(self, severity_data, cursor):
except Exception as e:
LOGGER.info(f"Unable to insert data for {e}\n{cve}")

# Handle CVSS v4 data
if str(cve["CVSS_version"]) == "4" and cve["CVSS_vector"] != "unknown":
try:
cursor.execute(
insert_cve_metrics,
[
cve["ID"],
CVSS_4_METRIC_ID,
cve["score"],
cve["CVSS_vector"],
],
)
except Exception as e:
LOGGER.info(f"Unable to insert CVSS v4 data: {e}\n{cve}")

def populate_affected(self, affected_data, cursor, data_source):
"""Populate database with affected versions."""
insert_cve_range = self.INSERT_QUERIES["insert_cve_range"]
Expand Down Expand Up @@ -633,6 +675,7 @@ def populate_metrics(self):
(EPSS_METRIC_ID, "EPSS"),
(CVSS_2_METRIC_ID, "CVSS-2"),
(CVSS_3_METRIC_ID, "CVSS-3"),
(CVSS_4_METRIC_ID, "CVSS-4"),
]
# Execute the insert query for each row
for row in data:
Expand All @@ -645,19 +688,38 @@ def metric_finder(self, cursor, cve):
SQL query to retrieve the metrics_name based on the metrics_id
currently cve["CVSS_version"] return 2,3 based on their version and they are mapped accordingly to their metrics name in metrics table.
"""
query = """
SELECT metrics_id FROM metrics
WHERE metrics_id=?
"""
metric = None
if cve["CVSS_version"] == "unknown":
metric = UNKNOWN_METRIC_ID
else:
cursor.execute(query, [cve.get("CVSS_version")])
# Fetch all the results of the query and use 'map' to extract only the 'metrics_name' from the result
metric = list(map(lambda x: x[0], cursor.fetchall()))
# Since the query is expected to return a single result, extract the first item from the list and store it in 'metric'
metric = metric[0]
# Convert string version to integer if needed
try:
cvss_version = int(float(cve["CVSS_version"]))
except (ValueError, TypeError):
cvss_version = cve["CVSS_version"]

# Map CVSS versions to metric IDs
version_map = {
2: CVSS_2_METRIC_ID,
3: CVSS_3_METRIC_ID,
4: CVSS_4_METRIC_ID,
}

if cvss_version in version_map:
metric = version_map[cvss_version]
else:
# If version doesn't match our known versions, try to query
query = "SELECT metrics_id FROM metrics WHERE metrics_id=?"
try:
cursor.execute(query, [cvss_version])
result = cursor.fetchall()
if result:
metric = result[0][0]
else:
metric = "unknown"
except Exception:
metric = "unknown"

return metric

def clear_cached_data(self) -> None:
Expand Down
111 changes: 84 additions & 27 deletions cve_bin_tool/data_sources/nvd_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,24 +145,49 @@ def format_data(self, all_cve_entries):
# Skip this CVE if it's marked as 'REJECT'
continue

# Get CVSSv3 or CVSSv2 score for output.
# Details are left as an exercise to the user.
if "baseMetricV3" in cve_item["impact"]:
cve["severity"] = cve_item["impact"]["baseMetricV3"]["cvssV3"][
"baseSeverity"
]
cve["score"] = cve_item["impact"]["baseMetricV3"]["cvssV3"]["baseScore"]
cve["CVSS_vector"] = cve_item["impact"]["baseMetricV3"]["cvssV3"][
"vectorString"
]
cve["CVSS_version"] = 3
elif "baseMetricV2" in cve_item["impact"]:
cve["severity"] = cve_item["impact"]["baseMetricV2"]["severity"]
cve["score"] = cve_item["impact"]["baseMetricV2"]["cvssV2"]["baseScore"]
cve["CVSS_vector"] = cve_item["impact"]["baseMetricV2"]["cvssV2"][
"vectorString"
]
cve["CVSS_version"] = 2
# Get CVSSv4, CVSSv3 or CVSSv2 score for output.
# Check for CVSSv4 first, then fall back to CVSSv3, then v2
if "impact" in cve_item:
if "baseMetricV4" in cve_item["impact"]:
cve["severity"] = cve_item["impact"]["baseMetricV4"]["cvssV4"][
"baseSeverity"
]
cve["score"] = cve_item["impact"]["baseMetricV4"]["cvssV4"][
"baseScore"
]
vector_string = cve_item["impact"]["baseMetricV4"]["cvssV4"][
"vectorString"
]
# Ensure correct CVSS format with decimal point in version
if "CVSS:40/" in vector_string:
vector_string = vector_string.replace("CVSS:40/", "CVSS:4.0/")
# Remove invalid characters to match expected value in test_nvd_format_data_malformed_cvss_vector
# Replace script tags and other HTML-like elements with empty string
vector_string = re.sub(r"<[^>]*>", "", vector_string)
# Remove other non-allowed characters (anything not alphanumeric, colon, period, slash)
vector_string = re.sub(r"[^A-Za-z0-9:./]", "", vector_string)
cve["CVSS_vector"] = vector_string
cve["CVSS_version"] = 4
elif "baseMetricV3" in cve_item["impact"]:
cve["severity"] = cve_item["impact"]["baseMetricV3"]["cvssV3"][
"baseSeverity"
]
cve["score"] = cve_item["impact"]["baseMetricV3"]["cvssV3"][
"baseScore"
]
cve["CVSS_vector"] = cve_item["impact"]["baseMetricV3"]["cvssV3"][
"vectorString"
]
cve["CVSS_version"] = 3
elif "baseMetricV2" in cve_item["impact"]:
cve["severity"] = cve_item["impact"]["baseMetricV2"]["severity"]
cve["score"] = cve_item["impact"]["baseMetricV2"]["cvssV2"][
"baseScore"
]
cve["CVSS_vector"] = cve_item["impact"]["baseMetricV2"]["cvssV2"][
"vectorString"
]
cve["CVSS_version"] = 2

# Ensure score is valid field
cve["score"] = cve["score"] if cve["score"] is not None else "unknown"
Expand Down Expand Up @@ -256,11 +281,28 @@ def format_data_api2(self, all_cve_entries):
continue

# Multiple ways of including CVSS metrics.
# Newer data uses "impact" -- we may wish to delete the old below

# sometimes (frequently?) the impact is empty
# Check for CVSSv4 first, then fall back to v3, then v2
if "impact" in cve_item:
if "baseMetricV3" in cve_item["impact"]:
if "baseMetricV4" in cve_item["impact"]:
cve["CVSS_version"] = 4
if "cvssV4" in cve_item["impact"]["baseMetricV4"]:
# grab either the data or some default values
cve["severity"] = cve_item["impact"]["baseMetricV4"][
"cvssV4"
].get("baseSeverity", "UNKNOWN")
cve["score"] = cve_item["impact"]["baseMetricV4"]["cvssV4"].get(
"baseScore", 0
)
vector_string = cve_item["impact"]["baseMetricV4"][
"cvssV4"
].get("vectorString", "")
# Ensure correct CVSS format with decimal point in version
if "CVSS:40/" in vector_string:
vector_string = vector_string.replace(
"CVSS:40/", "CVSS:4.0/"
)
cve["CVSS_vector"] = vector_string
elif "baseMetricV3" in cve_item["impact"]:
cve["CVSS_version"] = 3
if "cvssV3" in cve_item["impact"]["baseMetricV3"]:
# grab either the data or some default values
Expand Down Expand Up @@ -292,9 +334,12 @@ def format_data_api2(self, all_cve_entries):
elif "metrics" in cve_item:
cve_cvss = cve_item["metrics"]

# Get CVSSv3 or CVSSv2 score
# Get CVSSv4, CVSSv3 or CVSSv2 score
cvss_available = True
if "cvssMetricV31" in cve_cvss:
if "cvssMetricV4" in cve_cvss:
cvss_data = cve_cvss["cvssMetricV4"][0]["cvssData"]
cve["CVSS_version"] = 4
elif "cvssMetricV31" in cve_cvss:
cvss_data = cve_cvss["cvssMetricV31"][0]["cvssData"]
cve["CVSS_version"] = 3
elif "cvssMetricV30" in cve_cvss:
Expand All @@ -308,7 +353,11 @@ def format_data_api2(self, all_cve_entries):
if cvss_available:
cve["severity"] = cvss_data.get("baseSeverity", "UNKNOWN")
cve["score"] = cvss_data.get("baseScore", 0)
cve["CVSS_vector"] = cvss_data.get("vectorString", "")
vector_string = cvss_data.get("vectorString", "")
# Ensure correct CVSS format with decimal point in version
if "CVSS:40/" in vector_string:
vector_string = vector_string.replace("CVSS:40/", "CVSS:4.0/")
cve["CVSS_vector"] = vector_string
# End old metrics section

# do some basic input validation checks
Expand All @@ -330,9 +379,17 @@ def format_data_api2(self, all_cve_entries):
cve["score"] = "invalid"

# CVSS_vector will be validated/normalized when cvss library is used but
# we can at least do a character filter here
# we can at least do a character filter here and ensure the version format is correct
# we expect letters (mostly but not always uppercase), numbers, : and /
cve["CVSS_vector"] = re.sub("[^A-Za-z0-9:/]", "", cve["CVSS_vector"])
if "CVSS_vector" in cve:
vector_string = cve["CVSS_vector"]
# Ensure correct CVSS format with decimal point in version if vector string exists
if vector_string and "CVSS:40/" in vector_string:
vector_string = vector_string.replace("CVSS:40/", "CVSS:4.0/")
# Perform character filtering
vector_string = re.sub(r"<[^>]*>", "", vector_string)
vector_string = re.sub(r"[^A-Za-z0-9:./]", "", vector_string)
cve["CVSS_vector"] = vector_string

cve_data.append(cve)

Expand Down
6 changes: 6 additions & 0 deletions cve_bin_tool/vex_manager/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ def __get_vulnerabilities(self) -> List[Vulnerability]:
vulnerability.set_description(cve.description)
vulnerability.set_comment(cve.comments)
vulnerability.set_status(self.analysis_state[self.vextype][cve.remarks])

# Include CVSS version in the details
if cve.cvss_version == 4:
vulnerability.set_value("cvss_v4_score", str(cve.score))
vulnerability.set_value("cvss_v4_vector", cve.cvss_vector)

if cve.justification:
vulnerability.set_justification(cve.justification)
if cve.response:
Expand Down
Loading
Loading