-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial version of Qualys -> Tenable One Connector
- Loading branch information
1 parent
e79fb96
commit e3b8c15
Showing
28 changed files
with
3,032 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Qualys -> T1 Ingest Connector | ||
|
||
This connector code will download the asset, finding, and knowledgebase metadata from | ||
a Qualys instance, convert the data from it's native XML format into JSON, then | ||
transform that data into the T1 spec. | ||
|
||
All job management is handled by the pyTenable sync JobManager (currently within the | ||
`feature/sync` branch). As the sync JobManager isn't yet mainlined into the pyTenable | ||
repository, there are a few extra steps involved to setup the connector command-line | ||
(below). | ||
|
||
In terms of performance testing, we have observed nominal usage of memory (not much more | ||
than 120MiB on an M1 Mac) even while streaming the Knowldgebase XML into the cache database | ||
for later mergins with the finding data. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
#!/usr/bin/env python3 | ||
import logging | ||
from enum import Enum | ||
from pathlib import Path | ||
|
||
from rich.logging import RichHandler | ||
from tenable.io import TenableIO | ||
from typer import Option, Typer | ||
from typing_extensions import Annotated | ||
|
||
from qualys.api import QualysAPI | ||
from qualys.transform import Transformer | ||
|
||
app = Typer() | ||
|
||
|
||
class LogLevels(str, Enum): | ||
debug = 'DEBUG' | ||
info = 'INFO' | ||
warn = 'WARN' | ||
error = 'ERROR' | ||
|
||
|
||
def setup_logging(log_level: LogLevels) -> None: | ||
""" | ||
Setup logging for qualys integration | ||
""" | ||
fileHandler = logging.FileHandler('qualys2tone.log') | ||
fileHandler.setFormatter( | ||
logging.Formatter( | ||
fmt='%(asctime)s %(levelname)-5.5s %(message)s', datefmt='[%X]' | ||
) | ||
) | ||
logging.basicConfig( | ||
level=log_level.value, | ||
format='%(message)s', | ||
datefmt='[%X]', | ||
handlers=[RichHandler(rich_tracebacks=True), fileHandler], | ||
) | ||
|
||
|
||
@app.command() | ||
def run( | ||
tio_access_key: Annotated[ | ||
str, Option(envvar='TIO_ACCESS_KEY', prompt=True, help='TVM/TIO API Access Key') | ||
], | ||
tio_secret_key: Annotated[ | ||
str, Option(envvar='TIO_SECRET_KEY', prompt=True, help='TVM/TIO API Secret Key') | ||
], | ||
qualys_url: Annotated[ | ||
str, Option(envvar='QUALYS_URL', prompt=True, help='Qualys API URL') | ||
], | ||
qualys_username: Annotated[ | ||
str, Option(envvar='QUALYS_USERNAME', prompt=True, help='Qualys API Username') | ||
], | ||
qualys_password: Annotated[ | ||
str, | ||
Option( | ||
envvar='QUALYS_PASSWORD', | ||
prompt=True, | ||
hide_input=True, | ||
help='Qualys API Password', | ||
), | ||
], | ||
log_level: Annotated[ | ||
LogLevels, Option(envvar='LOG_LEVEL', help='Output logging level') | ||
] = 'INFO', | ||
cache_file: Annotated[Path, Option(help='Local cache file')] = Path('cache.db'), | ||
flush_cache: Annotated[bool, Option(help='Flush any cache that exists?')] = True, | ||
download_kbs: Annotated[ | ||
bool, Option(help='Download the Qualys knowledgebase?') | ||
] = True, | ||
download_vulns: Annotated[ | ||
bool, Option(help='Download the Qualys Findings?') | ||
] = True, | ||
) -> None: | ||
""" | ||
Run the Qualys integration | ||
""" | ||
setup_logging(log_level) | ||
|
||
if flush_cache: | ||
if cache_file.exists(): | ||
logging.info('Removing old cache file.') | ||
cache_file.unlink() | ||
|
||
tvm = TenableIO(access_key=tio_access_key, secret_key=tio_secret_key) | ||
qualys = QualysAPI( | ||
url=qualys_url, username=qualys_username, password=qualys_password | ||
) | ||
q2t1 = Transformer(tvm=tvm, qualys=qualys, db_uri=f'sqlite:///{cache_file}') | ||
q2t1.run(get_kbs=download_kbs, get_findings=download_vulns) | ||
|
||
|
||
if __name__ == '__main__': | ||
app() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
#!/usr/bin/env python | ||
import logging | ||
from typing import Annotated | ||
|
||
from pydantic import AnyHttpUrl, Field, SecretStr | ||
from qualys.transform import Transformer | ||
from tenint import Connector, Credential, Settings, TenableVMCredential | ||
|
||
|
||
class QualysCredential(Credential): | ||
""" | ||
Qualys Credentials | ||
""" | ||
|
||
prefix: str = "qualys" | ||
name: str = "Qualys VM" | ||
slug: str = "qualys" | ||
description: str = "Qualys QVM Credential" | ||
username: str | ||
password: SecretStr | ||
url: AnyHttpUrl | ||
|
||
|
||
class AppSettings(Settings): | ||
""" | ||
Qualys2TOne Connector Settings | ||
""" | ||
|
||
debug: Annotated[bool, Field(title="Debug")] = False | ||
import_findings: Annotated[bool, Field(title="Import Findings")] = True | ||
|
||
|
||
connector = Connector( | ||
settings=AppSettings, credentials=[QualysCredential, TenableVMCredential] | ||
) | ||
|
||
|
||
@connector.job | ||
def main(config: AppSettings): | ||
""" | ||
Qualys to Tenable One Connector | ||
""" | ||
if config.debug: | ||
logging.getLogger().setLevel("DEBUG") | ||
transformer = Transformer() | ||
transformer.run(get_findings=config.import_findings) | ||
|
||
|
||
if __name__ == "__main__": | ||
connector.app() |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
[project] | ||
name = "qualys2tone" | ||
version = "0.9.1" | ||
description = "Qualys VM to Tenable One" | ||
readme = "README.md" | ||
authors = [ | ||
{name = "Tenable, Inc", email = "[email protected]"}, | ||
] | ||
requires-python = ">=3.12" | ||
dependencies = [ | ||
"arrow>=1.3.0", | ||
"lxml>=5.3.0", | ||
"pydantic-xml>=2.13.1", | ||
"pydantic>=2.9.2", | ||
"restfly>=1.5.0", | ||
"rich>=13.9.2", | ||
"sqlalchemy>=2.0.36", | ||
"typer>=0.12.5", | ||
"pytenable>=1.5.0", | ||
"pydantic-extra-types>=2.10.0", | ||
"tenint>=0.1.0", | ||
"defusedxml>=0.7.1", | ||
] | ||
|
||
[project.optional-dependencies] | ||
testing = [ | ||
"pytest-cov>=6.0.0", | ||
"responses>=0.25.3", | ||
] | ||
|
||
[project.urls] | ||
logo = "https://raw.githubusercontent.com/tenable/tenable-connectors/refs/heads/main/connectors/qualys2tone/logo.svg" | ||
support = "https://community.tenable.com" | ||
|
||
[tool.tenint.connector] | ||
title = "Qualys to Tenable One" | ||
tags= ["qualys", "tvm"] | ||
timeout = 3600 | ||
|
||
[tool.uv.sources] | ||
pytenable = {url = "https://github.com/tenable/pyTenable/archive/refs/heads/feature/sync.zip"} | ||
|
||
[tool.uv] | ||
dev-dependencies = [ | ||
"ptpython>=3.0.29", | ||
"pyright>=1.1.385", | ||
"pytest-cov>=5.0.0", | ||
"pytest>=8.3.3", | ||
"responses>=0.25.3", | ||
"ruff>=0.7.0", | ||
"isort>=5.13.2", | ||
] | ||
|
||
[tool.ruff] | ||
line-length = 88 | ||
indent-width = 4 | ||
exclude = [ | ||
".nova", | ||
".github", | ||
".git", | ||
".pytest_cache", | ||
"__pycache__" | ||
] | ||
|
||
[tool.ruff.lint] | ||
select = ["E4", "E7", "E9", "F", "B"] | ||
fixable = [ "ALL" ] | ||
unfixable = [ "B" ] | ||
|
||
[tool.ruff.format] | ||
quote-style = "single" | ||
indent-style = "space" | ||
line-ending = "lf" | ||
docstring-code-format = false | ||
docstring-code-line-length = "dynamic" | ||
|
||
[tool.ruff.lint.per-file-ignores] | ||
"__init__.py" = ["E402", "F401"] | ||
"**/{tests,docs,tools}/*" = ["E402"] | ||
|
||
[tool.pytest.ini_options] | ||
pythonpath = ["."] | ||
testpaths = ["tests"] | ||
|
||
[tool.bandit] | ||
exclude_dirs = ["tests", ".venv"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
#from .api.session import QualysAPI |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .session import QualysAPI # noqa F401 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
""" | ||
Asset Handling API Module for Qualys | ||
""" | ||
|
||
from typing import Optional | ||
|
||
import arrow | ||
from arrow.arrow import Arrow | ||
from restfly.endpoint import APIEndpoint | ||
|
||
from .models.asset import Host | ||
from .streaming import xml_handler | ||
|
||
|
||
class AssetsAPI(APIEndpoint): | ||
_path = 'asset/host/' | ||
|
||
def _list( | ||
self, | ||
compliance_enabled: bool, | ||
page_size: Optional[int] = 10000, | ||
since: Optional[Arrow | str] = None, | ||
**kwargs, | ||
) -> xml_handler: | ||
""" | ||
Get all hosts last seen with vuln findings | ||
Args: | ||
compliance_enabled (bool): if we should get compliance data or vuln data | ||
page_size (int, optional): the page size we want to download | ||
since (Arrow|str, optional): | ||
An arrow object or time string to pull data since. | ||
If None we pull api default. | ||
Returns: | ||
QualysIterator | ||
Docs: | ||
page 535 | ||
""" | ||
|
||
params = { | ||
'action': 'list', | ||
#'os_hostname': 1, | ||
'show_asset_id': 1, | ||
#'detail': 'All', #option: {Basic|Basic/AGs|All|All/AGs | None} | ||
'show_ars': 1, | ||
'show_tags': 1, | ||
'show_ars_factors': 1, | ||
'show_trurisk': 1, | ||
'show_trurisk_factors': 1, | ||
#'host_metadata': 'all', # List cloud and non-cloud | ||
'show_cloud_tags': 1, | ||
'truncation_limit': page_size, | ||
'compliance_enabled': 1 if compliance_enabled else None, | ||
} | ||
|
||
if since is not None: | ||
# TODO check if we can move to detection_ since from vulns api | ||
# format YYYY-MM-DD[THH:MM:SSZ] | ||
if compliance_enabled: | ||
# Set compliance data since param | ||
params['compliance_scan_since'] = arrow.get(since).isoformat() | ||
else: | ||
# set vuln data since param | ||
params['vm_scan_since'] = arrow.get(since).isoformat() | ||
|
||
return xml_handler(self._api, self._path, params, Host, 'HOST') | ||
|
||
def vuln(self, since: Optional[Arrow | str] = None, **kwargs) -> xml_handler: | ||
""" | ||
Get all hosts last seen with vuln findings | ||
Args: | ||
since optional(Arrow:str): | ||
An arrow object or time string to pull data since. If None we pull | ||
api default. | ||
Returns: | ||
QualysIterator | ||
""" | ||
return self._list(compliance_enabled=False, since=since, **kwargs) | ||
|
||
def compliance(self, since: Optional[Arrow | str] = None, **kwargs) -> xml_handler: | ||
""" | ||
Get all hosts last seen with compliance findings | ||
Args: | ||
since optional(Arrow:str): | ||
An arrow object or time string to pull data | ||
since. If None we pull api default. | ||
Returns: | ||
QualysIterator | ||
""" | ||
raise NotImplementedError('This feature has not been implemented yet.') | ||
# return self._list(compliance_enabled=True,since=since, **kwargs) |
Oops, something went wrong.