diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py
index 52dce3a8b7599f..25781cd2f1dcc9 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py
@@ -2,10 +2,11 @@
 import logging
 import ssl
 import time
+from collections import defaultdict
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta, timezone
 from enum import Enum
-from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union
+from typing import Callable, Dict, Iterable, List, Optional, Set, Union
 from urllib.parse import urljoin

 import requests
@@ -196,6 +197,75 @@ def validator_site_url(cls, site_url: str) -> str:
         return site_url


+class BidirectionalComponentGraph:
+    def __init__(self):
+        self._outgoing: Dict[str, Set[str]] = defaultdict(set)
+        self._incoming: Dict[str, Set[str]] = defaultdict(set)
+        # this will not correctly count duplicates or removal of non-existent connections - it is only there for a quick check
+        self._connections_cnt = 0
+
+    def add_connection(self, from_component: str, to_component: str) -> None:
+        # this is a sanity check
+        outgoing_duplicated = to_component in self._outgoing[from_component]
+        incoming_duplicated = from_component in self._incoming[to_component]
+
+        self._outgoing[from_component].add(to_component)
+        self._incoming[to_component].add(from_component)
+        self._connections_cnt += 1
+
+        if outgoing_duplicated or incoming_duplicated:
+            logger.warning(
+                f"Attempted to add a connection between two components that already existed! Duplicated incoming: {incoming_duplicated}, duplicated outgoing: {outgoing_duplicated}. Connection from component: {from_component} to component: {to_component}"
+            )
+
+    def remove_connection(self, from_component: str, to_component: str) -> None:
+        self._outgoing[from_component].discard(to_component)
+        self._incoming[to_component].discard(from_component)
+        self._connections_cnt -= 1
+
+    def get_outgoing_connections(self, component: str) -> Set[str]:
+        return self._outgoing[component]
+
+    def get_incoming_connections(self, component: str) -> Set[str]:
+        return self._incoming[component]
+
+    def delete_component(self, component: str) -> None:
+        logger.debug(f"Deleting component with id: {component}")
+        incoming = self._incoming[component]
+        logger.debug(
+            f"Recognized {len(incoming)} incoming connections to the component"
+        )
+        outgoing = self._outgoing[component]
+        logger.debug(
+            f"Recognized {len(outgoing)} outgoing connections from the component"
+        )
+
+        for i in incoming:
+            for o in outgoing:
+                self.add_connection(i, o)
+
+        for i in incoming:
+            self._outgoing[i].remove(component)
+        for o in outgoing:
+            self._incoming[o].remove(component)
+
+        added_connections_cnt = len(incoming) * len(outgoing)
+        deleted_connections_cnt = len(incoming) + len(outgoing)
+        logger.debug(
+            f"Deleted {deleted_connections_cnt} connections and added {added_connections_cnt}"
+        )
+
+        del self._outgoing[component]
+        del self._incoming[component]
+
+        # for performance reasons we are not using the `remove_connection` function when deleting an entire component,
+        # therefore we need to adjust the estimated count
+        self._connections_cnt -= deleted_connections_cnt
+
+    def __len__(self):
+        return self._connections_cnt
+
+
 TOKEN_ENDPOINT = "access/token"
 KERBEROS_TOKEN_ENDPOINT = "access/kerberos"
 ABOUT_ENDPOINT = "flow/about"
@@ -360,7 +430,9 @@ class NifiFlow:
     root_process_group: NifiProcessGroup
     components: Dict[str, NifiComponent] = field(default_factory=dict)
     remotely_accessible_ports: Dict[str, NifiComponent] = field(default_factory=dict)
-    connections: List[Tuple[str, str]] = field(default_factory=list)
+    connections: BidirectionalComponentGraph = field(
+        default_factory=BidirectionalComponentGraph
+    )
     processGroups: Dict[str, NifiProcessGroup] = field(default_factory=dict)
     remoteProcessGroups: Dict[str, NifiRemoteProcessGroup] = field(default_factory=dict)
     remote_ports: Dict[str, NifiComponent] = field(default_factory=dict)
@@ -416,10 +488,15 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
     def get_report(self) -> SourceReport:
         return self.report

-    def update_flow(self, pg_flow_dto: Dict) -> None:  # noqa: C901
+    def update_flow(
+        self, pg_flow_dto: Dict, recursion_level: int = 0
+    ) -> None:  # noqa: C901
         """
         Update self.nifi_flow with contents of the input process group `pg_flow_dto`
         """
+        logger.debug(
+            f"Updating flow with pg_flow_dto {pg_flow_dto.get('breadcrumb', {}).get('breadcrumb', {}).get('id')}, recursion level: {recursion_level}"
+        )
         breadcrumb_dto = pg_flow_dto.get("breadcrumb", {}).get("breadcrumb", {})
         nifi_pg = NifiProcessGroup(
             breadcrumb_dto.get("id"),
@@ -433,6 +510,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None:  # noqa: C901

         flow_dto = pg_flow_dto.get("flow", {})

+        logger.debug(f"Processing {len(flow_dto.get('processors', []))} processors")
         for processor in flow_dto.get("processors", []):
             component = processor.get("component")
             self.nifi_flow.components[component.get("id")] = NifiComponent(
@@ -445,6 +523,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None:  # noqa: C901
                 comments=component.get("config", {}).get("comments"),
                 status=component.get("status", {}).get("runStatus"),
             )
+        logger.debug(f"Processing {len(flow_dto.get('funnels', []))} funnels")
         for funnel in flow_dto.get("funnels", []):
             component = funnel.get("component")
             self.nifi_flow.components[component.get("id")] = NifiComponent(
@@ -458,13 +537,15 @@ def update_flow(self, pg_flow_dto: Dict) -> None:  # noqa: C901
             )
             logger.debug(f"Adding funnel {component.get('id')}")

+        logger.debug(f"Processing {len(flow_dto.get('connections', []))} connections")
         for connection in flow_dto.get("connections", []):
             # Exclude self - recursive relationships
             if connection.get("sourceId") != connection.get("destinationId"):
-                self.nifi_flow.connections.append(
-                    (connection.get("sourceId"), connection.get("destinationId"))
+                self.nifi_flow.connections.add_connection(
+                    connection.get("sourceId"), connection.get("destinationId")
                 )

+        logger.debug(f"Processing {len(flow_dto.get('inputPorts', []))} inputPorts")
         for inputPort in flow_dto.get("inputPorts", []):
             component = inputPort.get("component")
             if inputPort.get("allowRemoteAccess"):
@@ -492,6 +573,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None:  # noqa: C901
                 )
                 logger.debug(f"Adding port {component.get('id')}")

+        logger.debug(f"Processing {len(flow_dto.get('outputPorts', []))} outputPorts")
         for outputPort in flow_dto.get("outputPorts", []):
             component = outputPort.get("component")
             if outputPort.get("allowRemoteAccess"):
@@ -519,6 +601,9 @@ def update_flow(self, pg_flow_dto: Dict) -> None:  # noqa: C901
                 )
                 logger.debug(f"Adding report port {component.get('id')}")

+        logger.debug(
+            f"Processing {len(flow_dto.get('remoteProcessGroups', []))} remoteProcessGroups"
+        )
         for rpg in flow_dto.get("remoteProcessGroups", []):
             rpg_component = rpg.get("component", {})
             remote_ports = {}
@@ -564,7 +649,13 @@ def update_flow(self, pg_flow_dto: Dict) -> None:  # noqa: C901
             self.nifi_flow.components.update(remote_ports)
             self.nifi_flow.remoteProcessGroups[nifi_rpg.id] = nifi_rpg

+        logger.debug(
+            f"Processing {len(flow_dto.get('processGroups', []))} processGroups"
+        )
         for pg in flow_dto.get("processGroups", []):
+            logger.debug(
+                f"Retrieving process group: {pg.get('id')} while updating flow for {pg_flow_dto.get('breadcrumb', {}).get('breadcrumb', {}).get('id')}"
+            )
             pg_response = self.session.get(
                 url=urljoin(self.rest_api_base_url, PG_ENDPOINT) + pg.get("id")
             )
@@ -578,11 +669,24 @@ def update_flow(self, pg_flow_dto: Dict) -> None:  # noqa: C901

             pg_flow_dto = pg_response.json().get("processGroupFlow", {})

-            self.update_flow(pg_flow_dto)
+            self.update_flow(pg_flow_dto, recursion_level=recursion_level + 1)

     def update_flow_keep_only_ingress_egress(self):
         components_to_del: List[NifiComponent] = []
-        for component in self.nifi_flow.components.values():
+        components = self.nifi_flow.components.values()
+        logger.debug(
+            f"Processing {len(components)} components to keep only ingress/egress"
+        )
+        logger.debug(
+            f"All the connections recognized: {len(self.nifi_flow.connections)}"
+        )
+        for index, component in enumerate(components, start=1):
+            logger.debug(
+                f"Processing component {index} for ingress/egress pruning. Component id: {component.id}, name: {component.name}, type: {component.type}"
+            )
+            logger.debug(
+                f"Current number of connections: {len(self.nifi_flow.connections)}"
+            )
             if (
                 component.nifi_type is NifiType.PROCESSOR
                 and component.type
@@ -592,47 +696,28 @@ def update_flow_keep_only_ingress_egress(self):
                 NifiType.REMOTE_INPUT_PORT,
                 NifiType.REMOTE_OUTPUT_PORT,
             ]:
+                self.nifi_flow.connections.delete_component(component.id)
                 components_to_del.append(component)
-                incoming = list(
-                    filter(lambda x: x[1] == component.id, self.nifi_flow.connections)
-                )
-                outgoing = list(
-                    filter(lambda x: x[0] == component.id, self.nifi_flow.connections)
-                )
-                # Create new connections from incoming to outgoing
-                for i in incoming:
-                    for j in outgoing:
-                        self.nifi_flow.connections.append((i[0], j[1]))
-
-                # Remove older connections, as we already created
-                # new connections bypassing component to be deleted
-
-                for i in incoming:
-                    self.nifi_flow.connections.remove(i)
-                for j in outgoing:
-                    self.nifi_flow.connections.remove(j)
-
-        for c in components_to_del:
-            if c.nifi_type is NifiType.PROCESSOR and (
-                c.name.startswith("Get")
-                or c.name.startswith("List")
-                or c.name.startswith("Fetch")
-                or c.name.startswith("Put")
+
+        for component in components_to_del:
+            if component.nifi_type is NifiType.PROCESSOR and component.name.startswith(
+                ("Get", "List", "Fetch", "Put")
             ):
                 self.report.warning(
-                    f"Dropping NiFi Processor of type {c.type}, id {c.id}, name {c.name} from lineage view. \
+                    f"Dropping NiFi Processor of type {component.type}, id {component.id}, name {component.name} from lineage view. \
                     This is likely an Ingress or Egress node which may be reading to/writing from external datasets \
                     However not currently supported in datahub",
                     self.config.site_url,
                 )
             else:
                 logger.debug(
-                    f"Dropping NiFi Component of type {c.type}, id {c.id}, name {c.name} from lineage view."
+                    f"Dropping NiFi Component of type {component.type}, id {component.id}, name {component.name} from lineage view."
                 )
-            del self.nifi_flow.components[c.id]
+            del self.nifi_flow.components[component.id]

     def create_nifi_flow(self):
+        logger.debug(f"Retrieving NIFI info from {ABOUT_ENDPOINT}")
         about_response = self.session.get(
             url=urljoin(self.rest_api_base_url, ABOUT_ENDPOINT)
         )
@@ -646,6 +731,8 @@ def create_nifi_flow(self):
             )
         else:
             logger.warning("Failed to fetch version for nifi")
+        logger.debug(f"Retrieved nifi version: {nifi_version}")
+        logger.debug(f"Retrieving cluster info from {CLUSTER_ENDPOINT}")
         cluster_response = self.session.get(
             url=urljoin(self.rest_api_base_url, CLUSTER_ENDPOINT)
         )
@@ -654,8 +741,10 @@ def create_nifi_flow(self):
             clustered = (
                 cluster_response.json().get("clusterSummary", {}).get("clustered")
             )
+            logger.debug(f"Retrieved cluster summary: {clustered}")
         else:
             logger.warning("Failed to fetch cluster summary for flow")
+        logger.debug("Retrieving ROOT Process Group")
         pg_response = self.session.get(
             url=urljoin(self.rest_api_base_url, PG_ENDPOINT) + "root"
         )
@@ -695,7 +784,7 @@ def fetch_provenance_events(
         if provenance_response.ok:
             provenance = provenance_response.json().get("provenance", {})
             provenance_uri = provenance.get("uri")
-
+            logger.debug(f"Retrieving provenance uri: {provenance_uri}")
             provenance_response = self.session.get(provenance_uri)
             if provenance_response.ok:
                 provenance = provenance_response.json().get("provenance", {})
@@ -734,7 +823,9 @@ def fetch_provenance_events(

                 total = provenance.get("results", {}).get("total")
                 totalCount = provenance.get("results", {}).get("totalCount")
+                logger.debug(f"Retrieved {totalCount} of {total}")
                 if total != str(totalCount):
+                    logger.debug("Trying to retrieve more events for the same processor")
                     yield from self.fetch_provenance_events(
                         processor, eventType, startDate, oldest_event_time
                     )
@@ -800,6 +891,7 @@ def submit_provenance_query(self, processor, eventType, startDate, endDate):
         return provenance_response

     def delete_provenance(self, provenance_uri):
+        logger.debug(f"Deleting provenance with uri: {provenance_uri}")
         delete_response = self.session.delete(provenance_uri)
         if not delete_response.ok:
             logger.error("failed to delete provenance ", provenance_uri)
@@ -821,12 +913,8 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
             job_name = component.name
             job_urn = builder.make_data_job_urn_with_flow(flow_urn, component.id)

-            incoming = list(
-                filter(lambda x: x[1] == component.id, self.nifi_flow.connections)
-            )
-            outgoing = list(
-                filter(lambda x: x[0] == component.id, self.nifi_flow.connections)
-            )
+            incoming = self.nifi_flow.connections.get_incoming_connections(component.id)
+            outgoing = self.nifi_flow.connections.get_outgoing_connections(component.id)

             inputJobs = set()
             jobProperties = None
@@ -864,8 +952,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
                     datasetProperties=dataset.dataset_properties,
                 )

-            for edge in incoming:
-                incoming_from = edge[0]
+            for incoming_from in incoming:
                 if incoming_from in self.nifi_flow.remotely_accessible_ports.keys():
                     dataset_name = f"{self.config.site_name}.{self.nifi_flow.remotely_accessible_ports[incoming_from].name}"
                     dataset_urn = builder.make_dataset_urn(
f"{self.config.site_name}.{self.nifi_flow.remotely_accessible_ports[outgoing_to].name}" dataset_urn = builder.make_dataset_urn( @@ -977,14 +1063,19 @@ def make_flow_urn(self) -> str: ) def process_provenance_events(self): + logger.debug("Starting processing of provenance events") startDate = datetime.now(timezone.utc) - timedelta( days=self.config.provenance_days ) eventAnalyzer = NifiProcessorProvenanceEventAnalyzer() eventAnalyzer.env = self.config.env - - for component in self.nifi_flow.components.values(): + components = self.nifi_flow.components.values() + logger.debug(f"Processing {len(components)} components") + for component in components: + logger.debug( + f"Processing provenance events for component id: {component.id} name: {component.name}" + ) if component.nifi_type is NifiType.PROCESSOR: eventType = eventAnalyzer.KNOWN_INGRESS_EGRESS_PROCESORS[component.type] events = self.fetch_provenance_events(component, eventType, startDate) diff --git a/metadata-ingestion/tests/unit/test_nifi_source.py b/metadata-ingestion/tests/unit/test_nifi_source.py index 9e8bf64261ffaf..30a0855d44f341 100644 --- a/metadata-ingestion/tests/unit/test_nifi_source.py +++ b/metadata-ingestion/tests/unit/test_nifi_source.py @@ -6,6 +6,7 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.nifi import ( + BidirectionalComponentGraph, NifiComponent, NifiFlow, NifiProcessGroup, @@ -55,7 +56,7 @@ def test_nifi_s3_provenance_event(): ) }, remotely_accessible_ports={}, - connections=[], + connections=BidirectionalComponentGraph(), processGroups={ "803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup( id="803ebb92-017d-1000-2961-4bdaa27a3ba0", @@ -126,7 +127,7 @@ def test_nifi_s3_provenance_event(): ) }, remotely_accessible_ports={}, - connections=[], + connections=BidirectionalComponentGraph(), processGroups={ "803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup( id="803ebb92-017d-1000-2961-4bdaa27a3ba0",