
fix(ingest/unity): add row count in table profile of delta tables #12480

Merged 8 commits on Feb 4, 2025
@@ -115,26 +115,30 @@ class GEProfilingConfig(GEProfilingBaseConfig):
     )
     max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = Field(
         default=None,
-        description="A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
+        description="A positive integer that specifies the maximum number of columns to profile for "
+        "any table. `None` implies all columns. The cost of profiling goes up significantly as the "
+        "number of columns to profile goes up.",
     )
 
     profile_if_updated_since_days: Optional[pydantic.PositiveFloat] = Field(
         default=None,
-        description="Profile table only if it has been updated since these many number of days. If set to `null`, no constraint of last modified time for tables to profile. Supported only in `snowflake` and `BigQuery`.",
+        description="Profile table only if it has been updated since these many number of days. "
+        "If set to `null`, no constraint of last modified time for tables to profile. "
+        "Supported only in `snowflake` and `BigQuery`.",
     )
 
     profile_table_size_limit: Optional[int] = Field(
         default=5,
         description="Profile tables only if their size is less than specified GBs. If set to `null`, "
-        "no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`"
-        "Supported for `oracle` based on calculated size from gathered stats.",
+        "no limit on the size of tables to profile. Supported only in `Snowflake`, `BigQuery` and "
+        "`Databricks`. Supported for `Oracle` based on calculated size from gathered stats.",
     )
 
     profile_table_row_limit: Optional[int] = Field(
         default=5000000,
-        description="Profile tables only if their row count is less than specified count. If set to `null`, "
-        "no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`"
-        "Supported for `oracle` based on gathered stats.",
+        description="Profile tables only if their row count is less than specified count. "
+        "If set to `null`, no limit on the row count of tables to profile. Supported only in "
+        "`Snowflake`, `BigQuery`. Supported for `Oracle` based on gathered stats.",
     )
 
     profile_table_row_count_estimate_only: bool = Field(
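
These size and row-count limits gate which tables are profiled at all; a table failing any active limit is dropped from profiling. A minimal sketch of that gating logic, written for illustration only (the real check is GenericProfiler.is_dataset_eligible_for_profiling, and the helper name and GB constant here are assumptions):

from typing import Optional

GB = 1024**3

def eligible_for_profiling(
    size_in_bytes: Optional[int],
    rows_count: Optional[int],
    size_limit_gb: Optional[int] = 5,
    row_limit: Optional[int] = 5_000_000,
) -> bool:
    # A None limit means "no constraint"; a None stat means the source
    # could not report it, so that limit simply cannot be applied.
    if size_limit_gb is not None and size_in_bytes is not None:
        if size_in_bytes > size_limit_gb * GB:
            return False
    if row_limit is not None and rows_count is not None:
        if rows_count > row_limit:
            return False
    return True

For example, eligible_for_profiling(size_in_bytes=8 * GB, rows_count=None) is False under the default 5 GB limit, while a table with no reported size passes through and is instead counted as missing its size (see num_profile_missing_size_in_bytes below).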
metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
@@ -3,6 +3,7 @@
 from dataclasses import dataclass, field
 from typing import Iterable, List, Optional
 
+from databricks.sdk.service.catalog import DataSourceFormat
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Connection

@@ -34,6 +35,11 @@
         self.size_in_bytes = None
         self.rows_count = None
         self.ddl = None
+        self.data_source_format = table.data_source_format
+
+    @property
+    def is_delta_table(self) -> bool:
+        return self.data_source_format == DataSourceFormat.DELTA
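
The new is_delta_table property keys off the data source format reported by Unity Catalog. A small self-contained sketch of the same check, where FakeTable is an illustrative stand-in for UnityCatalogSQLGenericTable:

from databricks.sdk.service.catalog import DataSourceFormat

class FakeTable:
    # Stand-in for UnityCatalogSQLGenericTable, which records the
    # format reported by Unity Catalog at construction time.
    def __init__(self, fmt: DataSourceFormat) -> None:
        self.data_source_format = fmt

    @property
    def is_delta_table(self) -> bool:
        return self.data_source_format == DataSourceFormat.DELTA

assert FakeTable(DataSourceFormat.DELTA).is_delta_table
assert not FakeTable(DataSourceFormat.CSV).is_delta_table

Non-delta tables now skip the size and row-count probes below, since DESCRIBE DETAIL and cheap count(*) scans apply to Delta tables.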


class UnityCatalogGEProfiler(GenericProfiler):
@@ -110,13 +116,20 @@
         profile_table_level_only = self.profiling_config.profile_table_level_only
 
         dataset_name = table.ref.qualified_table_name
-        try:
-            table.size_in_bytes = _get_dataset_size_in_bytes(table, conn)
-        except Exception as e:
-            logger.warning(f"Failed to get table size for {dataset_name}: {e}")
+        if table.is_delta_table:
+            try:
+                table.size_in_bytes = _get_dataset_size_in_bytes(table, conn)
+            except Exception as e:
+                self.report.warning(
title="Incomplete Dataset Profile",
message="Failed to get table size",
context=dataset_name,
exc=e,
)

if table.size_in_bytes is None:
self.report.num_profile_missing_size_in_bytes += 1

if not self.is_dataset_eligible_for_profiling(
dataset_name,
size_in_bytes=table.size_in_bytes,
@@ -143,6 +156,23 @@
             self.report.report_dropped(dataset_name)
             return None
 
+        if profile_table_level_only and table.is_delta_table:
+            # For requests with profile_table_level_only set, the dataset profile is
+            # generated from table.rows_count. For delta tables (the typical Databricks
+            # table format), count(*) is an efficient query to compute the row count.
+            try:
+                table.rows_count = _get_dataset_row_count(table, conn)
+            except Exception as e:
+                self.report.warning(
title="Incomplete Dataset Profile",
message="Failed to get table row count",
context=dataset_name,
exc=e,
)

if table.rows_count is None:
self.report.num_profile_missing_row_count += 1

         self.report.report_entity_profiled(dataset_name)
         logger.debug(f"Preparing profiling request for {dataset_name}")
         return TableProfilerRequest(
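
Under profile_table_level_only, the row count fetched here is what ultimately surfaces in the emitted profile. A hedged sketch of that mapping using DataHub's DatasetProfileClass; the helper name and wiring are illustrative, not this PR's exact code path:

import time
from typing import Optional

from datahub.metadata.schema_classes import DatasetProfileClass

def table_level_profile(rows_count: Optional[int], column_count: int) -> DatasetProfileClass:
    # With profile_table_level_only set, no per-column statistics are
    # computed; the profile carries only row and column counts.
    return DatasetProfileClass(
        timestampMillis=int(time.time() * 1000),
        rowCount=rows_count,
        columnCount=column_count,
    )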
@@ -160,6 +190,9 @@
         conn.dialect.identifier_preparer.quote(c)
         for c in [table.ref.catalog, table.ref.schema, table.ref.table]
     )
+    # This query only works for delta tables.
+    # Ref: https://docs.databricks.com/en/delta/table-details.html
+    # Note: any change here should also update _get_dataset_row_count.
     row = conn.execute(f"DESCRIBE DETAIL {name}").fetchone()
     if row is None:
         return None
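
DESCRIBE DETAIL returns a single metadata row for a Delta table whose columns, per the linked Databricks docs, include sizeInBytes and numFiles. A standalone sketch of the same size lookup, assuming a SQLAlchemy engine pointed at a Databricks SQL warehouse (the connection URI is a placeholder, not part of this PR):

from typing import Optional

from sqlalchemy import create_engine

def delta_size_in_bytes(catalog: str, schema: str, table: str) -> Optional[int]:
    # Placeholder URI: substitute your workspace host, HTTP path, and token.
    engine = create_engine("databricks://token:<token>@<host>?http_path=<http_path>")
    with engine.connect() as conn:
        row = conn.execute(
            f"DESCRIBE DETAIL `{catalog}`.`{schema}`.`{table}`"
        ).fetchone()
    if row is None:
        return None
    try:
        return int(row._asdict()["sizeInBytes"])
    except Exception:
        return None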
@@ -168,3 +201,21 @@
return int(row._asdict()["sizeInBytes"])
except Exception:
return None


def _get_dataset_row_count(
table: UnityCatalogSQLGenericTable, conn: Connection
) -> Optional[int]:
name = ".".join(

+        conn.dialect.identifier_preparer.quote(c)
+        for c in [table.ref.catalog, table.ref.schema, table.ref.table]
+    )
+    # This query is only efficient for delta tables.
+    row = conn.execute(f"select count(*) as numRows from {name}").fetchone()
+    if row is None:
+        return None
+    else:
+        try:
+            return int(row._asdict()["numRows"])
+        except Exception:
+            return None

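Codecov flags the new helper as unexercised by tests. A sketch of a unit test for _get_dataset_row_count with a mocked SQLAlchemy connection; the mock shapes are assumptions about the real objects:

from unittest.mock import MagicMock

from datahub.ingestion.source.unity.ge_profiler import _get_dataset_row_count

def test_get_dataset_row_count_parses_numRows() -> None:
    conn = MagicMock()
    # identifier_preparer.quote is assumed to backtick-quote each name part
    conn.dialect.identifier_preparer.quote.side_effect = lambda c: f"`{c}`"
    row = MagicMock()
    row._asdict.return_value = {"numRows": 42}
    conn.execute.return_value.fetchone.return_value = row

    table = MagicMock()
    table.ref.catalog, table.ref.schema, table.ref.table = "cat", "sch", "tbl"

    assert _get_dataset_row_count(table, conn) == 42
    conn.execute.assert_called_once_with(
        "select count(*) as numRows from `cat`.`sch`.`tbl`"
    )
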
@@ -52,6 +52,7 @@ class UnityCatalogReport(IngestionStageReport, SQLSourceReport):
         default_factory=LossyDict
     )
     num_profile_missing_size_in_bytes: int = 0
+    num_profile_missing_row_count: int = 0
     num_profile_failed_unsupported_column_type: int = 0
     num_profile_failed_int_casts: int = 0