diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
index 93142a347ca0e6..9c251c040bed13 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -115,26 +115,30 @@ class GEProfilingConfig(GEProfilingBaseConfig):
     )
     max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = Field(
         default=None,
-        description="A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
+        description="A positive integer that specifies the maximum number of columns to profile for "
+        "any table. `None` implies all columns. The cost of profiling goes up significantly as the "
+        "number of columns to profile goes up.",
     )
 
     profile_if_updated_since_days: Optional[pydantic.PositiveFloat] = Field(
         default=None,
-        description="Profile table only if it has been updated since these many number of days. If set to `null`, no constraint of last modified time for tables to profile. Supported only in `snowflake` and `BigQuery`.",
+        description="Profile a table only if it has been updated within the specified "
+        "number of days. If set to `null`, there is no last-modified-time constraint on "
+        "tables to profile. Supported only in `Snowflake` and `BigQuery`.",
     )
 
     profile_table_size_limit: Optional[int] = Field(
         default=5,
         description="Profile tables only if their size is less than specified GBs. If set to `null`, "
-        "no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`"
-        "Supported for `oracle` based on calculated size from gathered stats.",
+        "no limit on the size of tables to profile. Supported only in `Snowflake`, `BigQuery`, and "
+        "`Databricks`. Supported for `Oracle` based on calculated size from gathered stats.",
     )
 
     profile_table_row_limit: Optional[int] = Field(
         default=5000000,
-        description="Profile tables only if their row count is less than specified count. If set to `null`, "
-        "no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`"
-        "Supported for `oracle` based on gathered stats.",
+        description="Profile tables only if their row count is less than the specified count. "
+        "If set to `null`, no limit on the row count of tables to profile. Supported only in "
+        "`Snowflake` and `BigQuery`. Supported for `Oracle` based on gathered stats.",
     )
 
     profile_table_row_count_estimate_only: bool = Field(
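Reviewer note: a minimal sketch of how these knobs compose, assuming a dev environment where `GEProfilingConfig` is importable and `enabled` is inherited from `GEProfilingBaseConfig`; the values below are illustrative, not recommendations.

```python
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig

# Illustrative values; setting a gate to None disables it entirely.
profiling = GEProfilingConfig(
    enabled=True,                        # assumed inherited from GEProfilingBaseConfig
    max_number_of_fields_to_profile=20,  # cap per-table column-profiling cost
    profile_if_updated_since_days=7,     # only tables modified in the last week
    profile_table_size_limit=5,          # GB; Snowflake/BigQuery/Databricks
    profile_table_row_limit=5_000_000,   # rows; Snowflake/BigQuery
)
assert profiling.profile_table_row_limit == 5_000_000
```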
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
index e24ca8330777ed..a8658d476b87b0 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
@@ -3,6 +3,7 @@
 from dataclasses import dataclass, field
 from typing import Iterable, List, Optional
 
+from databricks.sdk.service.catalog import DataSourceFormat
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Connection
 
@@ -34,6 +35,11 @@ def __init__(self, table: Table):
         self.size_in_bytes = None
         self.rows_count = None
         self.ddl = None
+        self.data_source_format = table.data_source_format
+
+    @property
+    def is_delta_table(self) -> bool:
+        return self.data_source_format == DataSourceFormat.DELTA
 
 
 class UnityCatalogGEProfiler(GenericProfiler):
@@ -110,13 +116,20 @@ def get_unity_profile_request(
         profile_table_level_only = self.profiling_config.profile_table_level_only
 
         dataset_name = table.ref.qualified_table_name
-        try:
-            table.size_in_bytes = _get_dataset_size_in_bytes(table, conn)
-        except Exception as e:
-            logger.warning(f"Failed to get table size for {dataset_name}: {e}")
+        if table.is_delta_table:
+            try:
+                table.size_in_bytes = _get_dataset_size_in_bytes(table, conn)
+            except Exception as e:
+                self.report.warning(
+                    title="Incomplete Dataset Profile",
+                    message="Failed to get table size",
+                    context=dataset_name,
+                    exc=e,
+                )
 
         if table.size_in_bytes is None:
             self.report.num_profile_missing_size_in_bytes += 1
+
         if not self.is_dataset_eligible_for_profiling(
             dataset_name,
             size_in_bytes=table.size_in_bytes,
@@ -143,6 +156,23 @@ def get_unity_profile_request(
             self.report.report_dropped(dataset_name)
             return None
 
+        if profile_table_level_only and table.is_delta_table:
+            # For requests with profile_table_level_only set, the dataset profile is
+            # generated from table.rows_count. For Delta tables (the typical Databricks
+            # table format), count(*) is an efficient query for computing the row count.
+            try:
+                table.rows_count = _get_dataset_row_count(table, conn)
+            except Exception as e:
+                self.report.warning(
+                    title="Incomplete Dataset Profile",
+                    message="Failed to get table row count",
+                    context=dataset_name,
+                    exc=e,
+                )
+
+            if table.rows_count is None:
+                self.report.num_profile_missing_row_count += 1
+
         self.report.report_entity_profiled(dataset_name)
         logger.debug(f"Preparing profiling request for {dataset_name}")
         return TableProfilerRequest(
@@ -160,6 +190,9 @@ def _get_dataset_size_in_bytes(
         conn.dialect.identifier_preparer.quote(c)
         for c in [table.ref.catalog, table.ref.schema, table.ref.table]
     )
+    # This query only works for Delta tables.
+    # Ref: https://docs.databricks.com/en/delta/table-details.html
+    # Note: any change here should also update _get_dataset_row_count.
     row = conn.execute(f"DESCRIBE DETAIL {name}").fetchone()
     if row is None:
         return None
@@ -168,3 +201,21 @@ def _get_dataset_size_in_bytes(
         return int(row._asdict()["sizeInBytes"])
     except Exception:
         return None
+
+
+def _get_dataset_row_count(
+    table: UnityCatalogSQLGenericTable, conn: Connection
+) -> Optional[int]:
+    name = ".".join(
+        conn.dialect.identifier_preparer.quote(c)
+        for c in [table.ref.catalog, table.ref.schema, table.ref.table]
+    )
+    # This query works on any table but is only efficient for Delta tables.
+    row = conn.execute(f"SELECT COUNT(*) AS numRows FROM {name}").fetchone()
+    if row is None:
+        return None
+    else:
+        try:
+            return int(row._asdict()["numRows"])
+        except Exception:
+            return None
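Reviewer note: a standalone sketch (not the PR's code) of what the two probes above do against a Databricks SQL warehouse via SQLAlchemy. `probe_delta_table`, the connection URL, and the table name are hypothetical placeholders; `DESCRIBE DETAIL` only succeeds on Delta tables, while `count(*)` runs anywhere but is only cheap on Delta, which is why both probes are gated on `is_delta_table`.

```python
from typing import Optional

from sqlalchemy import create_engine, text


def probe_delta_table(url: str, fq_table: str) -> None:
    """Print the size and row count of a (presumed Delta) table.

    Hypothetical helper for illustration only; fq_table must already be a
    trusted, fully quoted catalog.schema.table name.
    """
    engine = create_engine(url)  # requires a Databricks SQLAlchemy dialect
    with engine.connect() as conn:
        # DESCRIBE DETAIL returns one metadata row with a sizeInBytes column
        # (Delta tables only).
        detail = conn.execute(text(f"DESCRIBE DETAIL {fq_table}")).fetchone()
        size: Optional[int] = (
            int(detail._asdict()["sizeInBytes"]) if detail is not None else None
        )
        # count(*) is answered from Delta metadata, so it stays cheap here.
        counted = conn.execute(
            text(f"SELECT COUNT(*) AS numRows FROM {fq_table}")
        ).fetchone()
        rows: Optional[int] = (
            int(counted._asdict()["numRows"]) if counted is not None else None
        )
        print(f"{fq_table}: size_in_bytes={size}, rows_count={rows}")


# Example invocation with placeholder credentials:
# probe_delta_table(
#     "databricks://token:<token>@<workspace-host>?http_path=<http-path>",
#     "main.default.my_table",
# )
```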
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
index f16769341853a1..2288514fb82388 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
@@ -52,6 +52,7 @@ class UnityCatalogReport(IngestionStageReport, SQLSourceReport):
         default_factory=LossyDict
     )
     num_profile_missing_size_in_bytes: int = 0
+    num_profile_missing_row_count: int = 0
     num_profile_failed_unsupported_column_type: int = 0
     num_profile_failed_int_casts: int = 0
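Reviewer note: a tiny sketch of how the new counter surfaces, assuming `UnityCatalogReport` remains default-constructible like the other dataclass-based reports.

```python
from datahub.ingestion.source.unity.report import UnityCatalogReport

# Assumes all report fields have defaults, so no-arg construction works.
report = UnityCatalogReport()
report.num_profile_missing_row_count += 1  # bumped by the profiler on a failed probe
print(report.num_profile_missing_row_count)  # -> 1, shown in the ingestion summary
```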