fix(ingestion/nifi): Improve nifi lineage extraction performance (#11490)
skrydal authored Oct 1, 2024
1 parent 660fbf8 commit e1514d5
Showing 2 changed files with 142 additions and 50 deletions.
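
In short, the performance fix replaces the flat `connections: List[Tuple[str, str]]` on `NifiFlow` (which was re-filtered for every component) with a `BidirectionalComponentGraph` that keeps per-component adjacency sets. A minimal sketch of the lookup difference, using made-up component ids rather than anything from the commit:

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

# Hypothetical connection list, in the shape the source used before this commit.
connections_list: List[Tuple[str, str]] = [("a", "b"), ("b", "c"), ("a", "c")]

# Old style: every lookup filters the full list, O(total connections) per component.
incoming_old = [src for (src, dst) in connections_list if dst == "c"]

# New style: adjacency sets in both directions give constant-time lookups per component.
outgoing: Dict[str, Set[str]] = defaultdict(set)
incoming: Dict[str, Set[str]] = defaultdict(set)
for src, dst in connections_list:
    outgoing[src].add(dst)
    incoming[dst].add(src)

assert set(incoming_old) == incoming["c"] == {"a", "b"}
```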
187 changes: 139 additions & 48 deletions metadata-ingestion/src/datahub/ingestion/source/nifi.py
@@ -2,10 +2,11 @@
import logging
import ssl
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union
from typing import Callable, Dict, Iterable, List, Optional, Set, Union
from urllib.parse import urljoin

import requests
@@ -196,6 +197,75 @@ def validator_site_url(cls, site_url: str) -> str:
return site_url


class BidirectionalComponentGraph:
def __init__(self):
self._outgoing: Dict[str, Set[str]] = defaultdict(set)
self._incoming: Dict[str, Set[str]] = defaultdict(set)
# this will not count duplicates/removal of non-existing connections correctly - it is only there for a quick check
self._connections_cnt = 0

def add_connection(self, from_component: str, to_component: str) -> None:
# this is a sanity check
outgoing_duplicated = to_component in self._outgoing[from_component]
incoming_duplicated = from_component in self._incoming[to_component]

self._outgoing[from_component].add(to_component)
self._incoming[to_component].add(from_component)
self._connections_cnt += 1

if outgoing_duplicated or incoming_duplicated:
logger.warning(
f"Somehow we attempted to add a connection between 2 components which already existed! Duplicated incoming: {incoming_duplicated}, duplicated outgoing: {outgoing_duplicated}. Connection from component: {from_component} to component: {to_component}"
)

def remove_connection(self, from_component: str, to_component: str) -> None:
self._outgoing[from_component].discard(to_component)
self._incoming[to_component].discard(from_component)
self._connections_cnt -= 1

def get_outgoing_connections(self, component: str) -> Set[str]:
return self._outgoing[component]

def get_incoming_connections(self, component: str) -> Set[str]:
return self._incoming[component]

def delete_component(self, component: str) -> None:
logger.debug(f"Deleting component with id: {component}")
incoming = self._incoming[component]
logger.debug(
f"Recognized {len(incoming)} incoming connections to the component"
)
outgoing = self._outgoing[component]
logger.debug(
f"Recognized {len(outgoing)} outgoing connections from the component"
)

for i in incoming:
for o in outgoing:
self.add_connection(i, o)

for i in incoming:
self._outgoing[i].remove(component)
for o in outgoing:
self._incoming[o].remove(component)

added_connections_cnt = len(incoming) * len(outgoing)
deleted_connections_cnt = len(incoming) + len(outgoing)
logger.debug(
f"Deleted {deleted_connections_cnt} connections and added {added_connections_cnt}"
)

del self._outgoing[component]
del self._incoming[component]

# for performance reasons we are not using the `remove_connection` function when deleting an entire component,
# therefore we need to adjust the estimated count
self._connections_cnt -= deleted_connections_cnt

def __len__(self):
return self._connections_cnt


TOKEN_ENDPOINT = "access/token"
KERBEROS_TOKEN_ENDPOINT = "access/kerberos"
ABOUT_ENDPOINT = "flow/about"
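
For orientation, a small usage sketch of the new helper class above; it assumes `BidirectionalComponentGraph` is importable from `datahub.ingestion.source.nifi` (as in the updated unit test), and the component ids are made up for illustration. It shows how `delete_component` rewires incoming edges to outgoing edges so transitive lineage survives pruning, and that `len()` reports the estimated connection count:

```python
from datahub.ingestion.source.nifi import BidirectionalComponentGraph

graph = BidirectionalComponentGraph()
graph.add_connection("ingress", "transform")  # ingress -> transform
graph.add_connection("transform", "egress")   # transform -> egress

# Pruning the intermediate component keeps lineage from ingress to egress.
graph.delete_component("transform")

assert graph.get_outgoing_connections("ingress") == {"egress"}
assert graph.get_incoming_connections("egress") == {"ingress"}
assert len(graph) == 1  # 2 connections removed, 1 bypass connection added
```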
@@ -360,7 +430,9 @@ class NifiFlow:
root_process_group: NifiProcessGroup
components: Dict[str, NifiComponent] = field(default_factory=dict)
remotely_accessible_ports: Dict[str, NifiComponent] = field(default_factory=dict)
connections: List[Tuple[str, str]] = field(default_factory=list)
connections: BidirectionalComponentGraph = field(
default_factory=BidirectionalComponentGraph
)
processGroups: Dict[str, NifiProcessGroup] = field(default_factory=dict)
remoteProcessGroups: Dict[str, NifiRemoteProcessGroup] = field(default_factory=dict)
remote_ports: Dict[str, NifiComponent] = field(default_factory=dict)
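
A side note on the `connections` field change above: because the graph is mutable, the dataclass uses `default_factory` so every `NifiFlow` gets its own instance rather than sharing one default object. A tiny illustration with a hypothetical stand-in dataclass:

```python
from dataclasses import dataclass, field

from datahub.ingestion.source.nifi import BidirectionalComponentGraph


@dataclass
class FlowSketch:  # hypothetical stand-in for NifiFlow
    connections: BidirectionalComponentGraph = field(
        default_factory=BidirectionalComponentGraph
    )


a, b = FlowSketch(), FlowSketch()
a.connections.add_connection("src", "dst")
assert len(b.connections) == 0  # each instance owns a separate graph
```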
Expand Down Expand Up @@ -416,10 +488,15 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
def get_report(self) -> SourceReport:
return self.report

def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901
def update_flow(
self, pg_flow_dto: Dict, recursion_level: int = 0
) -> None: # noqa: C901
"""
Update self.nifi_flow with contents of the input process group `pg_flow_dto`
"""
logger.debug(
f"Updating flow with pg_flow_dto {pg_flow_dto.get('breadcrumb', {}).get('breadcrumb', {}).get('id')}, recursion level: {recursion_level}"
)
breadcrumb_dto = pg_flow_dto.get("breadcrumb", {}).get("breadcrumb", {})
nifi_pg = NifiProcessGroup(
breadcrumb_dto.get("id"),
@@ -433,6 +510,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901

flow_dto = pg_flow_dto.get("flow", {})

logger.debug(f"Processing {len(flow_dto.get('processors', []))} processors")
for processor in flow_dto.get("processors", []):
component = processor.get("component")
self.nifi_flow.components[component.get("id")] = NifiComponent(
@@ -445,6 +523,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901
comments=component.get("config", {}).get("comments"),
status=component.get("status", {}).get("runStatus"),
)
logger.debug(f"Processing {len(flow_dto.get('funnels', []))} funnels")
for funnel in flow_dto.get("funnels", []):
component = funnel.get("component")
self.nifi_flow.components[component.get("id")] = NifiComponent(
@@ -458,13 +537,15 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901
)
logger.debug(f"Adding funnel {component.get('id')}")

logger.debug(f"Processing {len(flow_dto.get('connections', []))} connections")
for connection in flow_dto.get("connections", []):
# Exclude self - recursive relationships
if connection.get("sourceId") != connection.get("destinationId"):
self.nifi_flow.connections.append(
(connection.get("sourceId"), connection.get("destinationId"))
self.nifi_flow.connections.add_connection(
connection.get("sourceId"), connection.get("destinationId")
)

logger.debug(f"Processing {len(flow_dto.get('inputPorts', []))} inputPorts")
for inputPort in flow_dto.get("inputPorts", []):
component = inputPort.get("component")
if inputPort.get("allowRemoteAccess"):
@@ -492,6 +573,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901
)
logger.debug(f"Adding port {component.get('id')}")

logger.debug(f"Processing {len(flow_dto.get('outputPorts', []))} outputPorts")
for outputPort in flow_dto.get("outputPorts", []):
component = outputPort.get("component")
if outputPort.get("allowRemoteAccess"):
@@ -519,6 +601,9 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901
)
logger.debug(f"Adding report port {component.get('id')}")

logger.debug(
f"Processing {len(flow_dto.get('remoteProcessGroups', []))} remoteProcessGroups"
)
for rpg in flow_dto.get("remoteProcessGroups", []):
rpg_component = rpg.get("component", {})
remote_ports = {}
@@ -564,7 +649,13 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901
self.nifi_flow.components.update(remote_ports)
self.nifi_flow.remoteProcessGroups[nifi_rpg.id] = nifi_rpg

logger.debug(
f"Processing {len(flow_dto.get('processGroups', []))} processGroups"
)
for pg in flow_dto.get("processGroups", []):
logger.debug(
f"Retrieving process group: {pg.get('id')} while updating flow for {pg_flow_dto.get('breadcrumb', {}).get('breadcrumb', {}).get('id')}"
)
pg_response = self.session.get(
url=urljoin(self.rest_api_base_url, PG_ENDPOINT) + pg.get("id")
)
@@ -578,11 +669,24 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901

pg_flow_dto = pg_response.json().get("processGroupFlow", {})

self.update_flow(pg_flow_dto)
self.update_flow(pg_flow_dto, recursion_level=recursion_level + 1)

def update_flow_keep_only_ingress_egress(self):
components_to_del: List[NifiComponent] = []
for component in self.nifi_flow.components.values():
components = self.nifi_flow.components.values()
logger.debug(
f"Processing {len(components)} components for keep only ingress/egress"
)
logger.debug(
f"All the connections recognized: {len(self.nifi_flow.connections)}"
)
for index, component in enumerate(components, start=1):
logger.debug(
f"Processing {index}th component for ingress/egress pruning. Component id: {component.id}, name: {component.name}, type: {component.type}"
)
logger.debug(
f"Current amount of connections: {len(self.nifi_flow.connections)}"
)
if (
component.nifi_type is NifiType.PROCESSOR
and component.type
@@ -592,47 +696,28 @@ def update_flow_keep_only_ingress_egress(self):
NifiType.REMOTE_INPUT_PORT,
NifiType.REMOTE_OUTPUT_PORT,
]:
self.nifi_flow.connections.delete_component(component.id)
components_to_del.append(component)
incoming = list(
filter(lambda x: x[1] == component.id, self.nifi_flow.connections)
)
outgoing = list(
filter(lambda x: x[0] == component.id, self.nifi_flow.connections)
)
# Create new connections from incoming to outgoing
for i in incoming:
for j in outgoing:
self.nifi_flow.connections.append((i[0], j[1]))

# Remove older connections, as we already created
# new connections bypassing component to be deleted

for i in incoming:
self.nifi_flow.connections.remove(i)
for j in outgoing:
self.nifi_flow.connections.remove(j)

for c in components_to_del:
if c.nifi_type is NifiType.PROCESSOR and (
c.name.startswith("Get")
or c.name.startswith("List")
or c.name.startswith("Fetch")
or c.name.startswith("Put")

for component in components_to_del:
if component.nifi_type is NifiType.PROCESSOR and component.name.startswith(
("Get", "List", "Fetch", "Put")
):
self.report.warning(
f"Dropping NiFi Processor of type {c.type}, id {c.id}, name {c.name} from lineage view. \
f"Dropping NiFi Processor of type {component.type}, id {component.id}, name {component.name} from lineage view. \
This is likely an Ingress or Egress node which may be reading from / writing to external datasets, \
however this is not currently supported in DataHub",
self.config.site_url,
)
else:
logger.debug(
f"Dropping NiFi Component of type {c.type}, id {c.id}, name {c.name} from lineage view."
f"Dropping NiFi Component of type {component.type}, id {component.id}, name {component.name} from lineage view."
)

del self.nifi_flow.components[c.id]
del self.nifi_flow.components[component.id]

def create_nifi_flow(self):
logger.debug(f"Retrieving NIFI info from {ABOUT_ENDPOINT}")
about_response = self.session.get(
url=urljoin(self.rest_api_base_url, ABOUT_ENDPOINT)
)
@@ -646,6 +731,8 @@ def create_nifi_flow(self):
)
else:
logger.warning("Failed to fetch version for nifi")
logger.debug(f"Retrieved nifi version: {nifi_version}")
logger.debug(f"Retrieving cluster info from {CLUSTER_ENDPOINT}")
cluster_response = self.session.get(
url=urljoin(self.rest_api_base_url, CLUSTER_ENDPOINT)
)
@@ -654,8 +741,10 @@ def create_nifi_flow(self):
clustered = (
cluster_response.json().get("clusterSummary", {}).get("clustered")
)
logger.debug(f"Retrieved cluster summary: {clustered}")
else:
logger.warning("Failed to fetch cluster summary for flow")
logger.debug("Retrieving ROOT Process Group")
pg_response = self.session.get(
url=urljoin(self.rest_api_base_url, PG_ENDPOINT) + "root"
)
@@ -695,7 +784,7 @@ def fetch_provenance_events(
if provenance_response.ok:
provenance = provenance_response.json().get("provenance", {})
provenance_uri = provenance.get("uri")

logger.debug(f"Retrieving provenance uri: {provenance_uri}")
provenance_response = self.session.get(provenance_uri)
if provenance_response.ok:
provenance = provenance_response.json().get("provenance", {})
@@ -734,7 +823,9 @@ def fetch_provenance_events(

total = provenance.get("results", {}).get("total")
totalCount = provenance.get("results", {}).get("totalCount")
logger.debug(f"Retrieved {totalCount} of {total}")
if total != str(totalCount):
logger.debug("Trying to retrieve more events for the same processor")
yield from self.fetch_provenance_events(
processor, eventType, startDate, oldest_event_time
)
@@ -800,6 +891,7 @@ def submit_provenance_query(self, processor, eventType, startDate, endDate):
return provenance_response

def delete_provenance(self, provenance_uri):
logger.debug(f"Deleting provenance with uri: {provenance_uri}")
delete_response = self.session.delete(provenance_uri)
if not delete_response.ok:
logger.error(f"failed to delete provenance {provenance_uri}")
@@ -821,12 +913,8 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
job_name = component.name
job_urn = builder.make_data_job_urn_with_flow(flow_urn, component.id)

incoming = list(
filter(lambda x: x[1] == component.id, self.nifi_flow.connections)
)
outgoing = list(
filter(lambda x: x[0] == component.id, self.nifi_flow.connections)
)
incoming = self.nifi_flow.connections.get_incoming_connections(component.id)
outgoing = self.nifi_flow.connections.get_outgoing_connections(component.id)
inputJobs = set()
jobProperties = None

@@ -864,8 +952,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
datasetProperties=dataset.dataset_properties,
)

for edge in incoming:
incoming_from = edge[0]
for incoming_from in incoming:
if incoming_from in self.nifi_flow.remotely_accessible_ports.keys():
dataset_name = f"{self.config.site_name}.{self.nifi_flow.remotely_accessible_ports[incoming_from].name}"
dataset_urn = builder.make_dataset_urn(
@@ -882,8 +969,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
builder.make_data_job_urn_with_flow(flow_urn, incoming_from)
)

for edge in outgoing:
outgoing_to = edge[1]
for outgoing_to in outgoing:
if outgoing_to in self.nifi_flow.remotely_accessible_ports.keys():
dataset_name = f"{self.config.site_name}.{self.nifi_flow.remotely_accessible_ports[outgoing_to].name}"
dataset_urn = builder.make_dataset_urn(
@@ -977,14 +1063,19 @@ def make_flow_urn(self) -> str:
)

def process_provenance_events(self):
logger.debug("Starting processing of provenance events")
startDate = datetime.now(timezone.utc) - timedelta(
days=self.config.provenance_days
)

eventAnalyzer = NifiProcessorProvenanceEventAnalyzer()
eventAnalyzer.env = self.config.env

for component in self.nifi_flow.components.values():
components = self.nifi_flow.components.values()
logger.debug(f"Processing {len(components)} components")
for component in components:
logger.debug(
f"Processing provenance events for component id: {component.id} name: {component.name}"
)
if component.nifi_type is NifiType.PROCESSOR:
eventType = eventAnalyzer.KNOWN_INGRESS_EGRESS_PROCESORS[component.type]
events = self.fetch_provenance_events(component, eventType, startDate)
5 changes: 3 additions & 2 deletions metadata-ingestion/tests/unit/test_nifi_source.py
@@ -6,6 +6,7 @@

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.nifi import (
BidirectionalComponentGraph,
NifiComponent,
NifiFlow,
NifiProcessGroup,
@@ -55,7 +56,7 @@ def test_nifi_s3_provenance_event():
)
},
remotely_accessible_ports={},
connections=[],
connections=BidirectionalComponentGraph(),
processGroups={
"803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup(
id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
@@ -126,7 +127,7 @@ def test_nifi_s3_provenance_event():
)
},
remotely_accessible_ports={},
connections=[],
connections=BidirectionalComponentGraph(),
processGroups={
"803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup(
id="803ebb92-017d-1000-2961-4bdaa27a3ba0",
