feat(ingestion/snowflake): adds streams as a new dataset with lineage and properties. #12318

Status: Open. Wants to merge 24 commits into base: master.
Changes shown are from 11 commits.

Commits (24):
a268e3e  merged changes from master (brock-acryl, Jan 13, 2025)
b7dfa7c  moved stream_pattern from sql_config.py to snowflake_config.py (brock-acryl, Jan 14, 2025)
30f2e53  added streams to docs (brock-acryl, Jan 14, 2025)
e0b3c3b  removed unused method (brock-acryl, Jan 15, 2025)
f0124e2  Merge branch 'master' into snowflake-streams-v2 (brock-acryl, Jan 15, 2025)
2ff19ba  merge changes (brock-acryl, Jan 15, 2025)
fd87ca9  lintfix (brock-acryl, Jan 16, 2025)
11c6b8c  Refactored _process_schema into smaller functions (brock-acryl, Jan 16, 2025)
b06d0bd  fixed streams_for_database (brock-acryl, Jan 16, 2025)
2968f6c  Merge branch 'master' into snowflake-streams-v2 (brock-acryl, Jan 16, 2025)
5f25f3c  updated pytests and golden files (brock-acryl, Jan 16, 2025)
18770a9  lintfix (brock-acryl, Jan 17, 2025)
f726f38  code review updates (brock-acryl, Jan 17, 2025)
59c21c7  Merge branch 'master' into snowflake-streams-v2 (brock-acryl, Jan 17, 2025)
0d95fcd  lint (brock-acryl, Jan 17, 2025)
6ad3f70  lint (brock-acryl, Jan 17, 2025)
c20aa2d  updated tests (brock-acryl, Jan 17, 2025)
67b8212  Merge branch 'master' into snowflake-streams-v2 (brock-acryl, Jan 17, 2025)
af9d421  updated tests (brock-acryl, Jan 17, 2025)
c1f0be8  updated reporting (brock-acryl, Jan 18, 2025)
b97724a  Merge branch 'master' into snowflake-streams-v2 (brock-acryl, Jan 18, 2025)
07cf0bd  Merge branch 'snowflake-streams-v2' of github.com:brock-acryl/datahub… (brock-acryl, Jan 18, 2025)
623ecb5  Merge branch 'master' into snowflake-streams-v2 (brock-acryl, Jan 22, 2025)
ade6503  Updated docs with required permissions (brock-acryl, Jan 22, 2025)
metadata-ingestion/docs/sources/snowflake/snowflake_pre.md (2 changes: 1 addition & 1 deletion)
@@ -50,7 +50,7 @@ The details of each granted privilege can be viewed in [snowflake docs](https://
If the warehouse is already running during ingestion or has auto-resume enabled,
this permission is not required.
- `usage` is required for us to run queries using the warehouse
- `usage` on `database` and `schema` are required because without it tables and views inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view exists then we will not be able to get metadata for the table/view.
- `usage` on `database` and `schema` are required because without it tables, views, and streams inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view/stream exists then we will not be able to get metadata for the table/view/stream.
- If metadata is required only on some schemas then you can grant the usage privileges only on a particular schema like

```sql
-- Illustrative grant, completing the collapsed example; names are placeholders.
grant usage on schema "<your-database>"."<your-schema>" to role datahub_role;
```
@@ -24,6 +24,7 @@ class DatasetSubTypes(StrEnum):
SAC_LIVE_DATA_MODEL = "Live Data Model"
NEO4J_NODE = "Neo4j Node"
NEO4J_RELATIONSHIP = "Neo4j Relationship"
SNOWFLAKE_STREAM = "Snowflake Stream"

# TODO: Create separate entity...
NOTEBOOK = "Notebook"
@@ -53,6 +53,7 @@ class SnowflakeObjectDomain(StrEnum):
SCHEMA = "schema"
COLUMN = "column"
ICEBERG_TABLE = "iceberg table"
STREAM = "stream"


GENERIC_PERMISSION_ERROR_KEY = "permission-error"
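Because the query filters further down build their domain lists with `.capitalize()`, it helps to note that StrEnum members behave as plain strings. A minimal, self-contained illustration; the StrEnum stand-in below is an assumption for the sketch, not the project's actual base class:

```python
from enum import Enum


class StrEnum(str, Enum):  # stand-in for the StrEnum base used above
    pass


class SnowflakeObjectDomain(StrEnum):
    STREAM = "stream"


# Members compare and operate as plain strings.
assert SnowflakeObjectDomain.STREAM == "stream"
assert SnowflakeObjectDomain.STREAM.capitalize() == "Stream"  # as used in the filters
```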
@@ -98,6 +98,11 @@ class SnowflakeFilterConfig(SQLFilterConfig):
)
# table_pattern and view_pattern are inherited from SQLFilterConfig

stream_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for streams to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)

match_fully_qualified_names: bool = Field(
default=False,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
@@ -268,6 +273,16 @@ class SnowflakeV2Config(
description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.",
)

include_streams: bool = Field(
default=True,
description="If enabled, streams will be ingested as separate entities from tables/views.",
)

stream_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for streams to filter in ingestion.",
)
Collaborator: I believe this is redundant, since it inherits from SnowflakeFilterConfig

Author: removed the code

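For context on how the `stream_pattern` fields above are applied, a minimal sketch assuming datahub's `AllowDenyPattern` helper and its `allowed()` method; patterns match the fully qualified `database.schema.stream` name:

```python
from datahub.configuration.common import AllowDenyPattern  # assumed import path

# Allow only streams in the Customer database's public schema whose names
# start with "customer"; everything else is denied.
pattern = AllowDenyPattern(allow=[r"Customer\.public\.customer.*"])

assert pattern.allowed("Customer.public.customer_orders_stream")
assert not pattern.allowed("Sales.public.orders_stream")
```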
structured_property_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(
metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py
@@ -49,6 +49,7 @@
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import (
KnownLineageMapping,
ObservedQuery,
PreparsedQuery,
SqlAggregatorReport,
SqlParsingAggregator,
@@ -398,6 +399,36 @@
pass
else:
return None

user = CorpUserUrn(
    self.identifiers.get_user_identifier(
        res["user_name"], users.get(res["user_name"])
    )
)

# Check if any of the accessed objects are streams
has_stream_objects = any(
    obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
)

# If a stream is used, default to query parsing.
if has_stream_objects:
    logger.debug("Found matching stream object")
    self.aggregator.add_observed_query(
        observed=ObservedQuery(
            query=res["query_text"],
            session_id=res["session_id"],
            timestamp=res["query_start_time"].astimezone(timezone.utc),
            user=user,
            default_db=res["default_db"],
            default_schema=res["default_schema"],
            query_hash=get_query_fingerprint(
                res["query_text"], self.identifiers.platform, fast=True
            ),
        ),
    )
    return None

Collaborator: Can you add a comment as to why this was required, i.e. the fact that snowflake objects_modified does not include correct stream references, however direct_objects_accessed does.

Author: Added comment in code

Collaborator: could you modify this to yield ObservedQuery and update the signature of _parse_audit_log_row so that it can return Optional[Union[TableRename, TableSwap, PreparsedQuery, ObservedQuery]], plus any other required typing changes for this to work. It would mean that we would add to the aggregator in only one place, and it would be easier to debug the audit log.

upstreams = []
column_usage = {}

@@ -460,12 +491,6 @@
)
)

user = CorpUserUrn(
self.identifiers.get_user_identifier(
res["user_name"], users.get(res["user_name"])
)
)

timestamp: datetime = res["query_start_time"]
timestamp = timestamp.astimezone(timezone.utc)

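To make the stream-detection workaround above concrete, here is a hypothetical audit-log fragment (object names invented for illustration). Snowflake's objects_modified may reference a stream's underlying objects rather than the stream itself, while direct_objects_accessed does carry the stream entry, so detection keys off the latter:

```python
# Hypothetical row fragment for a query like:
#   INSERT INTO target_table SELECT * FROM my_stream;
direct_objects_accessed = [
    {"objectName": "MY_DB.PUBLIC.MY_STREAM", "objectDomain": "Stream"},
]
objects_modified = [
    {"objectName": "MY_DB.PUBLIC.TARGET_TABLE", "objectDomain": "Table"},
]

# Mirrors the check in _parse_audit_log_row: any stream among the accessed
# objects routes the row to SQL-text parsing via ObservedQuery.
has_stream_objects = any(
    obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
)
assert has_stream_objects
```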
@@ -9,6 +9,7 @@
from datahub.utilities.prefix_batch_builder import PrefixGroup

SHOW_VIEWS_MAX_PAGE_SIZE = 10000
SHOW_STREAM_MAX_PAGE_SIZE = 10000


def create_deny_regex_sql_filter(
@@ -36,6 +37,7 @@ class SnowflakeQuery:
SnowflakeObjectDomain.VIEW.capitalize(),
SnowflakeObjectDomain.MATERIALIZED_VIEW.capitalize(),
SnowflakeObjectDomain.ICEBERG_TABLE.capitalize(),
SnowflakeObjectDomain.STREAM.capitalize(),
}

ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER = "({})".format(
@@ -44,7 +46,8 @@
ACCESS_HISTORY_TABLE_DOMAINS_FILTER = (
"("
f"'{SnowflakeObjectDomain.TABLE.capitalize()}',"
f"'{SnowflakeObjectDomain.VIEW.capitalize()}'"
f"'{SnowflakeObjectDomain.VIEW.capitalize()}',"
f"'{SnowflakeObjectDomain.STREAM.capitalize()}',"
")"
)

@@ -952,3 +955,19 @@ def dmf_assertion_results(start_time_millis: int, end_time_millis: int) -> str:
@staticmethod
def get_all_users() -> str:
return """SELECT name as "NAME", email as "EMAIL" FROM SNOWFLAKE.ACCOUNT_USAGE.USERS"""

@staticmethod
def streams_for_database(
db_name: str,
limit: int = SHOW_STREAM_MAX_PAGE_SIZE,
stream_pagination_marker: Optional[str] = None,
) -> str:
# SHOW STREAMS can return a maximum of 10000 rows.
# https://docs.snowflake.com/en/sql-reference/sql/show-streams#usage-notes
assert limit <= SHOW_STREAM_MAX_PAGE_SIZE

# To work around this, we paginate through the results using the FROM clause.
from_clause = (
f"""FROM '{stream_pagination_marker}'""" if stream_pagination_marker else ""
)
return f"""SHOW STREAMS IN DATABASE {db_name} LIMIT {limit} {from_clause};"""
@@ -64,6 +64,7 @@ class SnowflakeReport(SQLSourceReport, BaseTimeWindowReport):
num_table_to_view_edges_scanned: int = 0
num_view_to_table_edges_scanned: int = 0
num_external_table_edges_scanned: int = 0
num_stream_edges_scanned: int = 0
Collaborator: Is this used anywhere?

Author: No, removed

ignore_start_time_lineage: Optional[bool] = None
upstream_lineage_in_report: Optional[bool] = None
upstream_lineage: Dict[str, List[str]] = field(default_factory=dict)
@@ -103,6 +104,7 @@ class SnowflakeV2Report(
schemas_scanned: int = 0
databases_scanned: int = 0
tags_scanned: int = 0
streams_scanned: int = 0

include_usage_stats: bool = False
include_operational_stats: bool = False
@@ -112,6 +114,7 @@
table_lineage_query_secs: float = -1
external_lineage_queries_secs: float = -1
num_tables_with_known_upstreams: int = 0
num_streams_with_known_upstreams: int = 0
Collaborator: Is this used anywhere?

Author: No, removed

num_upstream_lineage_edge_parsing_failed: int = 0
num_secure_views_missing_definition: int = 0

@@ -129,6 +132,8 @@
num_get_tags_for_object_queries: int = 0
num_get_tags_on_columns_for_table_queries: int = 0

num_get_streams_for_schema_queries: int = 0

rows_zero_objects_modified: int = 0

_processed_tags: MutableSet[str] = field(default_factory=set)
@@ -155,6 +160,8 @@ def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
return
self._scanned_tags.add(name)
self.tags_scanned += 1
elif ent_type == "stream":
self.streams_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")

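A small sketch of how the new stream branch above is counted, assuming SnowflakeV2Report can be constructed with defaults, as its fields suggest:

```python
report = SnowflakeV2Report()

report.report_entity_scanned("MY_DB.PUBLIC.MY_STREAM", ent_type="stream")
assert report.streams_scanned == 1

# Unknown entity types still raise KeyError, as before.
try:
    report.report_entity_scanned("MY_DB.PUBLIC.X", ent_type="pipe")
except KeyError:
    pass
```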
metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py
@@ -118,6 +118,7 @@
comment: Optional[str]
tables: List[str] = field(default_factory=list)
views: List[str] = field(default_factory=list)
streams: List[str] = field(default_factory=list)
tags: Optional[List[SnowflakeTag]] = None


@@ -131,6 +132,29 @@
tags: Optional[List[SnowflakeTag]] = None


@dataclass
class SnowflakeStream:
    name: str
    created: datetime
    owner: str
    comment: str
    source_type: str
    type: str
    stale: str
    mode: str
    invalid_reason: str
    owner_role_type: str
    database_name: str
    schema_name: str
    table_name: str
    columns: List[SnowflakeColumn] = field(default_factory=list)
    stale_after: Optional[datetime] = None
    base_tables: Optional[str] = None
    tags: Optional[List[SnowflakeTag]] = None
    column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
    last_altered: Optional[datetime] = None

Collaborator (on `comment: str`): Is comment always present? Can this be None? If so, better to mark with type Optional[str]

Author: made Optional[str]

class _SnowflakeTagCache:
def __init__(self) -> None:
# self._database_tags[<database_name>] = list of tags applied to database
@@ -208,6 +232,7 @@
self.get_tables_for_database,
self.get_views_for_database,
self.get_columns_for_schema,
self.get_streams_for_database,
self.get_pk_constraints_for_schema,
self.get_fk_constraints_for_schema,
]
@@ -594,3 +619,63 @@
tags[column_name].append(snowflake_tag)

return tags

@serialized_lru_cache(maxsize=1)
def get_streams_for_database(
    self, db_name: str
) -> Dict[str, List[SnowflakeStream]]:
    page_limit = SHOW_STREAM_MAX_PAGE_SIZE

    streams: Dict[str, List[SnowflakeStream]] = {}

    first_iteration = True
    stream_pagination_marker: Optional[str] = None
    while first_iteration or stream_pagination_marker is not None:
        cur = self.connection.query(
            SnowflakeQuery.streams_for_database(
                db_name,
                limit=page_limit,
                stream_pagination_marker=stream_pagination_marker,
            )
        )

        first_iteration = False
        stream_pagination_marker = None

        result_set_size = 0
        for stream in cur:
            result_set_size += 1

            stream_name = stream["name"]
            schema_name = stream["schema_name"]
            if schema_name not in streams:
                streams[schema_name] = []
            streams[stream["schema_name"]].append(
                SnowflakeStream(
                    name=stream["name"],
                    created=stream["created_on"],
                    owner=stream["owner"],
                    comment=stream["comment"],
                    source_type=stream["source_type"],
                    type=stream["type"],
                    stale=stream["stale"],
                    mode=stream["mode"],
                    database_name=stream["database_name"],
                    schema_name=stream["schema_name"],
                    invalid_reason=stream["invalid_reason"],
                    owner_role_type=stream["owner_role_type"],
                    stale_after=stream["stale_after"],
                    table_name=stream["table_name"],
                    base_tables=stream["base_tables"],
                    last_altered=stream["created_on"],
                )
            )

        if result_set_size >= page_limit:
            # If we hit the limit, we need to send another request to get the next page.
            logger.info(
                f"Fetching next page of views for {db_name} - after {stream_name}"
            )
            stream_pagination_marker = stream_name

    return streams

Collaborator (suggested change to the log message):
- f"Fetching next page of views for {db_name} - after {stream_name}"
+ f"Fetching next page of streams for {db_name} - after {stream_name}"
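And a consumption sketch; `data_dictionary` stands for an instance of the class this method belongs to, with a live connection, and all names are illustrative:

```python
streams_by_schema = data_dictionary.get_streams_for_database("MY_DB")

for schema_name, schema_streams in streams_by_schema.items():
    for s in schema_streams:
        # table_name is the stream's source object; base_tables lists the
        # underlying tables the stream tracks.
        print(f"{s.database_name}.{schema_name}.{s.name} -> {s.table_name}")
```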