feat(ingestion/snowflake): adds streams as a new dataset with lineage and properties. #12318
base: master
Conversation
Resolved review threads:
- metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py
- metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py (outdated)

Force-pushed from c9e611d to 457f96e.

- metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py (outdated)
- metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py
- metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py (outdated)
- metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py (outdated)
- metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py (outdated)
- metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py (outdated)
- metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py (outdated)
- moved stream lineage from snowflake_v2.py and snowflake_lineage_v2.py to snowflake_schema_gen.py
- updated snowflake_schema_gen.py to use snowflake_utils.py
- replaced the clone-table logic with manual column mapping, since the stream's metadata columns were otherwise being mapped from the stream source
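The metadata-column issue the last bullet describes can be illustrated with a minimal sketch. The `METADATA$` column names are Snowflake's real stream metadata columns; the helper function is a hypothetical illustration, not the PR's actual code:

```python
# Snowflake streams expose extra metadata columns that do not exist on the
# stream's source table/view, so lineage mapping must skip them.
STREAM_METADATA_COLUMNS = {"METADATA$ACTION", "METADATA$ISUPDATE", "METADATA$ROW_ID"}


def source_mappable_columns(stream_columns):
    """Keep only columns that also exist on the stream's source object."""
    return [c for c in stream_columns if c.upper() not in STREAM_METADATA_COLUMNS]


cols = source_mappable_columns(["ID", "NAME", "METADATA$ACTION", "METADATA$ROW_ID"])
# cols == ["ID", "NAME"]
```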
Left a couple of quick comments from my skim over this.
@mayurinehate should be back Monday and can do the final reviews + merge this.
```python
stream_pattern: AllowDenyPattern = Field(
    default=AllowDenyPattern.allow_all(),
    description="Regex patterns for streams to filter in ingestion.",
)
```
I believe this is redundant, since it inherits from SnowflakeFilterConfig
removed the code
```python
custom_properties["BASE_TABLES"] = table.base_tables

if table.stale_after:
    custom_properties["STALE_AFTER"] = table.stale_after.isoformat()
```
might be easier to do something like this, and avoid all the if statements (note the comprehension needs `.items()` to iterate key/value pairs):

```python
custom_properties = {
    k: v
    for k, v in {
        "TABLE_NAME": table.table_name,
        ...
    }.items()
    if v
}
```
cleaned up code
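The suggested pattern can be sketched end-to-end as follows. The function name and parameters are illustrative, not the PR's actual code:

```python
from datetime import datetime


# Build custom_properties in one expression and drop empty values,
# instead of guarding each key with a separate `if` statement.
def build_custom_properties(table_name, base_tables, stale_after):
    return {
        k: v
        for k, v in {
            "TABLE_NAME": table_name,
            "BASE_TABLES": base_tables,
            "STALE_AFTER": stale_after.isoformat() if stale_after else None,
        }.items()
        if v
    }


props = build_custom_properties("orders_stream", "ORDERS", None)
# props == {"TABLE_NAME": "orders_stream", "BASE_TABLES": "ORDERS"}
```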
```python
"""
Populate Streams upstream tables excluding the metadata columns
"""
if self.aggregator:
```
when would the aggregator be null?
Without this, lint throws the error:

```
src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py:1475: error: Item "None" of "SqlParsingAggregator | None" has no attribute "add_known_query_lineage"
```
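A minimal sketch of why the guard satisfies mypy: accessing an attribute on an `Optional` field is rejected until the type is narrowed. The class and method names here are illustrative stand-ins for the real ones:

```python
from typing import Optional


class Aggregator:
    def add_known_query_lineage(self, lineage: str) -> None:
        self.last = lineage


class SchemaGen:
    def __init__(self, aggregator: Optional[Aggregator]) -> None:
        self.aggregator = aggregator

    def populate(self, lineage: str) -> None:
        # Without this narrowing, mypy reports:
        #   Item "None" of "Aggregator | None" has no attribute "add_known_query_lineage"
        if self.aggregator:
            self.aggregator.add_known_query_lineage(lineage)


SchemaGen(Aggregator()).populate("stream -> table")
SchemaGen(None).populate("ignored")  # safely a no-op when aggregator is None
```

An `assert self.aggregator is not None` would narrow the type equally well, at the cost of raising instead of no-op'ing.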
```python
SnowflakeIdentifierBuilder,
SnowflakeStructuredReportMixin,
SnowsightUrlBuilder,
_split_qualified_name,
```
should we just make this method "public" by removing the `_` prefix?
Made this public
… into snowflake-streams-v2 merge local
```python
obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
)

# If a stream is used, default to query parsing.
```
Can you add a comment as to why this was required: Snowflake's objects_modified does not include correct stream references, whereas direct_objects_accessed does.
Added comment in code
```python
# If a stream is used, default to query parsing.
if has_stream_objects:
    logger.debug("Found matching stream object")
    self.aggregator.add_observed_query(
```
Could you modify this to yield ObservedQuery, and update the signature of _parse_audit_log_row so that it can return Optional[Union[TableRename, TableSwap, PreparsedQuery, ObservedQuery]], along with any other typing changes needed for this to work?
That way we would add to the aggregator in only one place, which would make the audit log easier to debug.
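The shape of this refactor can be sketched with simplified stand-ins for the audit-log result types (the dataclasses and `sink` below are hypothetical; only the return-a-typed-union idea mirrors the review comment):

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class TableRename:
    old: str
    new: str


@dataclass
class ObservedQuery:
    query: str


AuditLogEntry = Optional[Union[TableRename, ObservedQuery]]


def parse_audit_log_row(row: dict) -> AuditLogEntry:
    # Return a typed result instead of mutating the aggregator inline.
    if "rename" in row:
        return TableRename(old=row["rename"][0], new=row["rename"][1])
    if "query" in row:
        return ObservedQuery(query=row["query"])
    return None


def ingest(rows, sink):
    # Single place where parsed entries reach the aggregator: one breakpoint
    # here shows everything the audit log produced.
    for row in rows:
        entry = parse_audit_log_row(row)
        if entry is not None:
            sink.append(entry)
```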
```python
# in class SnowflakeReport(SQLSourceReport, BaseTimeWindowReport):
num_table_to_view_edges_scanned: int = 0
num_view_to_table_edges_scanned: int = 0
num_external_table_edges_scanned: int = 0
num_stream_edges_scanned: int = 0
```
Is this used anywhere?
No, removed
```python
# in class SnowflakeV2Report:
table_lineage_query_secs: float = -1
external_lineage_queries_secs: float = -1
num_tables_with_known_upstreams: int = 0
num_streams_with_known_upstreams: int = 0
```
Is this used anywhere?
No, removed
```python
name: str
created: datetime
owner: str
comment: str
```
Is comment always present? Can this be None? If so, better to mark it with type Optional[str].
made Optional[str]
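The resulting model can be sketched as below (simplified field set; the class name matches the PR but the exact fields are illustrative):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


# SHOW STREAMS may return no comment, so the field is Optional with a None
# default rather than a bare str.
@dataclass
class SnowflakeStream:
    name: str
    created: datetime
    owner: str
    comment: Optional[str] = None


stream = SnowflakeStream(name="orders_stream", created=datetime(2024, 1, 1), owner="ETL")
# stream.comment is None when Snowflake returns no comment
```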
```python
source_db, source_schema, source_name = source_parts

# Get columns from source object
source_columns = self.get_columns_for_table(
```
I'm slightly concerned about this call, which would trigger fetching columns for the entire database containing this table, but I need to think through better alternatives with less duplication.
When calling columns_for_schema, a list containing the table name is passed. Shouldn't that filter the tables?
```python
if column_lineage:
    self.aggregator.add_known_query_lineage(
        known_query_lineage=KnownQueryLineageInfo(
            query_id=f"stream_lineage_{stream.name}",
```
Is stream_name fully qualified? @hsheth2 is there a better way to add this to the aggregator? Using add_known_lineage_mapping will generate a different query id every time.
@brock-acryl is there no way to get the stream definition query?
Streams do not have a definition like views and tables do; they only have the output of SHOW STREAMS, which is the same as DESCRIBE STREAM.
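One way to address the unstable-query-id concern is a deterministic id derived from the fully qualified stream name, so repeated ingestion runs produce the same id. This is a hypothetical sketch, not the PR's implementation:

```python
import hashlib


def stream_lineage_query_id(qualified_stream_name: str) -> str:
    # Lowercasing before hashing makes the id stable across Snowflake's
    # case-insensitive identifier spellings.
    digest = hashlib.sha256(qualified_stream_name.lower().encode("utf-8")).hexdigest()[:16]
    return f"stream_lineage_{digest}"


a = stream_lineage_query_id("DB.SCHEMA.ORDERS_STREAM")
b = stream_lineage_query_id("db.schema.orders_stream")
# a == b: same stream yields the same id on every run
```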
Resolved review thread on metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py.
```python
# TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs
# but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors,
# it should be pretty straightforward to refactor this and only initialize the aggregator once.
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
queries_extractor.close()
```
Is the change in indentation accidental?
Fixed
```diff
 pipeline = Pipeline(snowflake_pipeline_config)
 pipeline.run()
-assert "permission-error" in [
+assert [] == [
```
Any thoughts on why this got removed?
Snowflake SHOW commands return empty lists rather than permission errors, so without this change the test fails.
- added comments
- removed unused num_stream reports
- made SnowflakeStream comments optional
- defined tables and view argument datatypes
- updated allowed pattern
- fixed indentation
Checklist
Properly picks up: