From c1807de0b0d6fc5369e50e7063987193c3728170 Mon Sep 17 00:00:00 2001 From: I748376 Date: Thu, 29 Aug 2024 14:30:24 +0000 Subject: [PATCH] cleans up code --- prospector/pipeline/filter_entries.py | 62 ++++++++------------------- prospector/pipeline/job_creation.py | 3 -- 2 files changed, 17 insertions(+), 48 deletions(-) diff --git a/prospector/pipeline/filter_entries.py b/prospector/pipeline/filter_entries.py index 4703e7f8e..f12506278 100644 --- a/prospector/pipeline/filter_entries.py +++ b/prospector/pipeline/filter_entries.py @@ -20,10 +20,6 @@ config = parse_config_file() -# with open("./data/project_metadata.json", "r") as f: -# global match_list -# match_list = json.load(f) - def connect_to_db(): db = PostgresBackendDB( @@ -54,7 +50,7 @@ async def get_cve_data(d_time): """ with ConsoleWriter("Obtaining CVE data from NVD API") as console: - start_date, end_date = get_time_range(d_time) + start_date, end_date = _get_time_range(d_time) data = "" # Set up the URL to retrieve the latest CVE entries from NVD @@ -114,7 +110,8 @@ def save_cves_to_db(raw_data_from_nvd): db.disconnect() -async def get_cve_by_id(id): +async def get_cve_by_id(id: str): + """Obtains one CVE from the NVD database.""" nvd_url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveID={id}" async with aiohttp.ClientSession() as session: @@ -132,39 +129,9 @@ async def get_cve_by_id(id): return data -async def add_single_cve(vuln_id: str): - raw_json_cve = get_cve_by_id(vuln_id) - save_cves_to_db(raw_json_cve) - - -def write_list_to_file(lst, filename): - with open(filename, "w") as file: - for item in lst: - file.write(str(item) + "\n") - - -def csv_to_json(csv_file_path): - with open(csv_file_path, "r") as csv_file: - csv_reader = csv.reader(csv_file) - data = [] - # Skip the header row - next(csv_reader) - # Loop through the rows of the file - for row in csv_reader: - # Create a dictionary for the row data - row_data = { - "project": row[0], - "service_name": row[1], - "repository": row[2], - } - data.append(row_data) - # Convert to JSON object - json_data = json.dumps(data) - return json_data - - -def get_time_range(d_time): - # calculate the date to retrieve new entries (%Y-%m-%dT%H:%M:%S.%f%2B01:00) +def _get_time_range(d_time): + """Calculates the date to retrieve new entries + (%Y-%m-%dT%H:%M:%S.%f%2B01:00)""" date_now = datetime.datetime.now() start_date = (date_now - datetime.timedelta(days=d_time)).strftime( "%Y-%m-%dT%H:%M:%S" @@ -193,7 +160,7 @@ async def process_cve_data(): entry_id = unprocessed_vuln[0] raw_record = unprocessed_vuln[1] - processed_vuln = await map_entry(raw_record) + processed_vuln = await _filter_raw_cve_data(raw_record) if processed_vuln is not None: processed_vulns.append(processed_vuln) db.save_processed_vuln( @@ -211,22 +178,27 @@ async def process_cve_data(): return processed_vulns -async def map_entry(vuln): +async def _filter_raw_cve_data(raw_nvd_cve: str): + """Filters out CVEs that affect products listed in project_metadata.json""" # TODO: improve mapping technique async with aiofiles.open("./data/project_metadata.json", "r") as f: match_list = json.loads(await f.read()) - project_names = extract_products(vuln["cve"]["descriptions"][0]["value"]) - # print(project_names) + project_names = extract_products( + raw_nvd_cve["cve"]["descriptions"][0]["value"] + ) + # print(project_names) # Sanity Check + for project_name in project_names: for data in match_list.values(): keywords = [kw.lower() for kw in data["search keywords"]] if project_name.lower() in keywords: version = extract_version_range( - vuln["cve"], vuln["cve"]["descriptions"][0]["value"] + raw_nvd_cve["cve"], + raw_nvd_cve["cve"]["descriptions"][0]["value"], ) filtered_vuln = { - "nvd_info": vuln, + "nvd_info": raw_nvd_cve, "repo_url": data["git"], "version_interval": version, } diff --git a/prospector/pipeline/job_creation.py b/prospector/pipeline/job_creation.py index 07704c1ee..a67213269 100644 --- a/prospector/pipeline/job_creation.py +++ b/prospector/pipeline/job_creation.py @@ -1,6 +1,3 @@ -import json -import sys -import time from datetime import datetime from typing import Optional