Skip to content

Commit e96323a

Browse files
authored
feat(ingest/fivetran): show connector filter reason (datahub-project#11695)
1 parent 326afc6 commit e96323a

File tree

5 files changed

+43
-21
lines changed

5 files changed

+43
-21
lines changed

metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ source:
2525
client_email: "client_email"
2626
client_id: "client_id"
2727
private_key: "private_key"
28-
dataset: "fivetran_log_dataset"
28+
dataset: "fivetran_log_dataset"
2929

3030
# Optional - filter for certain connector names instead of ingesting everything.
3131
# connector_patterns:
@@ -35,7 +35,7 @@ source:
3535
# Optional -- A mapping of all of the connector's sources to its database.
3636
# sources_to_database:
3737
# connector_id: source_db
38-
38+
3939
# Optional -- This mapping is only required to configure a platform instance for the source.
4040
# A mapping of Fivetran connector id to data platform instance
4141
# sources_to_platform_instance:

metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
160160
)
161161
connector_patterns: AllowDenyPattern = Field(
162162
default=AllowDenyPattern.allow_all(),
163-
description="Filtering regex patterns for connector ids. "
164-
"They're visible in the Fivetran UI under Connectors -> Setup -> Fivetran Connector ID.",
163+
description="Filtering regex patterns for connector names.",
165164
)
166165
destination_patterns: AllowDenyPattern = Field(
167166
default=AllowDenyPattern.allow_all(),

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py

+24-13
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,21 @@ def _query(self, query: str) -> List[Dict]:
8484
query = sqlglot.parse_one(query, dialect="snowflake").sql(
8585
dialect=self.fivetran_log_config.destination_platform, pretty=True
8686
)
87-
logger.debug(f"Query : {query}")
87+
logger.info(f"Executing query: {query}")
8888
resp = self.engine.execute(query)
8989
return [row for row in resp]
9090

91-
def _get_column_lineage_metadata(self) -> Dict[Tuple[str, str], List]:
91+
def _get_column_lineage_metadata(
92+
self, connector_ids: List[str]
93+
) -> Dict[Tuple[str, str], List]:
9294
"""
9395
Returns dict of column lineage metadata with key as (<SOURCE_TABLE_ID>, <DESTINATION_TABLE_ID>)
9496
"""
9597
all_column_lineage = defaultdict(list)
9698
column_lineage_result = self._query(
97-
self.fivetran_log_query.get_column_lineage_query()
99+
self.fivetran_log_query.get_column_lineage_query(
100+
connector_ids=connector_ids
101+
)
98102
)
99103
for column_lineage in column_lineage_result:
100104
key = (
@@ -104,13 +108,13 @@ def _get_column_lineage_metadata(self) -> Dict[Tuple[str, str], List]:
104108
all_column_lineage[key].append(column_lineage)
105109
return dict(all_column_lineage)
106110

107-
def _get_table_lineage_metadata(self) -> Dict[str, List]:
111+
def _get_table_lineage_metadata(self, connector_ids: List[str]) -> Dict[str, List]:
108112
"""
109113
Returns dict of table lineage metadata with key as 'CONNECTOR_ID'
110114
"""
111115
connectors_table_lineage_metadata = defaultdict(list)
112116
table_lineage_result = self._query(
113-
self.fivetran_log_query.get_table_lineage_query()
117+
self.fivetran_log_query.get_table_lineage_query(connector_ids=connector_ids)
114118
)
115119
for table_lineage in table_lineage_result:
116120
connectors_table_lineage_metadata[
@@ -224,8 +228,9 @@ def get_user_email(self, user_id: str) -> Optional[str]:
224228
return self._get_users().get(user_id)
225229

226230
def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
227-
table_lineage_metadata = self._get_table_lineage_metadata()
228-
column_lineage_metadata = self._get_column_lineage_metadata()
231+
connector_ids = [connector.connector_id for connector in connectors]
232+
table_lineage_metadata = self._get_table_lineage_metadata(connector_ids)
233+
column_lineage_metadata = self._get_column_lineage_metadata(connector_ids)
229234
for connector in connectors:
230235
connector.lineage = self._extract_connector_lineage(
231236
table_lineage_result=table_lineage_metadata.get(connector.connector_id),
@@ -254,20 +259,25 @@ def get_allowed_connectors_list(
254259
logger.info("Fetching connector list")
255260
connector_list = self._query(self.fivetran_log_query.get_connectors_query())
256261
for connector in connector_list:
257-
if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
258-
report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
262+
connector_name = connector[Constant.CONNECTOR_NAME]
263+
if not connector_patterns.allowed(connector_name):
264+
report.report_connectors_dropped(connector_name)
259265
continue
260-
if not destination_patterns.allowed(connector[Constant.DESTINATION_ID]):
261-
report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
266+
if not destination_patterns.allowed(
267+
destination_id := connector[Constant.DESTINATION_ID]
268+
):
269+
report.report_connectors_dropped(
270+
f"{connector_name} (destination_id: {destination_id})"
271+
)
262272
continue
263273
connectors.append(
264274
Connector(
265275
connector_id=connector[Constant.CONNECTOR_ID],
266-
connector_name=connector[Constant.CONNECTOR_NAME],
276+
connector_name=connector_name,
267277
connector_type=connector[Constant.CONNECTOR_TYPE_ID],
268278
paused=connector[Constant.PAUSED],
269279
sync_frequency=connector[Constant.SYNC_FREQUENCY],
270-
destination_id=connector[Constant.DESTINATION_ID],
280+
destination_id=destination_id,
271281
user_id=connector[Constant.CONNECTING_USER_ID],
272282
lineage=[], # filled later
273283
jobs=[], # filled later
@@ -279,6 +289,7 @@ def get_allowed_connectors_list(
279289
# we push down connector id filters.
280290
logger.info("No allowed connectors found")
281291
return []
292+
logger.info(f"Found {len(connectors)} allowed connectors")
282293

283294
with report.metadata_extraction_perf.connectors_lineage_extraction_sec:
284295
logger.info("Fetching connector lineage")

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ def get_sync_logs_query(
8080
ORDER BY connector_id, end_time DESC
8181
"""
8282

83-
def get_table_lineage_query(self) -> str:
83+
def get_table_lineage_query(self, connector_ids: List[str]) -> str:
84+
# Format connector_ids as a comma-separated string of quoted IDs
85+
formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)
86+
8487
return f"""\
8588
SELECT
8689
stm.connector_id as connector_id,
@@ -95,11 +98,15 @@ def get_table_lineage_query(self) -> str:
9598
JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
9699
JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
97100
JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
101+
WHERE stm.connector_id IN ({formatted_connector_ids})
98102
QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY tl.created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
99103
ORDER BY stm.connector_id, tl.created_at DESC
100104
"""
101105

102-
def get_column_lineage_query(self) -> str:
106+
def get_column_lineage_query(self, connector_ids: List[str]) -> str:
107+
# Format connector_ids as a comma-separated string of quoted IDs
108+
formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)
109+
103110
return f"""\
104111
SELECT
105112
scm.table_id as source_table_id,
@@ -114,6 +121,7 @@ def get_column_lineage_query(self) -> str:
114121
-- Only joining source_table_metadata to get the connector_id.
115122
JOIN {self.db_clause}source_table_metadata as stm
116123
ON scm.table_id = stm.id
124+
WHERE stm.connector_id IN ({formatted_connector_ids})
117125
QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY cl.created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
118126
ORDER BY stm.connector_id, cl.created_at DESC
119127
"""

metadata-ingestion/tests/integration/fivetran/test_fivetran.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ def default_query_results(
4343
return []
4444
elif query == fivetran_log_query.get_connectors_query():
4545
return connector_query_results
46-
elif query == fivetran_log_query.get_table_lineage_query():
46+
elif query == fivetran_log_query.get_table_lineage_query(
47+
connector_ids=["calendar_elected"]
48+
):
4749
return [
4850
{
4951
"connector_id": "calendar_elected",
@@ -64,7 +66,9 @@ def default_query_results(
6466
"destination_schema_name": "postgres_public",
6567
},
6668
]
67-
elif query == fivetran_log_query.get_column_lineage_query():
69+
elif query == fivetran_log_query.get_column_lineage_query(
70+
connector_ids=["calendar_elected"]
71+
):
6872
return [
6973
{
7074
"source_table_id": "10040",

0 commit comments

Comments
 (0)