Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/dbt): add support for delta lake table_changes table valued function as DBT source #12512

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/sql_parsing/_models.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import functools
import re
from typing import Any, Optional

import sqlglot
from pydantic import BaseModel

from datahub.configuration.pydantic_migration_helpers import PYDANTIC_VERSION_2
from datahub.metadata.schema_classes import SchemaFieldDataTypeClass
from datahub.sql_parsing.sql_parsing_common import DIALECTS_WITH_DELTALAKE_TVF_SUPPORT
from datahub.sql_parsing.sqlglot_utils import (
is_dialect_instance,
)


class _ParserBaseModel(
Expand Down Expand Up @@ -78,6 +83,7 @@
table: sqlglot.exp.Table,
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
dialect: sqlglot.Dialect = None,

Check warning on line 86 in metadata-ingestion/src/datahub/sql_parsing/_models.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/sql_parsing/_models.py#L86

Added line #L86 was not covered by tests
) -> "_TableName":
if isinstance(table.this, sqlglot.exp.Dot):
# For tables that are more than 3 parts, the extra parts will be in a Dot.
Expand All @@ -89,6 +95,12 @@
exp = exp.expression
parts.append(exp.name)
table_name = ".".join(parts)
elif (
is_dialect_instance(dialect, DIALECTS_WITH_DELTALAKE_TVF_SUPPORT)
and table.this.name == "table_changes"
and re.match(r"TABLE_CHANGES\s*\([^)]*\)", table.this)
):
table_name = table.this.expressions[0].alias_or_name

Check warning on line 103 in metadata-ingestion/src/datahub/sql_parsing/_models.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/sql_parsing/_models.py#L103

Added line #L103 was not covered by tests
else:
table_name = table.this.name
return cls(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
# automatically lowercase unquoted identifiers.
"snowflake",
}
# To support delta lake functions
DIALECTS_WITH_DELTALAKE_TVF_SUPPORT = {"hive", "spark"}

assert DIALECTS_WITH_DEFAULT_UPPERCASE_COLS.issubset(
DIALECTS_WITH_CASE_INSENSITIVE_COLS
)
Expand Down
18 changes: 11 additions & 7 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def _table_level_lineage(
# Generate table-level lineage.
modified = (
{
_TableName.from_sqlglot_table(expr.this)
_TableName.from_sqlglot_table(table=expr.this, dialect=dialect)
for expr in statement.find_all(
sqlglot.exp.Create,
sqlglot.exp.Insert,
Expand All @@ -220,7 +220,7 @@ def _table_level_lineage(
# For statements that include a column list, like
# CREATE DDL statements and `INSERT INTO table (col1, col2) SELECT ...`
# the table name is nested inside a Schema object.
_TableName.from_sqlglot_table(expr.this.this)
_TableName.from_sqlglot_table(table=expr.this.this, dialect=dialect)
for expr in statement.find_all(
sqlglot.exp.Create,
sqlglot.exp.Insert,
Expand All @@ -231,7 +231,7 @@ def _table_level_lineage(
| {
# For drop statements, we only want it if a table/view is being dropped.
# Other "kinds" will not have table.name populated.
_TableName.from_sqlglot_table(expr.this)
_TableName.from_sqlglot_table(table=expr.this, dialect=dialect)
for expr in ([statement] if isinstance(statement, sqlglot.exp.Drop) else [])
if isinstance(expr.this, sqlglot.exp.Table)
and expr.this.this
Expand All @@ -241,7 +241,7 @@ def _table_level_lineage(

tables = (
{
_TableName.from_sqlglot_table(table)
_TableName.from_sqlglot_table(table=table, dialect=dialect)
for table in statement.find_all(sqlglot.exp.Table)
if not isinstance(table.parent, sqlglot.exp.Drop)
}
Expand Down Expand Up @@ -518,7 +518,9 @@ def _select_statement_cll( # noqa: C901
# )

# Generate SELECT lineage.
direct_raw_col_upstreams = _get_direct_raw_col_upstreams(lineage_node)
direct_raw_col_upstreams = _get_direct_raw_col_upstreams(
lineage_node, dialect
)

# column_logic = lineage_node.source

Expand Down Expand Up @@ -652,7 +654,7 @@ def _column_level_lineage(


def _get_direct_raw_col_upstreams(
lineage_node: sqlglot.lineage.Node,
lineage_node: sqlglot.lineage.Node, dialect: sqlglot.Dialect
) -> Set[_ColumnRef]:
# Using a set here to deduplicate upstreams.
direct_raw_col_upstreams: Set[_ColumnRef] = set()
Expand All @@ -663,7 +665,9 @@ def _get_direct_raw_col_upstreams(
pass

elif isinstance(node.expression, sqlglot.exp.Table):
table_ref = _TableName.from_sqlglot_table(node.expression)
table_ref = _TableName.from_sqlglot_table(
table=node.expression, dialect=dialect
)

if node.name == "*":
# This will happen if we couldn't expand the * to actual columns e.g. if
Expand Down
Loading