Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/clickhouse-usage): add sql aggregator to clickhouse usage #12526

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import SourceReport

Check warning on line 24 in metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py#L24

Added line #L24 was not covered by tests
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.clickhouse import ClickHouseConfig
from datahub.ingestion.source.sql.sql_report import SQLSourceReport
from datahub.ingestion.source.sql.two_tier_sql_source import TwoTierSQLAlchemySource

Check warning on line 28 in metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py#L27-L28

Added lines #L27 - L28 were not covered by tests
from datahub.ingestion.source.usage.usage_common import (
BaseUsageConfig,
GenericAggregatedDataset,
)
from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery

Check warning on line 33 in metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py#L33

Added line #L33 was not covered by tests

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -84,7 +87,7 @@
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@dataclasses.dataclass
class ClickHouseUsageSource(Source):
class ClickHouseUsageSource(TwoTierSQLAlchemySource):

Check warning on line 90 in metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py#L90

Added line #L90 was not covered by tests
"""
This plugin has the below functionalities -
1. For a specific dataset this plugin ingests the following statistics -
Expand All @@ -104,7 +107,7 @@
"""

config: ClickHouseUsageConfig
report: SourceReport = dataclasses.field(default_factory=SourceReport)
report: SQLSourceReport = dataclasses.field(default_factory=SQLSourceReport)

Check warning on line 110 in metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py#L110

Added line #L110 was not covered by tests

@classmethod
def create(cls, config_dict, ctx):
Expand All @@ -118,13 +121,27 @@
if not access_events:
return

for event in access_events:
self.aggregator.add_observed_query(

Check warning on line 125 in metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py#L124-L125

Added lines #L124 - L125 were not covered by tests
observed=ObservedQuery(
default_db=event.database,
default_schema=None,
query=event.query,
)
)

joined_access_event = self._get_joined_access_event(access_events)
aggregated_info = self._aggregate_access_events(joined_access_event)

for time_bucket in aggregated_info.values():
for aggregate in time_bucket.values():
yield self._make_usage_stat(aggregate)

for mcp in self.aggregator.gen_metadata():
wu = mcp.as_workunit()
self.report.report_workunit(wu)
yield wu

Check warning on line 143 in metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py#L140-L143

Added lines #L140 - L143 were not covered by tests

def _make_usage_query(self) -> str:
return clickhouse_usage_sql_comment.format(
query_log_table=self.config.query_log_table,
Expand Down
Loading