Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/snowflake):adds streams as a new dataset with lineage and properties. #12318

Open
wants to merge 24 commits into
base: master
Choose a base branch
from

Conversation

brock-acryl
Copy link

@brock-acryl brock-acryl commented Jan 10, 2025

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub
    Properly picks up:
  • Streams and columns in a stream (including metadata columns)
  • Stream upstream lineage from show streams statement
  • Stream downstream lineage by parsing queries (tested inserts, CTAS, inserts with unions)
  • Stream properties
Screenshot 2025-01-10 at 11 11 23 AM

@github-actions github-actions bot added ingestion PR or Issue related to the ingestion of metadata community-contribution PR or Issue raised by member(s) of DataHub Community labels Jan 10, 2025
@datahub-cyborg datahub-cyborg bot added the pending-submitter-response Issue/request has been reviewed but requires a response from the submitter label Jan 10, 2025
@datahub-cyborg datahub-cyborg bot added needs-review Label for PRs that need review from a maintainer. and removed pending-submitter-response Issue/request has been reviewed but requires a response from the submitter labels Jan 11, 2025
@brock-acryl brock-acryl reopened this Jan 13, 2025
Copy link

codecov bot commented Jan 13, 2025

Codecov Report

Attention: Patch coverage is 89.01734% with 19 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
...ingestion/source/snowflake/snowflake_schema_gen.py 90.00% 10 Missing ⚠️
...ub/ingestion/source/snowflake/snowflake_queries.py 0.00% 6 Missing ⚠️
...hub/ingestion/source/snowflake/snowflake_schema.py 95.45% 2 Missing ⚠️
...ahub/ingestion/source/snowflake/snowflake_utils.py 80.00% 1 Missing ⚠️
Files with missing lines Coverage Δ
...on/src/datahub/ingestion/source/common/subtypes.py 100.00% <100.00%> (ø)
...rc/datahub/ingestion/source/snowflake/constants.py 100.00% <100.00%> (ø)
...hub/ingestion/source/snowflake/snowflake_config.py 98.01% <100.00%> (+0.69%) ⬆️
...ahub/ingestion/source/snowflake/snowflake_query.py 93.95% <100.00%> (+0.20%) ⬆️
...hub/ingestion/source/snowflake/snowflake_report.py 99.09% <100.00%> (+0.04%) ⬆️
...datahub/ingestion/source/snowflake/snowflake_v2.py 89.12% <100.00%> (+0.39%) ⬆️
...ahub/ingestion/source/snowflake/snowflake_utils.py 89.20% <80.00%> (+0.15%) ⬆️
...hub/ingestion/source/snowflake/snowflake_schema.py 88.85% <95.45%> (+1.07%) ⬆️
...ub/ingestion/source/snowflake/snowflake_queries.py 43.23% <0.00%> (-0.83%) ⬇️
...ingestion/source/snowflake/snowflake_schema_gen.py 84.61% <90.00%> (+2.52%) ⬆️

... and 44 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update aff3fae...ade6503. Read the comment docs.

@datahub-cyborg datahub-cyborg bot added pending-submitter-response Issue/request has been reviewed but requires a response from the submitter and removed needs-review Label for PRs that need review from a maintainer. labels Jan 14, 2025
- moved stream lineage from snowflake_v2.py & snowflake_lineage_v2.py to snowflake_schema_gen.py
- updated snowflake_schema_gen.py to use snowflake_utils.py
@datahub-cyborg datahub-cyborg bot added needs-review Label for PRs that need review from a maintainer. and removed pending-submitter-response Issue/request has been reviewed but requires a response from the submitter labels Jan 14, 2025
- changed from clone table logic to manually mapping columns since metadata columns were attempting to be mapped from the stream source
Copy link
Collaborator

@hsheth2 hsheth2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a couple quick comments from my skim over this

@mayurinehate should be back monday and can do the final reviews + merge this

stream_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for streams to filter in ingestion.",
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is redundant, since it inherits from SnowflakeFilterConfig

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the code

custom_properties["BASE_TABLES"] = table.base_tables

if table.stale_after:
custom_properties["STALE_AFTER"] = table.stale_after.isoformat()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be easier to do something like this - and avoid all the if statements

custom_properties = {
	k: v
	for k, v in {
		"TABLE_NAME": table.table_name,
		...
	}
	if v
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleaned up code

"""
Populate Streams upstream tables excluding the metadata columns
"""
if self.aggregator:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when would the aggregator be null?

Copy link
Author

@brock-acryl brock-acryl Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this, lint throws the error:
src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py:1475: error: Item "None" of "SqlParsingAggregator | None" has no attribute "add_known_query_lineage"

@@ -58,6 +59,7 @@
SnowflakeIdentifierBuilder,
SnowflakeStructuredReportMixin,
SnowsightUrlBuilder,
_split_qualified_name,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just make this method "public" by removing the _ prefix?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this public

obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
)

# If a stream is used, default to query parsing.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment as to why this was required - as to the fact that snowflake objects_modified does not include correct stream references however direct_objects_accessed does.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment in code

# If a stream is used, default to query parsing.
if has_stream_objects:
logger.debug("Found matching stream object")
self.aggregator.add_observed_query(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you modify this to yield ObservedQuery and update signature of _parse_audit_log_row that it can return Optional[Union[TableRename, TableSwap, PreparsedQuery, ObservedQuery]] and any other required typing changes for this to work.

It would mean that we would add to aggregator only at one place and it would be easier to debug audit log.

@@ -64,6 +64,7 @@ class SnowflakeReport(SQLSourceReport, BaseTimeWindowReport):
num_table_to_view_edges_scanned: int = 0
num_view_to_table_edges_scanned: int = 0
num_external_table_edges_scanned: int = 0
num_stream_edges_scanned: int = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used anywhere ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, removed

@@ -112,6 +114,7 @@ class SnowflakeV2Report(
table_lineage_query_secs: float = -1
external_lineage_queries_secs: float = -1
num_tables_with_known_upstreams: int = 0
num_streams_with_known_upstreams: int = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used anywhere ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, removed

name: str
created: datetime
owner: str
comment: str
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is comment always present ? Can this be None ? If so, better to mark with type Optional[str]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made Optional[str]

source_db, source_schema, source_name = source_parts

# Get columns from source object
source_columns = self.get_columns_for_table(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm slightly concerned about this call which would trigger fetch of columns of entire database containing this table but need to think through any better alternatives with less duplicates.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When calling columns_for_schema, a list is passed containing the table name. Shouldn't this then filter the tables?

if column_lineage:
self.aggregator.add_known_query_lineage(
known_query_lineage=KnownQueryLineageInfo(
query_id=f"stream_lineage_{stream.name}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is stream_name fully qualified ? @hsheth2 is there better way for adding this to aggregator ? Using add_known_lineage_mapping will generate different query id every time.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brock-acryl is there no way to get stream definition query ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Streams do not have a definition like views and tables, they only have the output of show streams which is the same as describe stream.

Comment on lines 555 to 560
# TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs
# but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors,
# it should be pretty straightforward to refactor this and only initialize the aggregator once.
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
queries_extractor.close()
Copy link
Collaborator

@mayurinehate mayurinehate Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is change in indentation by accident ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


pipeline = Pipeline(snowflake_pipeline_config)
pipeline.run()
assert "permission-error" in [
assert [] == [
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any thoughts why this got removed ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snowflake show commands return empty lists and not permission errors. Without this change, the test fails.

@datahub-cyborg datahub-cyborg bot added pending-submitter-response Issue/request has been reviewed but requires a response from the submitter and removed needs-review Label for PRs that need review from a maintainer. labels Jan 21, 2025
- added comments
- removed unused num_stream reports
- made SnowflakeStream comments optional
- defined tables and view argument datatypes
- updated allowed pattern
- fixed indentation
@datahub-cyborg datahub-cyborg bot added needs-review Label for PRs that need review from a maintainer. and removed pending-submitter-response Issue/request has been reviewed but requires a response from the submitter labels Jan 22, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-contribution PR or Issue raised by member(s) of DataHub Community ingestion PR or Issue related to the ingestion of metadata needs-review Label for PRs that need review from a maintainer.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants