Skip to content

Commit

Permalink
Add doc for CPE match strings, small refactors
Browse files Browse the repository at this point in the history
Added docstrings for the new generic and CPE match string
CLI modules.
Non-public attributes are now prefixed with an underscore
and some misleading names have been changed.
  • Loading branch information
timopollmeier committed Nov 19, 2024
1 parent f8ac82c commit 3fc39ae
Show file tree
Hide file tree
Showing 11 changed files with 775 additions and 188 deletions.
34 changes: 32 additions & 2 deletions greenbone/scap/cpe_match/cli/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@


class CpeMatchProcessor(ScapProcessor[CPEMatchString]):
item_type_plural = CPE_MATCH_TYPE_PLURAL
arg_defaults = {
"""
Class that handles a producer object generating CPE match strings
to be processed by a worker object.
"""

_item_type_plural = CPE_MATCH_TYPE_PLURAL
_arg_defaults = {
"chunk_size": CPE_MATCH_DEFAULT_CHUNK_SIZE,
"queue_size": DEFAULT_QUEUE_SIZE,
"verbose": DEFAULT_VERBOSITY,
Expand All @@ -33,6 +38,19 @@ def from_args(
producer: BaseScapProducer,
worker: BaseScapWorker,
) -> "CpeMatchProcessor":
"""
Create a new `CPEMatchNvdApiProducer` with parameters from
the given command line args gathered by an `ArgumentParser`.
Args:
args: Command line arguments to use
console: Console for standard output.
error_console: Console for error output.
producer: The producer generating the CPE match strings.
worker: The worker processing the CPE match strings.
Returns:
The new `CpeMatchProcessor`.
"""
return CpeMatchProcessor(
console,
error_console,
Expand All @@ -54,6 +72,18 @@ def __init__(
chunk_size: int | None = None,
verbose: int | None = None,
):
"""
Constructor for a new CPE match string processor.
Args:
console: Console for standard output.
error_console: Console for error output.
producer: The producer generating the CPE match strings.
worker: The worker processing the CPE match strings.
queue_size: The number of chunks allowed in the queue.
chunk_size: The expected maximum number of CPE match strings per chunk.
verbose: Verbosity level of log messages.
"""
super().__init__(
console,
error_console,
Expand Down
72 changes: 61 additions & 11 deletions greenbone/scap/cpe_match/producer/nvd_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from argparse import Namespace

from pontos.nvd import NVDApi, NVDResults
from pontos.nvd import NVDResults
from pontos.nvd.cpe_match import CPEMatchApi
from pontos.nvd.models.cpe_match_string import CPEMatchString
from rich.console import Console
Expand All @@ -14,8 +14,16 @@


class CpeMatchNvdApiProducer(NvdApiProducer[CPEMatchString]):
item_type_plural = CPE_MATCH_TYPE_PLURAL
arg_defaults = NvdApiProducer.arg_defaults
"""
Abstract async context manager class for a producer querying
CPE match strings from an NVD API.
"""

_item_type_plural = CPE_MATCH_TYPE_PLURAL
"Plural form of the type of items to use in log messages"

_arg_defaults = NvdApiProducer._arg_defaults
"Default values for optional arguments."

@classmethod
def from_args(
Expand All @@ -25,6 +33,19 @@ def from_args(
error_console: Console,
progress: Progress,
) -> "CpeMatchNvdApiProducer":
"""
Create a new `CPEMatchNvdApiProducer` with parameters from
the given command line args gathered by an `ArgumentParser`.
Args:
args: Command line arguments to use
console: Console for standard output.
error_console: Console for error output.
progress: Progress bar renderer to be updated by the producer.
Returns:
The new `CPEMatchNvdApiProducer`.
"""
request_filter_opts = {}

since = NvdApiProducer.since_from_args(args, error_console)
Expand Down Expand Up @@ -55,6 +76,20 @@ def __init__(
start_index: int = 0,
verbose: int = None,
):
"""
Constructor for a CPE match string NVD API producer.
Args:
console: Console for standard output.
error_console: Console for error output.
progress: Progress bar renderer to be updated by the producer.
retry_attempts: Number of retries for downloading items
nvd_api_key: API key to use for the requests to allow faster requests
request_results: Total number of results to request from the API
request_filter_opts: Filter options to pass to the API requests
start_index: index/offset of the first item to request
verbose: Verbosity level of log messages.
"""
super().__init__(
console,
error_console,
Expand All @@ -67,24 +102,39 @@ def __init__(
verbose=verbose,
)

def _create_nvd_api(self, nvd_api_key: str) -> NVDApi:
def _create_nvd_api(self, nvd_api_key: str) -> CPEMatchApi:
"""
Callback used by the constructor to create the NVD API object
that can be queried for CPE match strings.
Args:
nvd_api_key: An optional API key to allow faster requests.
Returns: The new `CPEMatchApi` object, which inherits from `NVDApi`.
"""
return CPEMatchApi(
token=nvd_api_key,
)

async def _create_nvd_results(self) -> NVDResults[CPEMatchString]:
"""
Callback used during `fetch_initial_data` getting
the `NVDResults` object the CPE match strings will be fetched from.
Returns: The new `NVDResults` object.
"""
return await self._nvd_api.cpe_matches(
last_modified_start_date=self.request_filter_opts.get(
last_modified_start_date=self._request_filter_opts.get(
"last_modified_start_date"
),
last_modified_end_date=self.request_filter_opts.get(
last_modified_end_date=self._request_filter_opts.get(
"last_modified_start_date"
),
cve_id=self.request_filter_opts.get("cve_id"),
match_string_search=self.request_filter_opts.get(
cve_id=self._request_filter_opts.get("cve_id"),
match_string_search=self._request_filter_opts.get(
"match_string_search"
),
request_results=self.request_results,
start_index=self.start_index,
results_per_page=self.queue.chunk_size,
request_results=self._request_results,
start_index=self._start_index,
results_per_page=self._queue.chunk_size,
)
57 changes: 51 additions & 6 deletions greenbone/scap/cpe_match/worker/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@


class CpeMatchDatabaseWriteWorker(ScapDatabaseWriteWorker[CPEMatchString]):
item_type_plural = CPE_MATCH_TYPE_PLURAL
arg_defaults = ScapDatabaseWriteWorker.arg_defaults
_item_type_plural = CPE_MATCH_TYPE_PLURAL
"Plural form of the type of items to use in log messages."

@classmethod
def get_item_type_plural(cls):
return "CPE Match Strings"
_arg_defaults = ScapDatabaseWriteWorker.arg_defaults
"Default values for optional arguments."

@classmethod
def from_args(
Expand All @@ -31,6 +30,19 @@ def from_args(
error_console: Console,
progress: Progress,
) -> "CpeMatchDatabaseWriteWorker":
"""
Create a new `CpeMatchDatabaseWriteWorker` with parameters from
the given command line args gathered by an `ArgumentParser`.
Args:
args: Command line arguments to use
console: Console for standard output.
error_console: Console for error output.
progress: Progress bar renderer to be updated by the worker.
Returns:
The new `CpeMatchDatabaseWriteWorker`.
"""
return CpeMatchDatabaseWriteWorker(
console,
error_console,
Expand Down Expand Up @@ -60,6 +72,26 @@ def __init__(
echo_sql: bool = False,
verbose: int = DEFAULT_VERBOSITY,
):
"""
Constructor for a CPE match string database write worker.
If the `database_...` arguments are None or not given, corresponding
environment variables will be tried next before finally using the
defaults as a fallback.
Args:
console: Console for standard output.
error_console: Console for error output.
progress: Progress bar renderer to be updated by the producer.
database_name: Name of the database to use.
database_schema: Optional database schema to use.
database_host: IP address or hostname of the database server to use.
database_port: Port of the database server to use.
database_user: Name of the database user to use.
database_password: Password of the database user to use.
echo_sql: Whether to print SQL statements.
verbose: Verbosity level of log messages.
"""
super().__init__(
console,
error_console,
Expand All @@ -74,8 +106,21 @@ def __init__(
verbose=verbose,
)

async def add_chunk(self, chunk: Sequence[CPEMatchString]):
async def _handle_chunk(self, chunk: Sequence[CPEMatchString]):
"""
Handles a chunk of CPE match strings from the queue.
Adds the match strings to the database using the manager.
Args:
chunk: The last chunk fetched from the queue.
"""
await self._manager.add_cpe_match_strings(chunk)

def _create_manager(self) -> AsyncContextManager:
"""
Callback creating a new database manager for handling SCAP items.
Returns: The new database manager.
"""
return CPEMatchStringDatabaseManager(self._database)
63 changes: 55 additions & 8 deletions greenbone/scap/cpe_match/worker/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@


class CpeMatchJsonWriteWorker(ScapJsonWriteWorker[CPEMatchString]):
item_type_plural = CPE_MATCH_TYPE_PLURAL
arg_defaults = ScapJsonWriteWorker.arg_defaults
"""
Async context manager base class for a worker writing
CPE match strings to a single JSON file.
"""

_item_type_plural = CPE_MATCH_TYPE_PLURAL
"Plural form of the type of items to use in log messages."

_arg_defaults = ScapJsonWriteWorker._arg_defaults
"Default values for optional arguments."

@classmethod
def from_args(
Expand All @@ -27,12 +35,25 @@ def from_args(
error_console: Console,
progress: Progress,
) -> "CpeMatchJsonWriteWorker":
"""
Create a new `CpeMatchJsonWriteWorker` with parameters from
the given command line args gathered by an `ArgumentParser`.
Args:
args: Command line arguments to use
console: Console for standard output.
error_console: Console for error output.
progress: Progress bar renderer to be updated by the worker.
Returns:
The new `CpeMatchJsonWriteWorker`.
"""
return CpeMatchJsonWriteWorker(
console,
error_console,
progress,
storage_path=args.storage_path or cls.arg_defaults["storage_path"],
schema_path=args.schema_path or cls.arg_defaults["schema_path"],
storage_path=args.storage_path or cls._arg_defaults["storage_path"],
schema_path=args.schema_path or cls._arg_defaults["schema_path"],
compress=args.compress if not None else False,
verbose=args.verbose or 0,
)
Expand All @@ -48,6 +69,18 @@ def __init__(
compress: bool = False,
verbose: int | None = None,
):
"""
Constructor for a `ScapJsonWriteWorker`.
Args:
console: Console for standard output.
error_console: Console for error output.
progress: Progress bar renderer to be updated by the producer.
storage_path: Path to the directory to write the JSON file into.
schema_path: Optional path to the schema file for JSON validation.
compress: Whether to gzip compress the JSON file.
verbose: Verbosity level of log messages.
"""
super().__init__(
console,
error_console,
Expand All @@ -58,17 +91,31 @@ def __init__(
verbose=verbose,
)

self.json_manager = MatchStringJsonManager(
self._json_manager = MatchStringJsonManager(
error_console,
storage_path,
compress=compress,
schema_path=schema_path,
raise_error_on_validation=False,
)
"Manager object handling saving the CPE match strings to a JSON file"

async def _handle_chunk(self, chunk: Sequence[CPEMatchString]):
"""
Callback handling a chunk of CPE match strings from the queue.
async def add_chunk(self, chunk: Sequence[CPEMatchString]):
self.json_manager.add_match_strings(chunk)
Adds the CPE match strings in the chunk to the document model.
Args:
chunk: The last chunk fetched from the queue.
"""
self._json_manager.add_match_strings(chunk)

async def loop_end(self) -> None:
self.json_manager.write()
"""
Callback handling the exiting the main worker loop.
Makes the JSON manager write the document to the file.
"""
self._json_manager.write()
await super().loop_end()
Loading

0 comments on commit 3fc39ae

Please sign in to comment.