-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingestion/snowflake): adds streams as a new dataset with lineage and properties. #12318
base: master
Are you sure you want to change the base?
Changes from 11 commits
a268e3e
b7dfa7c
30f2e53
e0b3c3b
f0124e2
2ff19ba
fd87ca9
11c6b8c
b06d0bd
2968f6c
5f25f3c
18770a9
f726f38
59c21c7
0d95fcd
6ad3f70
c20aa2d
67b8212
af9d421
c1f0be8
b97724a
07cf0bd
623ecb5
ade6503
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,6 +49,7 @@ | |
from datahub.sql_parsing.schema_resolver import SchemaResolver | ||
from datahub.sql_parsing.sql_parsing_aggregator import ( | ||
KnownLineageMapping, | ||
ObservedQuery, | ||
PreparsedQuery, | ||
SqlAggregatorReport, | ||
SqlParsingAggregator, | ||
|
@@ -398,6 +399,36 @@ | |
pass | ||
else: | ||
return None | ||
|
||
user = CorpUserUrn( | ||
self.identifiers.get_user_identifier( | ||
res["user_name"], users.get(res["user_name"]) | ||
) | ||
) | ||
|
||
# Check if any of the accessed objects are streams | ||
has_stream_objects = any( | ||
obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed | ||
) | ||
|
||
# If a stream is used, default to query parsing. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you add a comment explaining why this was required — namely, that Snowflake's objects_modified does not include correct stream references, whereas direct_objects_accessed does? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added a comment in the code. |
||
if has_stream_objects: | ||
logger.debug("Found matching stream object") | ||
self.aggregator.add_observed_query( | ||
Check warning on line 417 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py. Codecov / codecov/patch: metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py#L415-L417
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you modify this to yield ObservedQuery and update the signature of the enclosing method accordingly? It would mean that we add to the aggregator in only one place, and it would be easier to debug the audit log. |
||
observed=ObservedQuery( | ||
query=res["query_text"], | ||
session_id=res["session_id"], | ||
timestamp=res["query_start_time"].astimezone(timezone.utc), | ||
user=user, | ||
default_db=res["default_db"], | ||
default_schema=res["default_schema"], | ||
query_hash=get_query_fingerprint( | ||
res["query_text"], self.identifiers.platform, fast=True | ||
), | ||
), | ||
) | ||
return None | ||
|
||
upstreams = [] | ||
column_usage = {} | ||
|
||
|
@@ -460,12 +491,6 @@ | |
) | ||
) | ||
|
||
user = CorpUserUrn( | ||
self.identifiers.get_user_identifier( | ||
res["user_name"], users.get(res["user_name"]) | ||
) | ||
) | ||
|
||
brock-acryl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
timestamp: datetime = res["query_start_time"] | ||
timestamp = timestamp.astimezone(timezone.utc) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,6 +64,7 @@ class SnowflakeReport(SQLSourceReport, BaseTimeWindowReport): | |
num_table_to_view_edges_scanned: int = 0 | ||
num_view_to_table_edges_scanned: int = 0 | ||
num_external_table_edges_scanned: int = 0 | ||
num_stream_edges_scanned: int = 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this used anywhere? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No, removed. |
||
ignore_start_time_lineage: Optional[bool] = None | ||
upstream_lineage_in_report: Optional[bool] = None | ||
upstream_lineage: Dict[str, List[str]] = field(default_factory=dict) | ||
|
@@ -103,6 +104,7 @@ class SnowflakeV2Report( | |
schemas_scanned: int = 0 | ||
databases_scanned: int = 0 | ||
tags_scanned: int = 0 | ||
streams_scanned: int = 0 | ||
|
||
include_usage_stats: bool = False | ||
include_operational_stats: bool = False | ||
|
@@ -112,6 +114,7 @@ class SnowflakeV2Report( | |
table_lineage_query_secs: float = -1 | ||
external_lineage_queries_secs: float = -1 | ||
num_tables_with_known_upstreams: int = 0 | ||
num_streams_with_known_upstreams: int = 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this used anywhere? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No, removed. |
||
num_upstream_lineage_edge_parsing_failed: int = 0 | ||
num_secure_views_missing_definition: int = 0 | ||
|
||
|
@@ -129,6 +132,8 @@ class SnowflakeV2Report( | |
num_get_tags_for_object_queries: int = 0 | ||
num_get_tags_on_columns_for_table_queries: int = 0 | ||
|
||
num_get_streams_for_schema_queries: int = 0 | ||
|
||
rows_zero_objects_modified: int = 0 | ||
|
||
_processed_tags: MutableSet[str] = field(default_factory=set) | ||
|
@@ -155,6 +160,8 @@ def report_entity_scanned(self, name: str, ent_type: str = "table") -> None: | |
return | ||
self._scanned_tags.add(name) | ||
self.tags_scanned += 1 | ||
elif ent_type == "stream": | ||
self.streams_scanned += 1 | ||
else: | ||
raise KeyError(f"Unknown entity {ent_type}.") | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -118,6 +118,7 @@ | |||||
comment: Optional[str] | ||||||
tables: List[str] = field(default_factory=list) | ||||||
views: List[str] = field(default_factory=list) | ||||||
streams: List[str] = field(default_factory=list) | ||||||
tags: Optional[List[SnowflakeTag]] = None | ||||||
|
||||||
|
||||||
|
@@ -131,6 +132,29 @@ | |||||
tags: Optional[List[SnowflakeTag]] = None | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class SnowflakeStream: | ||||||
name: str | ||||||
created: datetime | ||||||
owner: str | ||||||
comment: str | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is comment always present? Can this be None? If so, it is better to mark it with type Optional[str]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Changed it to Optional[str]. |
||||||
source_type: str | ||||||
type: str | ||||||
stale: str | ||||||
mode: str | ||||||
invalid_reason: str | ||||||
owner_role_type: str | ||||||
database_name: str | ||||||
schema_name: str | ||||||
table_name: str | ||||||
columns: List[SnowflakeColumn] = field(default_factory=list) | ||||||
stale_after: Optional[datetime] = None | ||||||
base_tables: Optional[str] = None | ||||||
tags: Optional[List[SnowflakeTag]] = None | ||||||
column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict) | ||||||
last_altered: Optional[datetime] = None | ||||||
|
||||||
|
||||||
class _SnowflakeTagCache: | ||||||
def __init__(self) -> None: | ||||||
# self._database_tags[<database_name>] = list of tags applied to database | ||||||
|
@@ -208,6 +232,7 @@ | |||||
self.get_tables_for_database, | ||||||
self.get_views_for_database, | ||||||
self.get_columns_for_schema, | ||||||
self.get_streams_for_database, | ||||||
self.get_pk_constraints_for_schema, | ||||||
self.get_fk_constraints_for_schema, | ||||||
] | ||||||
|
@@ -594,3 +619,63 @@ | |||||
tags[column_name].append(snowflake_tag) | ||||||
|
||||||
return tags | ||||||
|
||||||
@serialized_lru_cache(maxsize=1) | ||||||
def get_streams_for_database( | ||||||
self, db_name: str | ||||||
) -> Dict[str, List[SnowflakeStream]]: | ||||||
page_limit = SHOW_VIEWS_MAX_PAGE_SIZE | ||||||
|
||||||
streams: Dict[str, List[SnowflakeStream]] = {} | ||||||
|
||||||
first_iteration = True | ||||||
stream_pagination_marker: Optional[str] = None | ||||||
while first_iteration or stream_pagination_marker is not None: | ||||||
cur = self.connection.query( | ||||||
SnowflakeQuery.streams_for_database( | ||||||
db_name, | ||||||
limit=page_limit, | ||||||
stream_pagination_marker=stream_pagination_marker, | ||||||
) | ||||||
) | ||||||
|
||||||
first_iteration = False | ||||||
stream_pagination_marker = None | ||||||
|
||||||
result_set_size = 0 | ||||||
for stream in cur: | ||||||
result_set_size += 1 | ||||||
|
||||||
stream_name = stream["name"] | ||||||
schema_name = stream["schema_name"] | ||||||
if schema_name not in streams: | ||||||
streams[schema_name] = [] | ||||||
streams[stream["schema_name"]].append( | ||||||
SnowflakeStream( | ||||||
name=stream["name"], | ||||||
created=stream["created_on"], | ||||||
owner=stream["owner"], | ||||||
comment=stream["comment"], | ||||||
source_type=stream["source_type"], | ||||||
type=stream["type"], | ||||||
stale=stream["stale"], | ||||||
mode=stream["mode"], | ||||||
database_name=stream["database_name"], | ||||||
schema_name=stream["schema_name"], | ||||||
invalid_reason=stream["invalid_reason"], | ||||||
owner_role_type=stream["owner_role_type"], | ||||||
stale_after=stream["stale_after"], | ||||||
table_name=stream["table_name"], | ||||||
base_tables=stream["base_tables"], | ||||||
last_altered=stream["created_on"], | ||||||
) | ||||||
) | ||||||
|
||||||
if result_set_size >= page_limit: | ||||||
# If we hit the limit, we need to send another request to get the next page. | ||||||
logger.info( | ||||||
f"Fetching next page of views for {db_name} - after {stream_name}" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
) | ||||||
stream_pagination_marker = stream_name | ||||||
|
||||||
return streams |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this is redundant, since it inherits from
SnowflakeFilterConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed the code