forked from lyft/cartography
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' of github.com:chandanchowdhury/cartography
- Loading branch information
Showing
22 changed files
with
1,489 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import logging | ||
|
||
import neo4j | ||
|
||
from cartography.config import Config | ||
from cartography.intel.snipeit import asset | ||
from cartography.intel.snipeit import user | ||
from cartography.stats import get_stats_client | ||
from cartography.util import timeit | ||
|
||
logger = logging.getLogger(__name__) | ||
stat_handler = get_stats_client(__name__) | ||
|
||
|
||
@timeit | ||
def start_snipeit_ingestion(neo4j_session: neo4j.Session, config: Config) -> None: | ||
if config.snipeit_base_uri is None or config.snipeit_token is None or config.snipeit_tenant_id is None: | ||
logger.warning( | ||
"Required parameter(s) missing. Skipping sync.", | ||
) | ||
return | ||
|
||
common_job_parameters = { | ||
"UPDATE_TAG": config.update_tag, | ||
"TENANT_ID": config.snipeit_tenant_id, | ||
} | ||
|
||
# Ingest SnipeIT users and assets | ||
user.sync(neo4j_session, common_job_parameters, config.snipeit_base_uri, config.snipeit_token) | ||
asset.sync(neo4j_session, common_job_parameters, config.snipeit_base_uri, config.snipeit_token) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
import logging | ||
from typing import Any | ||
from typing import Dict | ||
from typing import List | ||
|
||
import neo4j | ||
|
||
from .util import call_snipeit_api | ||
from cartography.client.core.tx import load | ||
from cartography.graph.job import GraphJob | ||
from cartography.models.snipeit.asset import SnipeitAssetSchema | ||
from cartography.models.snipeit.tenant import SnipeitTenantSchema | ||
from cartography.util import timeit | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@timeit | ||
def get(base_uri: str, token: str) -> List[Dict]: | ||
api_endpoint = "/api/v1/hardware" | ||
results: List[Dict[str, Any]] = [] | ||
while True: | ||
offset = len(results) | ||
api_endpoint = f"{api_endpoint}?order='asc'&offset={offset}" | ||
response = call_snipeit_api(api_endpoint, base_uri, token) | ||
results.extend(response['rows']) | ||
|
||
total = response['total'] | ||
results_count = len(results) | ||
if results_count >= total: | ||
break | ||
|
||
return results | ||
|
||
|
||
@timeit | ||
def load_assets( | ||
neo4j_session: neo4j.Session, | ||
common_job_parameters: Dict, | ||
data: List[Dict[str, Any]], | ||
) -> None: | ||
# Create the SnipeIT Tenant | ||
load( | ||
neo4j_session, | ||
SnipeitTenantSchema(), | ||
[{'id': common_job_parameters["TENANT_ID"]}], | ||
lastupdated=common_job_parameters["UPDATE_TAG"], | ||
) | ||
|
||
load( | ||
neo4j_session, | ||
SnipeitAssetSchema(), | ||
data, | ||
lastupdated=common_job_parameters["UPDATE_TAG"], | ||
TENANT_ID=common_job_parameters["TENANT_ID"], | ||
) | ||
|
||
|
||
@timeit | ||
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None: | ||
GraphJob.from_node_schema(SnipeitAssetSchema(), common_job_parameters).run(neo4j_session) | ||
|
||
|
||
@timeit | ||
def sync( | ||
neo4j_session: neo4j.Session, | ||
common_job_parameters: Dict, | ||
base_uri: str, | ||
token: str, | ||
) -> None: | ||
assets = get(base_uri=base_uri, token=token) | ||
load_assets(neo4j_session=neo4j_session, common_job_parameters=common_job_parameters, data=assets) | ||
cleanup(neo4j_session, common_job_parameters) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
import logging | ||
from typing import Any | ||
from typing import Dict | ||
from typing import List | ||
|
||
import neo4j | ||
|
||
from .util import call_snipeit_api | ||
from cartography.client.core.tx import load | ||
from cartography.graph.job import GraphJob | ||
from cartography.models.snipeit.tenant import SnipeitTenantSchema | ||
from cartography.models.snipeit.user import SnipeitUserSchema | ||
from cartography.util import timeit | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@timeit | ||
def get(base_uri: str, token: str) -> List[Dict]: | ||
api_endpoint = "/api/v1/users" | ||
results: List[Dict[str, Any]] = [] | ||
while True: | ||
offset = len(results) | ||
api_endpoint = f"{api_endpoint}?order='asc'&offset={offset}" | ||
response = call_snipeit_api(api_endpoint, base_uri, token) | ||
results.extend(response['rows']) | ||
|
||
total = response['total'] | ||
results_count = len(results) | ||
if results_count >= total: | ||
break | ||
|
||
return results | ||
|
||
|
||
@timeit | ||
def load_users( | ||
neo4j_session: neo4j.Session, | ||
common_job_parameters: Dict, | ||
data: List[Dict[str, Any]], | ||
) -> None: | ||
logger.debug(data[0]) | ||
|
||
# Create the SnipeIT Tenant | ||
load( | ||
neo4j_session, | ||
SnipeitTenantSchema(), | ||
[{'id': common_job_parameters["TENANT_ID"]}], | ||
lastupdated=common_job_parameters["UPDATE_TAG"], | ||
) | ||
|
||
load( | ||
neo4j_session, | ||
SnipeitUserSchema(), | ||
data, | ||
lastupdated=common_job_parameters["UPDATE_TAG"], | ||
TENANT_ID=common_job_parameters["TENANT_ID"], | ||
) | ||
|
||
|
||
@timeit | ||
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None: | ||
GraphJob.from_node_schema(SnipeitUserSchema(), common_job_parameters).run(neo4j_session) | ||
|
||
|
||
@timeit | ||
def sync( | ||
neo4j_session: neo4j.Session, | ||
common_job_parameters: Dict, | ||
base_uri: str, | ||
token: str, | ||
) -> None: | ||
users = get(base_uri=base_uri, token=token) | ||
load_users(neo4j_session, common_job_parameters, users) | ||
cleanup(neo4j_session, common_job_parameters) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import logging | ||
from typing import Any | ||
from typing import Dict | ||
|
||
import requests | ||
|
||
from cartography.util import timeit | ||
|
||
logger = logging.getLogger(__name__) | ||
# Connect and read timeouts of 60 seconds each; see https://requests.readthedocs.io/en/master/user/advanced/#timeouts | ||
_TIMEOUT = (60, 60) | ||
|
||
|
||
@timeit | ||
def call_snipeit_api(api_and_parameters: str, base_uri: str, token: str) -> Dict[str, Any]: | ||
uri = base_uri + api_and_parameters | ||
try: | ||
logger.debug( | ||
"SnipeIT: Get %s", uri, | ||
) | ||
response = requests.get( | ||
uri, | ||
headers={ | ||
'Accept': 'application/json', | ||
'Authorization': f'Bearer {token}', | ||
}, | ||
timeout=_TIMEOUT, | ||
) | ||
except requests.exceptions.Timeout: | ||
# Add context and re-raise for callers to handle | ||
logger.warning(f"SnipeIT: requests.get('{uri}') timed out.") | ||
raise | ||
# if call failed, use requests library to raise an exception | ||
response.raise_for_status() | ||
return response.json() |
Empty file.
Oops, something went wrong.