
Commit 7a519ac

fix(ingest/dbt): resolve more dbt ephemeral node lineage gaps (datahub-project#10553)

1 parent: 666de9e

4 files changed, +127 -62 lines

metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py (+86 -56)
@@ -142,11 +142,17 @@
 
 @dataclass
 class DBTSourceReport(StaleEntityRemovalSourceReport):
-    sql_statements_parsed: int = 0
-    sql_statements_table_error: int = 0
-    sql_statements_column_error: int = 0
-    sql_parser_detach_ctes_failures: LossyList[str] = field(default_factory=LossyList)
     sql_parser_skipped_missing_code: LossyList[str] = field(default_factory=LossyList)
+    sql_parser_parse_failures: int = 0
+    sql_parser_detach_ctes_failures: int = 0
+    sql_parser_table_errors: int = 0
+    sql_parser_column_errors: int = 0
+    sql_parser_successes: int = 0
+
+    sql_parser_parse_failures_list: LossyList[str] = field(default_factory=LossyList)
+    sql_parser_detach_ctes_failures_list: LossyList[str] = field(
+        default_factory=LossyList
+    )
 
     in_manifest_but_missing_catalog: LossyList[str] = field(default_factory=LossyList)
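The report now tracks each SQL-parser stage with an integer counter plus a size-bounded LossyList of example node names, instead of the old per-statement counters. A standalone sketch of that pattern (a hypothetical MiniReport, with a plain capped list standing in for DataHub's LossyList):

from dataclasses import dataclass, field
from typing import List

@dataclass
class MiniReport:
    sql_parser_parse_failures: int = 0
    sql_parser_parse_failures_list: List[str] = field(default_factory=list)

    def record_parse_failure(self, dbt_name: str, max_examples: int = 10) -> None:
        # Count every failure, but keep only a few example node names.
        self.sql_parser_parse_failures += 1
        if len(self.sql_parser_parse_failures_list) < max_examples:
            self.sql_parser_parse_failures_list.append(dbt_name)

report = MiniReport()
report.record_parse_failure("model.jaffle_shop.orders")
print(report.sql_parser_parse_failures, report.sql_parser_parse_failures_list)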

@@ -558,10 +564,11 @@ def get_fake_ephemeral_table_name(self) -> str:
         assert self.is_ephemeral_model()
 
         # Similar to get_db_fqn.
-        fqn = self._join_parts(
+        db_fqn = self._join_parts(
             [self.database, self.schema, f"__datahub__dbt__ephemeral__{self.name}"]
         )
-        return fqn.replace('"', "")
+        db_fqn = db_fqn.lower()
+        return db_fqn.replace('"', "")
 
     def get_urn_for_upstream_lineage(
         self,
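The new lowercasing makes the fake ephemeral table FQN case-insensitive (quotes were already stripped), presumably so lookups against the schema resolver don't miss on casing. A standalone sketch of the computation, with a hypothetical dot-join in place of DBTNode._join_parts:

def fake_ephemeral_table_name(database: str, schema: str, name: str) -> str:
    parts = [p for p in (database, schema, f"__datahub__dbt__ephemeral__{name}") if p]
    db_fqn = ".".join(parts).lower()  # the newly added lowercasing
    return db_fqn.replace('"', "")

print(fake_ephemeral_table_name("Analytics", "Staging", "My_Model"))
# analytics.staging.__datahub__dbt__ephemeral__my_model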
@@ -819,9 +826,10 @@ def get_column_type(
 
     # if still not found, report the warning
     if TypeClass is None:
-        report.report_warning(
-            dataset_name, f"unable to map type {column_type} to metadata schema"
-        )
+        if column_type:
+            report.report_warning(
+                dataset_name, f"unable to map type {column_type} to metadata schema"
+            )
         TypeClass = NullTypeClass
 
     return SchemaFieldDataType(type=TypeClass())
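The added guard means an empty or missing column_type no longer emits a warning; the type falls back to NullTypeClass either way. A minimal illustration of the truthiness check:

for column_type in ("GEOGRAPHY", "", None):
    if column_type:
        print(f"unable to map type {column_type} to metadata schema")
# Only the non-empty "GEOGRAPHY" string reaches the warning path.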
@@ -1041,15 +1049,16 @@ def _infer_schemas_and_update_cll(  # noqa: C901
 
         # Iterate over the dbt nodes in topological order.
         # This ensures that we process upstream nodes before downstream nodes.
-        for dbt_name in topological_sort(
+        node_order = topological_sort(
             list(all_nodes_map.keys()),
             edges=list(
                 (upstream, node.dbt_name)
                 for node in all_nodes_map.values()
                 for upstream in node.upstream_nodes
                 if upstream in all_nodes_map
            ),
-        ):
+        )
+        for dbt_name in node_order:
             node = all_nodes_map[dbt_name]
             logger.debug(f"Processing CLL/schemas for {node.dbt_name}")
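Hoisting the topological_sort call into node_order doesn't change behavior; the order still guarantees a node's upstreams are processed (and their schemas inferred) before the node itself. A minimal Kahn's-algorithm sketch with the same (nodes, edges) shape as the call above; the real datahub.utilities.topological_sort may differ in details:

from collections import deque
from typing import Dict, List, Tuple

def topo_sort(nodes: List[str], edges: List[Tuple[str, str]]) -> List[str]:
    indegree: Dict[str, int] = {n: 0 for n in nodes}
    downstreams: Dict[str, List[str]] = {n: [] for n in nodes}
    for upstream, downstream in edges:
        downstreams[upstream].append(downstream)
        indegree[downstream] += 1

    queue = deque(n for n in nodes if indegree[n] == 0)
    order: List[str] = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for d in downstreams[n]:
            indegree[d] -= 1
            if indegree[d] == 0:
                queue.append(d)
    if len(order) != len(nodes):
        raise ValueError("graph has a cycle")
    return order

# The ephemeral staging model sorts before the model that selects from it.
print(topo_sort(
    ["model.jaffle_shop.dim_users", "model.jaffle_shop.stg_users"],
    [("model.jaffle_shop.stg_users", "model.jaffle_shop.dim_users")],
))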

@@ -1119,55 +1128,26 @@ def _infer_schemas_and_update_cll(  # noqa: C901
 
             # Run sql parser to infer the schema + generate column lineage.
             sql_result = None
-            if node.node_type in {"source", "test"}:
+            if node.node_type in {"source", "test", "seed"}:
                 # For sources, we generate CLL as a 1:1 mapping.
-                # We don't support CLL for tests (assertions).
+                # We don't support CLL for tests (assertions) or seeds.
                 pass
             elif node.compiled_code:
-                try:
-                    # Add CTE stops based on the upstreams list.
-                    cte_mapping = {
-                        cte_name: upstream_node.get_fake_ephemeral_table_name()
-                        for upstream_node in [
-                            all_nodes_map[upstream_node_name]
-                            for upstream_node_name in node.upstream_nodes
-                            if upstream_node_name in all_nodes_map
-                        ]
-                        if upstream_node.is_ephemeral_model()
-                        for cte_name in _get_dbt_cte_names(
-                            upstream_node.name, schema_resolver.platform
-                        )
-                    }
-                    preprocessed_sql = detach_ctes(
-                        parse_statements_and_pick(
-                            node.compiled_code,
-                            platform=schema_resolver.platform,
-                        ),
-                        platform=schema_resolver.platform,
-                        cte_mapping=cte_mapping,
-                    )
-                except Exception as e:
-                    self.report.sql_parser_detach_ctes_failures.append(node.dbt_name)
-                    logger.debug(
-                        f"Failed to detach CTEs from compiled code. {node.dbt_name} will not have column lineage."
-                    )
-                    sql_result = SqlParsingResult.make_from_error(e)
-                else:
-                    sql_result = sqlglot_lineage(
-                        preprocessed_sql, schema_resolver=schema_resolver
-                    )
-                    if sql_result.debug_info.error:
-                        self.report.sql_statements_table_error += 1
-                        logger.info(
-                            f"Failed to parse compiled code for {node.dbt_name}: {sql_result.debug_info.error}"
-                        )
-                    elif sql_result.debug_info.column_error:
-                        self.report.sql_statements_column_error += 1
-                        logger.info(
-                            f"Failed to generate CLL for {node.dbt_name}: {sql_result.debug_info.column_error}"
-                        )
-                    else:
-                        self.report.sql_statements_parsed += 1
+                # Add CTE stops based on the upstreams list.
+                cte_mapping = {
+                    cte_name: upstream_node.get_fake_ephemeral_table_name()
+                    for upstream_node in [
+                        all_nodes_map[upstream_node_name]
+                        for upstream_node_name in node.upstream_nodes
+                        if upstream_node_name in all_nodes_map
+                    ]
+                    if upstream_node.is_ephemeral_model()
+                    for cte_name in _get_dbt_cte_names(
+                        upstream_node.name, schema_resolver.platform
+                    )
+                }
+
+                sql_result = self._parse_cll(node, cte_mapping, schema_resolver)
             else:
                 self.report.sql_parser_skipped_missing_code.append(node.dbt_name)
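For a node with one ephemeral upstream, the comprehension above boils down to a small dict from dbt's generated CTE name(s) to the fake ephemeral table name. A hypothetical example (dbt typically inlines an ephemeral model as a CTE named __dbt__cte__<model name>; _get_dbt_cte_names may add platform-specific casing variants):

cte_mapping = {
    "__dbt__cte__stg_users": "analytics.staging.__datahub__dbt__ephemeral__stg_users",
}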

@@ -1212,6 +1192,56 @@ def _infer_schemas_and_update_cll(  # noqa: C901
             if inferred_schema_fields:
                 node.columns_setdefault(inferred_schema_fields)
 
+    def _parse_cll(
+        self,
+        node: DBTNode,
+        cte_mapping: Dict[str, str],
+        schema_resolver: SchemaResolver,
+    ) -> SqlParsingResult:
+        assert node.compiled_code is not None
+
+        try:
+            picked_statement = parse_statements_and_pick(
+                node.compiled_code,
+                platform=schema_resolver.platform,
+            )
+        except Exception as e:
+            logger.debug(
+                f"Failed to parse compiled code. {node.dbt_name} will not have column lineage."
+            )
+            self.report.sql_parser_parse_failures += 1
+            self.report.sql_parser_parse_failures_list.append(node.dbt_name)
+            return SqlParsingResult.make_from_error(e)
+
+        try:
+            preprocessed_sql = detach_ctes(
+                picked_statement,
+                platform=schema_resolver.platform,
+                cte_mapping=cte_mapping,
+            )
+        except Exception as e:
+            self.report.sql_parser_detach_ctes_failures += 1
+            self.report.sql_parser_detach_ctes_failures_list.append(node.dbt_name)
+            logger.debug(
+                f"Failed to detach CTEs from compiled code. {node.dbt_name} will not have column lineage."
+            )
+            return SqlParsingResult.make_from_error(e)
+
+        sql_result = sqlglot_lineage(preprocessed_sql, schema_resolver=schema_resolver)
+        if sql_result.debug_info.table_error:
+            self.report.sql_parser_table_errors += 1
+            logger.info(
+                f"Failed to generate any CLL lineage for {node.dbt_name}: {sql_result.debug_info.error}"
+            )
+        elif sql_result.debug_info.column_error:
+            self.report.sql_parser_column_errors += 1
+            logger.info(
+                f"Failed to generate CLL for {node.dbt_name}: {sql_result.debug_info.column_error}"
+            )
+        else:
+            self.report.sql_parser_successes += 1
+        return sql_result
+
     def create_dbt_platform_mces(
         self,
         dbt_nodes: List[DBTNode],
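The new _parse_cll helper splits the old single try/except into three separately counted stages: statement parsing, CTE detachment, and lineage extraction, so a parse failure is no longer misreported as a detach-CTEs failure. A rough sketch of the idea behind the detachment stage (not DataHub's detach_ctes itself): rewrite table references that match a mapped CTE name to point at the fake ephemeral table, so the schema resolver can supply columns across the CTE boundary.

import sqlglot
import sqlglot.expressions as exp

def rewrite_cte_refs(sql: str, cte_mapping: dict, dialect: str = "snowflake") -> str:
    # Replace table references whose name matches a mapped CTE with the fake
    # ephemeral table; the real detach_ctes is more careful (scoping, aliases).
    def rename(node: exp.Expression) -> exp.Expression:
        if isinstance(node, exp.Table) and node.name in cte_mapping:
            return exp.to_table(cte_mapping[node.name])
        return node

    statement = sqlglot.parse_one(sql, dialect=dialect)
    return statement.transform(rename).sql(dialect=dialect)

print(rewrite_cte_refs(
    "SELECT id FROM __dbt__cte__stg_users",
    {"__dbt__cte__stg_users": "analytics.staging.__datahub__dbt__ephemeral__stg_users"},
))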

metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py (+14 -6)
@@ -365,7 +365,7 @@ def _column_level_lineage(  # noqa: C901
                 col_normalized = col
 
             table_schema_normalized_mapping[table][col_normalized] = col
-            normalized_table_schema[col_normalized] = col_type
+            normalized_table_schema[col_normalized] = col_type or "UNKNOWN"
 
         sqlglot_db_schema.add_table(
             table.as_sqlglot_table(),
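The "UNKNOWN" fallback matters because a catalog column with no type would otherwise push None into the sqlglot schema; sqlglot reserves an UNKNOWN type for exactly this case, which keeps the column registered (and usable for lineage) even without type information. A minimal check, assuming sqlglot's MappingSchema API:

from sqlglot.schema import MappingSchema

schema = MappingSchema(dialect="snowflake")
col_type = None  # e.g. the dbt catalog had no type for this column
schema.add_table("db.sch.tbl", {"id": "INT", "mystery_col": col_type or "UNKNOWN"})
print(schema.column_names("db.sch.tbl"))  # both columns are registered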
@@ -923,12 +923,20 @@ def _sqlglot_lineage_inner(
     out_urns = sorted({table_name_urn_mapping[table] for table in modified})
     column_lineage_urns = None
     if column_lineage:
-        column_lineage_urns = [
-            _translate_internal_column_lineage(
-                table_name_urn_mapping, internal_col_lineage, dialect=dialect
-            )
-            for internal_col_lineage in column_lineage
-        ]
+        try:
+            column_lineage_urns = [
+                _translate_internal_column_lineage(
+                    table_name_urn_mapping, internal_col_lineage, dialect=dialect
+                )
+                for internal_col_lineage in column_lineage
+            ]
+        except KeyError as e:
+            # When this happens, it's usually because of things like PIVOT where we can't
+            # really go up the scope chain.
+            logger.debug(
+                f"Failed to translate column lineage to urns: {e}", exc_info=True
+            )
+            debug_info.column_error = e
 
     query_type, query_type_props = get_query_type_of_sql(
         original_statement, dialect=dialect

metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py (+2 -0)
@@ -81,6 +81,8 @@ def parse_statement(
 
 
 def parse_statements_and_pick(sql: str, platform: DialectOrStr) -> sqlglot.Expression:
+    logger.debug("Parsing SQL query: %s", sql)
+
     dialect = get_dialect(platform)
     statements = [
         expression for expression in sqlglot.parse(sql, dialect=dialect) if expression
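For context: sqlglot.parse returns one expression per semicolon-separated statement (with None entries for empty ones, hence the "if expression" filter), and the helper then picks one of them; the selection heuristic isn't shown in this diff. A quick illustration of the parse step only:

import sqlglot

parsed = sqlglot.parse("SET x = 1; SELECT a FROM tbl", dialect="snowflake")
statements = [expression for expression in parsed if expression]
print(len(statements))  # 2: the SET and the SELECT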

metadata-ingestion/tests/integration/powerbi/golden_test_cll.json (+25 -0)
@@ -907,6 +907,31 @@
       "lastRunId": "no-run-id-provided"
     }
   },
+  {
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV)",
+    "changeType": "UPSERT",
+    "aspectName": "upstreamLineage",
+    "aspect": {
+      "json": {
+        "upstreams": [
+          {
+            "auditStamp": {
+              "time": 0,
+              "actor": "urn:li:corpuser:unknown"
+            },
+            "dataset": "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD)",
+            "type": "TRANSFORMED"
+          }
+        ]
+      }
+    },
+    "systemMetadata": {
+      "lastObserved": 1643871600000,
+      "runId": "powerbi-test",
+      "lastRunId": "no-run-id-provided"
+    }
+  },
   {
     "entityType": "corpuser",
     "entityUrn": "urn:li:corpuser:[email protected]",
