Skip to content

Commit

Permalink
Change: Encode CPE match strings directly to file
Browse files Browse the repository at this point in the history
The JSON downloader for CPE match strings now writes the encoded
chunks of the JSON dict directly to the output file and only
buffers them in memory if validation is requested.

This reduces the amount of memory used for creating the output file
after the data has been downloaded from the API.
  • Loading branch information
timopollmeier committed Jan 10, 2025
1 parent e2833c5 commit 1df2a47
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions greenbone/scap/cpe_match/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from array import array
from dataclasses import asdict, dataclass
from datetime import datetime
from io import TextIOWrapper
from pathlib import Path
from typing import Sequence
from typing import Sequence, Any, Optional

from pontos.nvd.models.cpe_match_string import CPEMatchString
from rich.console import Console
Expand Down Expand Up @@ -96,6 +97,22 @@ def add_match_strings(
for match_string in match_strings:
self.add_match_string(match_string)

def _encode_json(
    self,
    data: dict[str, Any],
    out_file: TextIOWrapper,
    validation_buffer: Optional[bytearray] = None,
    *,
    indent: int = 1,
):
    """
    Stream the JSON encoding of `data` to `out_file` chunk by chunk.

    The encoded text is written incrementally instead of being assembled
    in memory first. When a `validation_buffer` is supplied, every chunk
    is additionally appended to it as UTF-8 bytes so the caller can
    inspect the complete document afterwards.

    Args:
        data: The dictionary to serialize.
        out_file: Text-mode file object that receives the JSON chunks.
        validation_buffer: Optional byte buffer collecting a UTF-8 copy
            of everything written; left untouched when it is None.
        indent: Indentation setting forwarded to the JSON encoder.
    """
    # NOTE(review): JsonEncoder is defined elsewhere in the project;
    # presumably a json.JSONEncoder subclass - confirm.
    pieces = JsonEncoder(indent=indent).iterencode(data)

    # Fast path: nothing to mirror, just stream to the file.
    if validation_buffer is None:
        for piece in pieces:
            out_file.write(piece)
        return

    # Mirror each piece into the buffer right after writing it.
    for piece in pieces:
        out_file.write(piece)
        validation_buffer.extend(piece.encode("utf-8"))


def write(self, file_name: str = "nvd_cpe_matches") -> None:
"""
Write the CPE data to JSON files with optional compression in the specified folder.
Expand All @@ -108,20 +125,21 @@ def write(self, file_name: str = "nvd_cpe_matches") -> None:
self._match_string_response.match_strings
)

encoder = JsonEncoder(indent=1)
char_array = array("b")
validation_buffer: Optional[bytearray] = None
if self.validate:
validation_buffer = bytearray()

response_dict = asdict(self._match_string_response)
convert_keys_to_camel(response_dict)
for chunk in encoder.iterencode(response_dict):
char_array.frombytes(chunk.encode("utf-8"))
json_data = char_array.tobytes()

self._validate_json(file_name, json_data)

if self._compress:
path = self._storage_path / f"{file_name}.json.gz"
path.write_bytes(gzip.compress(json_data))
with gzip.open(path, "wt", encoding="utf-8") as out_file:
self._encode_json(response_dict, out_file, validation_buffer)
else:
path = self._storage_path / f"{file_name}.json"
path.write_bytes(json_data)
with open(path, "wt", encoding="utf-8") as out_file:
self._encode_json(response_dict, out_file, validation_buffer)

if self.validate:
self._validate_json(file_name, validation_buffer)

0 comments on commit 1df2a47

Please sign in to comment.