Skip to content

Commit

Permalink
Merge branch 'master' into feature/ing-705
Browse files (browse the repository at this point in the history)
  • Loading branch information
hsheth2 authored Sep 6, 2024
2 parents 75bdd7f + 596de89 commit 126de6f
Show file tree
Hide file tree
Showing 10 changed files with 7,591 additions and 364 deletions.
24 changes: 19 additions & 5 deletions datahub-web-react/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4510,7 +4510,14 @@ brace-expansion@^1.1.7:
balanced-match "^1.0.0"
concat-map "0.0.1"

braces@^3.0.2, braces@~3.0.2:
braces@^3.0.3:
version "3.0.3"
resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.3.tgz#490332f40919452272d55a8480adc0c441358789"
integrity sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==
dependencies:
fill-range "^7.1.1"

braces@~3.0.2:
version "3.0.2"
resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.2.tgz#3454e1a462ee8d599e236df336cd9ea4f8afe107"
integrity sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==
Expand Down Expand Up @@ -6052,6 +6059,13 @@ fill-range@^7.0.1:
dependencies:
to-regex-range "^5.0.1"

fill-range@^7.1.1:
version "7.1.1"
resolved "https://registry.yarnpkg.com/fill-range/-/fill-range-7.1.1.tgz#44265d3cac07e3ea7dc247516380643754a05292"
integrity sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==
dependencies:
to-regex-range "^5.0.1"

filter-obj@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/filter-obj/-/filter-obj-1.1.0.tgz#9b311112bc6c6127a16e016c6c5d7f19e0805c5b"
Expand Down Expand Up @@ -7654,11 +7668,11 @@ micromark@^2.11.3, micromark@~2.11.0, micromark@~2.11.3:
parse-entities "^2.0.0"

micromatch@^4.0.4, micromatch@^4.0.5:
version "4.0.5"
resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-4.0.5.tgz#bc8999a7cbbf77cdc89f132f6e467051b49090c6"
integrity sha512-DMy+ERcEW2q8Z2Po+WNXuw3c5YaUSFjAO5GsJqfEl7UjvtIuFKO6ZrKvcItdy98dwFI2N1tg3zNIdKaQT+aNdA==
version "4.0.8"
resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-4.0.8.tgz#d66fa18f3a47076789320b9b1af32bd86d9fa202"
integrity sha512-PXwfBhYu0hBCPw8Dn0E+WDYb7af3dSLVWKi3HGv84IdF4TyFoC0ysxFd0Goxw7nSv4T/PzEJQxsYsEiFCKo2BA==
dependencies:
braces "^3.0.2"
braces "^3.0.3"
picomatch "^2.3.1"

[email protected]:
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ disallow_untyped_defs = yes

[tool:pytest]
asyncio_mode = auto
addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers
addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers -p no:faker
markers =
slow: marks tests that are slow to run, including all docker-based tests (deselect with '-m not slow')
integration: marks all integration tests, across all batches (deselect with '-m "not integration"')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig):
include_lineage: bool = True
include_queries: bool = True
include_usage_statistics: bool = True
include_query_usage_statistics: bool = False
include_query_usage_statistics: bool = True
include_operations: bool = True

region_qualifiers: List[str] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import pydantic
from typing_extensions import Self

from datahub.configuration.common import ConfigModel
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.time_window_config import (
BaseTimeWindowConfig,
BucketDuration,
Expand Down Expand Up @@ -67,8 +67,16 @@ class SnowflakeQueriesExtractorConfig(ConfigModel):
# TODO: Support stateful ingestion for the time windows.
window: BaseTimeWindowConfig = BaseTimeWindowConfig()

# TODO: make this a proper allow/deny pattern
deny_usernames: List[str] = []
pushdown_deny_usernames: List[str] = pydantic.Field(
default=[],
description="List of snowflake usernames which will not be considered for lineage/usage/queries extraction. "
"This is primarily useful for improving performance by filtering out users with extremely high query volumes.",
)

user_email_pattern: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for user emails to filter in usage.",
)

temporary_tables_pattern: List[str] = pydantic.Field(
default=DEFAULT_TEMP_TABLES_PATTERNS,
Expand All @@ -88,7 +96,7 @@ class SnowflakeQueriesExtractorConfig(ConfigModel):
include_lineage: bool = True
include_queries: bool = True
include_usage_statistics: bool = True
include_query_usage_statistics: bool = False
include_query_usage_statistics: bool = True
include_operations: bool = True


Expand Down Expand Up @@ -150,6 +158,7 @@ def __init__(
bucket_duration=self.config.window.bucket_duration,
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
user_email_pattern=self.config.user_email_pattern,
# TODO make the rest of the fields configurable
),
generate_operations=self.config.include_operations,
Expand Down Expand Up @@ -281,7 +290,7 @@ def fetch_query_log(
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
bucket_duration=self.config.window.bucket_duration,
deny_usernames=self.config.deny_usernames,
deny_usernames=self.config.pushdown_deny_usernames,
)

with self.structured_reporter.report_exc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_stats,
include_operations=self.config.include_operational_stats,
user_email_pattern=self.config.user_email_pattern,
),
structured_report=self.report,
filters=self.filters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ class SqlAggregatorReport(Report):
# Usage-related.
usage_skipped_missing_timestamp: int = 0
num_query_usage_stats_generated: int = 0
num_query_usage_stats_outside_window: int = 0

# Operation-related.
num_operations_generated: int = 0
Expand Down Expand Up @@ -452,6 +453,7 @@ def _need_schemas(self) -> bool:
or self.generate_usage_statistics
or self.generate_queries
or self.generate_operations
or self.generate_query_usage_statistics
)

def register_schema(
Expand Down Expand Up @@ -1042,9 +1044,9 @@ def gen_metadata(self) -> Iterable[MetadataChangeProposalWrapper]:
queries_generated: Set[QueryId] = set()

yield from self._gen_lineage_mcps(queries_generated)
yield from self._gen_remaining_queries(queries_generated)
yield from self._gen_usage_statistics_mcps()
yield from self._gen_operation_mcps(queries_generated)
yield from self._gen_remaining_queries(queries_generated)

def _gen_lineage_mcps(
self, queries_generated: Set[QueryId]
Expand Down Expand Up @@ -1331,9 +1333,15 @@ def _gen_query(
query_counter = self._query_usage_counts.get(query_id)
if not query_counter:
return
for bucket in self.usage_config.buckets():
count = query_counter.get(bucket)
if not count:

all_buckets = self.usage_config.buckets()

for bucket, count in query_counter.items():
if bucket not in all_buckets:
# What happens if we get a query with a timestamp that's outside our configured window?
# Theoretically this should never happen, since the audit logs are also fetched
# for the window. However, it's useful to have reporting for it, just in case.
self.report.num_query_usage_stats_outside_window += 1
continue

yield MetadataChangeProposalWrapper(
Expand Down
Loading

0 comments on commit 126de6f

Please sign in to comment.