Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/fivetran): add safeguards on table/column lineage #11674

Merged
merged 6 commits
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses
import logging
from dataclasses import dataclass, field as dataclass_field
from typing import Dict, List, Optional
from typing import Dict, Optional

import pydantic
from pydantic import Field, root_validator
Expand All @@ -23,6 +23,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -114,24 +115,24 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict:
return values


@dataclass
@dataclasses.dataclass
class MetadataExtractionPerfReport(Report):
connectors_metadata_extraction_sec: PerfTimer = dataclass_field(
connectors_metadata_extraction_sec: PerfTimer = dataclasses.field(
default_factory=PerfTimer
)
connectors_lineage_extraction_sec: PerfTimer = dataclass_field(
connectors_lineage_extraction_sec: PerfTimer = dataclasses.field(
default_factory=PerfTimer
)
connectors_jobs_extraction_sec: PerfTimer = dataclass_field(
connectors_jobs_extraction_sec: PerfTimer = dataclasses.field(
default_factory=PerfTimer
)


@dataclass
@dataclasses.dataclass
class FivetranSourceReport(StaleEntityRemovalSourceReport):
connectors_scanned: int = 0
filtered_connectors: List[str] = dataclass_field(default_factory=list)
metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field(
filtered_connectors: LossyList[str] = dataclasses.field(default_factory=LossyList)
metadata_extraction_perf: MetadataExtractionPerfReport = dataclasses.field(
default_factory=MetadataExtractionPerfReport
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Connector:
sync_frequency: int
destination_id: str
user_id: str
table_lineage: List[TableLineage]
lineage: List[TableLineage]
jobs: List["Job"]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
PlatformDetail,
)
from datahub.ingestion.source.fivetran.data_classes import Connector, Job
from datahub.ingestion.source.fivetran.fivetran_log_api import (
from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
from datahub.ingestion.source.fivetran.fivetran_query import (
MAX_JOBS_PER_CONNECTOR,
FivetranLogAPI,
MAX_TABLE_LINEAGE_PER_CONNECTOR,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
Expand Down Expand Up @@ -106,13 +107,21 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity."
)

for table_lineage in connector.table_lineage:
if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
self.report.warning(
title="Table lineage truncated",
message=f"The connector had more than {MAX_TABLE_LINEAGE_PER_CONNECTOR} table lineage entries. "
f"Only the most recent {MAX_TABLE_LINEAGE_PER_CONNECTOR} entries were ingested.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
)

for lineage in connector.lineage:
input_dataset_urn = DatasetUrn.create_from_ids(
platform_id=source_platform,
table_name=(
f"{source_database.lower()}.{table_lineage.source_table}"
f"{source_database.lower()}.{lineage.source_table}"
if source_database
else table_lineage.source_table
else lineage.source_table
),
env=source_platform_detail.env,
platform_instance=source_platform_detail.platform_instance,
Expand All @@ -121,14 +130,14 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:

output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=self.config.fivetran_log_config.destination_platform,
table_name=f"{self.audit_log.fivetran_log_database.lower()}.{table_lineage.destination_table}",
table_name=f"{self.audit_log.fivetran_log_database.lower()}.{lineage.destination_table}",
env=destination_platform_detail.env,
platform_instance=destination_platform_detail.platform_instance,
)
output_dataset_urn_list.append(output_dataset_urn)

if self.config.include_column_lineage:
for column_lineage in table_lineage.column_lineage:
for column_lineage in lineage.column_lineage:
fine_grained_lineage.append(
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import json
import logging
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

import sqlglot
Expand All @@ -22,10 +23,6 @@

logger: logging.Logger = logging.getLogger(__name__)

# We don't want to generate a massive number of dataProcesses for a single connector.
# This is primarily used as a safeguard to prevent performance issues.
MAX_JOBS_PER_CONNECTOR = 1000


class FivetranLogAPI:
def __init__(self, fivetran_log_config: FivetranLogConfig) -> None:
Expand Down Expand Up @@ -91,55 +88,51 @@ def _query(self, query: str) -> List[Dict]:
resp = self.engine.execute(query)
return [row for row in resp]

def _get_column_lineage_metadata(self) -> Dict[str, List]:
def _get_column_lineage_metadata(self) -> Dict[Tuple[str, str], List]:
"""
Return's dict of column lineage metadata with key as '<SOURCE_TABLE_ID>-<DESTINATION_TABLE_ID>'
Returns dict of column lineage metadata with key as (<SOURCE_TABLE_ID>, <DESTINATION_TABLE_ID>)
"""
all_column_lineage: Dict[str, List] = {}
all_column_lineage = defaultdict(list)
column_lineage_result = self._query(
self.fivetran_log_query.get_column_lineage_query()
)
for column_lineage in column_lineage_result:
key = f"{column_lineage[Constant.SOURCE_TABLE_ID]}-{column_lineage[Constant.DESTINATION_TABLE_ID]}"
if key not in all_column_lineage:
all_column_lineage[key] = [column_lineage]
else:
all_column_lineage[key].append(column_lineage)
return all_column_lineage
key = (
column_lineage[Constant.SOURCE_TABLE_ID],
column_lineage[Constant.DESTINATION_TABLE_ID],
)
all_column_lineage[key].append(column_lineage)
return dict(all_column_lineage)

def _get_connectors_table_lineage_metadata(self) -> Dict[str, List]:
def _get_table_lineage_metadata(self) -> Dict[str, List]:
"""
Return's dict of table lineage metadata with key as 'CONNECTOR_ID'
Returns dict of table lineage metadata with key as 'CONNECTOR_ID'
"""
connectors_table_lineage_metadata: Dict[str, List] = {}
connectors_table_lineage_metadata = defaultdict(list)
table_lineage_result = self._query(
self.fivetran_log_query.get_table_lineage_query()
)
for table_lineage in table_lineage_result:
if (
connectors_table_lineage_metadata[
table_lineage[Constant.CONNECTOR_ID]
not in connectors_table_lineage_metadata
):
connectors_table_lineage_metadata[
table_lineage[Constant.CONNECTOR_ID]
] = [table_lineage]
else:
connectors_table_lineage_metadata[
table_lineage[Constant.CONNECTOR_ID]
].append(table_lineage)
return connectors_table_lineage_metadata
].append(table_lineage)
return dict(connectors_table_lineage_metadata)

def _get_table_lineage(
def _extract_connector_lineage(
self,
column_lineage_metadata: Dict[str, List],
table_lineage_result: Optional[List],
column_lineage_metadata: Dict[Tuple[str, str], List],
) -> List[TableLineage]:
table_lineage_list: List[TableLineage] = []
if table_lineage_result is None:
return table_lineage_list
for table_lineage in table_lineage_result:
# Join the column lineage into the table lineage.
column_lineage_result = column_lineage_metadata.get(
f"{table_lineage[Constant.SOURCE_TABLE_ID]}-{table_lineage[Constant.DESTINATION_TABLE_ID]}"
(
table_lineage[Constant.SOURCE_TABLE_ID],
table_lineage[Constant.DESTINATION_TABLE_ID],
)
)
column_lineage_list: List[ColumnLineage] = []
if column_lineage_result:
Expand All @@ -152,6 +145,7 @@ def _get_table_lineage(
)
for column_lineage in column_lineage_result
]

table_lineage_list.append(
TableLineage(
source_table=f"{table_lineage[Constant.SOURCE_SCHEMA_NAME]}.{table_lineage[Constant.SOURCE_TABLE_NAME]}",
Expand All @@ -167,14 +161,9 @@ def _get_all_connector_sync_logs(
) -> Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]]:
sync_logs: Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]] = {}

# Format connector_ids as a comma-separated string of quoted IDs
formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)

query = self.fivetran_log_query.get_sync_logs_query().format(
db_clause=self.fivetran_log_query.db_clause,
query = self.fivetran_log_query.get_sync_logs_query(
syncs_interval=syncs_interval,
max_jobs_per_connector=MAX_JOBS_PER_CONNECTOR,
connector_ids=formatted_connector_ids,
connector_ids=connector_ids,
)

for row in self._query(query):
Expand Down Expand Up @@ -234,13 +223,13 @@ def get_user_email(self, user_id: str) -> Optional[str]:
return None
return self._get_users().get(user_id)

def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None:
table_lineage_metadata = self._get_connectors_table_lineage_metadata()
def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
table_lineage_metadata = self._get_table_lineage_metadata()
column_lineage_metadata = self._get_column_lineage_metadata()
for connector in connectors:
connector.table_lineage = self._get_table_lineage(
column_lineage_metadata=column_lineage_metadata,
connector.lineage = self._extract_connector_lineage(
table_lineage_result=table_lineage_metadata.get(connector.connector_id),
column_lineage_metadata=column_lineage_metadata,
)

def _fill_connectors_jobs(
Expand All @@ -262,6 +251,7 @@ def get_allowed_connectors_list(
) -> List[Connector]:
connectors: List[Connector] = []
with report.metadata_extraction_perf.connectors_metadata_extraction_sec:
logger.info("Fetching connector list")
connector_list = self._query(self.fivetran_log_query.get_connectors_query())
for connector in connector_list:
if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
Expand All @@ -279,12 +269,20 @@ def get_allowed_connectors_list(
sync_frequency=connector[Constant.SYNC_FREQUENCY],
destination_id=connector[Constant.DESTINATION_ID],
user_id=connector[Constant.CONNECTING_USER_ID],
table_lineage=[],
jobs=[],
lineage=[], # filled later
jobs=[], # filled later
)
)

if not connectors:
# Some of our queries don't work well when there's no connectors, since
# we push down connector id filters.
return []

with report.metadata_extraction_perf.connectors_lineage_extraction_sec:
self._fill_connectors_table_lineage(connectors)
logger.info("Fetching connector lineage")
self._fill_connectors_lineage(connectors)
with report.metadata_extraction_perf.connectors_jobs_extraction_sec:
logger.info("Fetching connector job run history")
self._fill_connectors_jobs(connectors, syncs_interval)
return connectors
Loading
Loading