
Commit eab2ac7

feat(ingest/snowflake): support lineage via rename and swap using que… (datahub-project#11600)
1 parent e96323a commit eab2ac7

10 files changed (+2037 -81 lines)

metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py (+10 -9)
@@ -53,6 +53,7 @@
 )
 from datahub.metadata.urns import DatasetUrn
 from datahub.sql_parsing.schema_resolver import SchemaResolver
+from datahub.sql_parsing.sql_parsing_aggregator import TableRename
 from datahub.sql_parsing.sqlglot_utils import get_dialect, parse_statement
 from datahub.utilities import memory_footprint
 from datahub.utilities.dedup_list import deduplicate_list
@@ -504,21 +505,21 @@ def _populate_lineage_map(
         self.report_status(f"extract-{lineage_type.name}", False)
 
     def _update_lineage_map_for_table_renames(
-        self, table_renames: Dict[str, str]
+        self, table_renames: Dict[str, TableRename]
     ) -> None:
         if not table_renames:
             return
 
         logger.info(f"Updating lineage map for {len(table_renames)} table renames")
-        for new_table_urn, prev_table_urn in table_renames.items():
+        for entry in table_renames.values():
             # This table was renamed from some other name, copy in the lineage
             # for the previous name as well.
-            prev_table_lineage = self._lineage_map.get(prev_table_urn)
+            prev_table_lineage = self._lineage_map.get(entry.original_urn)
             if prev_table_lineage:
                 logger.debug(
-                    f"including lineage for {prev_table_urn} in {new_table_urn} due to table rename"
+                    f"including lineage for {entry.original_urn} in {entry.new_urn} due to table rename"
                 )
-                self._lineage_map[new_table_urn].merge_lineage(
+                self._lineage_map[entry.new_urn].merge_lineage(
                     upstreams=prev_table_lineage.upstreams,
                     cll=prev_table_lineage.cll,
                 )
@@ -672,7 +673,7 @@ def populate_lineage(
             for db, schemas in all_tables.items()
         }
 
-        table_renames: Dict[str, str] = {}
+        table_renames: Dict[str, TableRename] = {}
         if self.config.include_table_rename_lineage:
             table_renames, all_tables_set = self._process_table_renames(
                 database=database,
@@ -851,11 +852,11 @@ def _process_table_renames(
         database: str,
         connection: redshift_connector.Connection,
         all_tables: Dict[str, Dict[str, Set[str]]],
-    ) -> Tuple[Dict[str, str], Dict[str, Dict[str, Set[str]]]]:
+    ) -> Tuple[Dict[str, TableRename], Dict[str, Dict[str, Set[str]]]]:
        logger.info(f"Processing table renames for db {database}")
 
        # new urn -> prev urn
-        table_renames: Dict[str, str] = {}
+        table_renames: Dict[str, TableRename] = {}
 
        query = self.queries.alter_table_rename_query(
            db_name=database,
@@ -893,7 +894,7 @@ def _process_table_renames(
                 env=self.config.env,
             )
 
-            table_renames[new_urn] = prev_urn
+            table_renames[new_urn] = TableRename(prev_urn, new_urn, query_text)
 
             # We want to generate lineage for the previous name too.
             all_tables[database][schema].add(prev_name)
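
Note: `table_renames` entries are now objects rather than bare URN strings. A minimal sketch of the shape implied by the call sites above (`entry.original_urn`, `entry.new_urn`, and the `TableRename(prev_urn, new_urn, query_text)` constructor); the real class lives in `datahub.sql_parsing.sql_parsing_aggregator` and may carry additional fields:

from dataclasses import dataclass
from typing import Optional

@dataclass
class TableRename:
    original_urn: str
    new_urn: str
    query: Optional[str] = None  # e.g. the ALTER TABLE ... RENAME TO statement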

metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py (+2 -4)
@@ -146,10 +146,8 @@ def build(
                 lambda: collections.defaultdict(set)
             ),
         )
-        for new_urn, original_urn in table_renames.items():
-            self.aggregator.add_table_rename(
-                original_urn=original_urn, new_urn=new_urn
-            )
+        for entry in table_renames.values():
+            self.aggregator.add_table_rename(entry)
 
         if self.config.table_lineage_mode in {
             LineageMode.SQL_BASED,
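
The loop above implies a matching signature change in the aggregator: `add_table_rename` now takes the `TableRename` entry itself rather than `original_urn`/`new_urn` keywords. A hedged sketch of the call-site migration, reusing the `TableRename` sketch above:

# Before: keyword URN arguments
self.aggregator.add_table_rename(original_urn=prev_urn, new_urn=new_urn)

# After: pass the entry, which also carries the originating DDL query text
self.aggregator.add_table_rename(TableRename(prev_urn, new_urn, query_text))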

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py (+69 -6)
@@ -52,6 +52,8 @@
     PreparsedQuery,
     SqlAggregatorReport,
     SqlParsingAggregator,
+    TableRename,
+    TableSwap,
 )
 from datahub.sql_parsing.sql_parsing_common import QueryType
 from datahub.sql_parsing.sqlglot_lineage import (
@@ -116,6 +118,8 @@ class SnowflakeQueriesExtractorReport(Report):
     audit_log_load_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
     sql_aggregator: Optional[SqlAggregatorReport] = None
 
+    num_ddl_queries_dropped: int = 0
+
 
 @dataclass
 class SnowflakeQueriesSourceReport(SourceReport):
@@ -225,7 +229,9 @@ def get_workunits_internal(
         audit_log_file = self.local_temp_path / "audit_log.sqlite"
         use_cached_audit_log = audit_log_file.exists()
 
-        queries: FileBackedList[Union[KnownLineageMapping, PreparsedQuery]]
+        queries: FileBackedList[
+            Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap]
+        ]
         if use_cached_audit_log:
             logger.info("Using cached audit log")
             shared_connection = ConnectionWrapper(audit_log_file)
@@ -235,7 +241,7 @@ def get_workunits_internal(
 
             shared_connection = ConnectionWrapper(audit_log_file)
             queries = FileBackedList(shared_connection)
-            entry: Union[KnownLineageMapping, PreparsedQuery]
+            entry: Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap]
 
             with self.report.copy_history_fetch_timer:
                 for entry in self.fetch_copy_history():
@@ -296,7 +302,7 @@ def fetch_copy_history(self) -> Iterable[KnownLineageMapping]:
 
     def fetch_query_log(
         self,
-    ) -> Iterable[PreparsedQuery]:
+    ) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap]]:
         query_log_query = _build_enriched_query_log_query(
             start_time=self.config.window.start_time,
             end_time=self.config.window.end_time,
@@ -324,12 +330,16 @@ def fetch_query_log(
                     exc=e,
                 )
             else:
-                yield entry
+                if entry:
+                    yield entry
 
-    def _parse_audit_log_row(self, row: Dict[str, Any]) -> PreparsedQuery:
+    def _parse_audit_log_row(
+        self, row: Dict[str, Any]
+    ) -> Optional[Union[TableRename, TableSwap, PreparsedQuery]]:
         json_fields = {
             "DIRECT_OBJECTS_ACCESSED",
             "OBJECTS_MODIFIED",
+            "OBJECT_MODIFIED_BY_DDL",
         }
 
         res = {}
@@ -341,6 +351,17 @@ def _parse_audit_log_row(
 
         direct_objects_accessed = res["direct_objects_accessed"]
         objects_modified = res["objects_modified"]
+        object_modified_by_ddl = res["object_modified_by_ddl"]
+
+        if object_modified_by_ddl and not objects_modified:
+            ddl_entry: Optional[Union[TableRename, TableSwap]] = None
+            with self.structured_reporter.report_exc(
+                "Error fetching ddl lineage from Snowflake"
+            ):
+                ddl_entry = self.parse_ddl_query(
+                    res["query_text"], object_modified_by_ddl
+                )
+            return ddl_entry
 
         upstreams = []
         column_usage = {}
@@ -437,6 +458,45 @@ def _parse_audit_log_row(
         )
         return entry
 
+    def parse_ddl_query(
+        self, query: str, object_modified_by_ddl: dict
+    ) -> Optional[Union[TableRename, TableSwap]]:
+        if object_modified_by_ddl[
+            "operationType"
+        ] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
+            urn1 = self.identifiers.gen_dataset_urn(
+                self.identifiers.get_dataset_identifier_from_qualified_name(
+                    object_modified_by_ddl["objectName"]
+                )
+            )
+
+            urn2 = self.identifiers.gen_dataset_urn(
+                self.identifiers.get_dataset_identifier_from_qualified_name(
+                    object_modified_by_ddl["properties"]["swapTargetName"]["value"]
+                )
+            )
+
+            return TableSwap(urn1, urn2, query)
+        elif object_modified_by_ddl[
+            "operationType"
+        ] == "RENAME_TABLE" and object_modified_by_ddl["properties"].get("objectName"):
+            original_urn = self.identifiers.gen_dataset_urn(
+                self.identifiers.get_dataset_identifier_from_qualified_name(
+                    object_modified_by_ddl["objectName"]
+                )
+            )
+
+            new_urn = self.identifiers.gen_dataset_urn(
+                self.identifiers.get_dataset_identifier_from_qualified_name(
+                    object_modified_by_ddl["properties"]["objectName"]["value"]
+                )
+            )
+
+            return TableRename(original_urn, new_urn, query)
+        else:
+            self.report.num_ddl_queries_dropped += 1
+            return None
+
     def close(self) -> None:
         self._exit_stack.close()
 
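For illustration, `parse_ddl_query` reads Snowflake's `OBJECT_MODIFIED_BY_DDL` column from `ACCOUNT_USAGE.ACCESS_HISTORY`. The payload shapes below are assumptions inferred from the field accesses above (`operationType`, `objectName`, `properties.swapTargetName.value`, `properties.objectName.value`), not copied from Snowflake's documentation; `TableSwap(urn1, urn2, query)` is assumed to mirror the `TableRename` shape with the two swapped-table URNs:

# ALTER TABLE db1.schema1.t1 SWAP WITH db1.schema1.t2  -> TableSwap
object_modified_by_ddl = {
    "objectDomain": "Table",
    "objectName": "DB1.SCHEMA1.T1",
    "operationType": "ALTER",
    "properties": {"swapTargetName": {"value": "DB1.SCHEMA1.T2"}},
}

# ALTER TABLE db1.schema1.t1 RENAME TO db1.schema1.t3  -> TableRename
object_modified_by_ddl = {
    "objectDomain": "Table",
    "objectName": "DB1.SCHEMA1.T1",
    "operationType": "RENAME_TABLE",
    "properties": {"objectName": {"value": "DB1.SCHEMA1.T3"}},
}

Any other shape falls through to the else branch and is counted in `num_ddl_queries_dropped`.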

@@ -542,6 +602,7 @@ def _build_enriched_query_log_query(
         user_name,
         direct_objects_accessed,
         objects_modified,
+        object_modified_by_ddl
     FROM
         snowflake.account_usage.access_history
     WHERE
@@ -563,8 +624,9 @@ def _build_enriched_query_log_query(
         ) as direct_objects_accessed,
         -- TODO: Drop the columns.baseSources subfield.
         FILTER(objects_modified, o -> o:objectDomain IN {SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER}) as objects_modified,
+        case when object_modified_by_ddl:objectDomain IN {SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER} then object_modified_by_ddl else null end as object_modified_by_ddl
     FROM raw_access_history
-    WHERE ( array_size(direct_objects_accessed) > 0 or array_size(objects_modified) > 0 )
+    WHERE ( array_size(direct_objects_accessed) > 0 or array_size(objects_modified) > 0 or object_modified_by_ddl is not null )
     )
 , query_access_history AS (
     SELECT
@@ -586,6 +648,7 @@ def _build_enriched_query_log_query(
         q.role_name AS "ROLE_NAME",
         a.direct_objects_accessed,
         a.objects_modified,
+        a.object_modified_by_ddl
     FROM deduplicated_queries q
     JOIN filtered_access_history a USING (query_id)
     )
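
To make the widened filter concrete: a standalone DDL statement populates `object_modified_by_ddl` but typically leaves `objects_modified` empty, so before this change its row was dropped by the `array_size(...) > 0` predicates and never reached `_parse_audit_log_row`. A small illustration with standard Snowflake DDL (table names are hypothetical):

# Both statements produce audit rows with empty objects_modified; only the
# new "or object_modified_by_ddl is not null" clause keeps them in the log.
rename_query = "ALTER TABLE db1.schema1.t1 RENAME TO db1.schema1.t3"
swap_query = "ALTER TABLE db1.schema1.t1 SWAP WITH db1.schema1.t2"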
