Skip to content

Commit

Permalink
sqla Ignore other schema and missing referenced tables
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Oct 10, 2024
1 parent 22f70c8 commit 195f88a
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 8 deletions.
34 changes: 26 additions & 8 deletions dlt/sources/sql_database/schema_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
Callable,
Union,
TypedDict,
Dict,
)
from typing_extensions import TypeAlias
from dlt.common.libs.sql_alchemy import Table, Column, Row, sqltypes, Select, TypeEngine

from sqlalchemy.exc import NoReferencedTableError


from dlt.common.libs.sql_alchemy import Table, Column, Row, sqltypes, Select, TypeEngine
from dlt.common import logger
from dlt.common.schema.typing import TColumnSchema, TTableSchemaColumns, TTableReference

Expand Down Expand Up @@ -177,21 +180,36 @@ def get_table_references(
"""Resolve table references from SQLAlchemy foreign key constraints in the table"""
if reflection_level == "minimal":
return None
result: List[TTableReference] = []
ref_tables: Dict[str, TTableReference] = {}
for fk_constraint in table.foreign_key_constraints:
referenced_table = fk_constraint.referred_table.name
try:
referenced_table = fk_constraint.referred_table.name
except NoReferencedTableError as e:
logger.warning(
"Foreign key constraint from table %s could not be resolved to a referenced table, error message: %s",
table.name,
e,
)
continue

if fk_constraint.referred_table.schema != table.schema:
continue

elements = fk_constraint.elements
referenced_columns = [element.column.name for element in elements]
columns = [col.name for col in fk_constraint.columns]

result.append(
{
if referenced_table in ref_tables:
# Merge multiple foreign keys to the same table
existing_ref = ref_tables[referenced_table]
existing_ref["columns"].extend(columns) # type: ignore[attr-defined]
existing_ref["referenced_columns"].extend(referenced_columns) # type: ignore[attr-defined]
else:
ref_tables[referenced_table] = {
"referenced_table": referenced_table,
"referenced_columns": referenced_columns,
"columns": columns,
}
)
return result
return list(ref_tables.values())


def table_to_resource_hints(
Expand Down
118 changes: 118 additions & 0 deletions tests/sources/sql_database/test_schema_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import sqlalchemy as sa

from dlt.sources.sql_database.schema_types import get_table_references


def test_get_table_references() -> None:
    """Check how ``get_table_references`` converts foreign keys to reference hints.

    Scenarios covered, in order:

    * a table without foreign keys yields an empty list,
    * a single-column foreign key yields one reference,
    * a foreign key whose target table is missing from the metadata is skipped,
    * several foreign keys to the same table are merged into one reference,
    * a composite foreign key yields one reference,
    * a foreign key pointing to a table in a different schema is skipped.
    """
    # Test converting foreign keys to reference hints
    metadata = sa.MetaData()

    parent = sa.Table(
        "parent",
        metadata,
        sa.Column("id", sa.Integer, primary_key=True),
    )

    child = sa.Table(
        "child",
        metadata,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("parent_id", sa.Integer, sa.ForeignKey("parent.id")),
    )

    refs = get_table_references(parent)
    assert refs == []

    refs = get_table_references(child)
    assert refs == [
        {
            "columns": ["parent_id"],
            "referenced_table": "parent",
            "referenced_columns": ["id"],
        }
    ]

    # When referred table has not been reflected the reference is not resolved.
    # Only `child` is copied into the fresh metadata, so "parent" is unknown there.
    metadata = sa.MetaData()
    child = child.to_metadata(metadata)

    refs = get_table_references(child)

    # Refs are not resolved
    assert refs == []

    # Multiple fks to the same table are merged into one reference
    metadata = sa.MetaData()

    parent = sa.Table(
        "parent",
        metadata,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("country", sa.String),
        sa.UniqueConstraint("id", "country"),
    )
    parent_2 = sa.Table(  # noqa: F841
        "parent_2",
        metadata,
        sa.Column("id", sa.Integer, primary_key=True),
    )
    child = sa.Table(
        "child",
        metadata,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("country", sa.String),
        sa.Column("parent_id", sa.Integer, sa.ForeignKey("parent.id")),
        sa.Column("parent_country", sa.String, sa.ForeignKey("parent.country")),
        sa.Column("parent_2_id", sa.Integer, sa.ForeignKey("parent_2.id")),
    )
    refs = get_table_references(child)
    # Three foreign keys point at two distinct tables -> two references
    assert len(refs) == 2
    refs = sorted(refs, key=lambda x: x["referenced_table"])
    assert refs[0]["referenced_table"] == "parent"
    # Sqla constraints are not in fixed order
    assert set(refs[0]["columns"]) == {"parent_id", "parent_country"}
    assert set(refs[0]["referenced_columns"]) == {"id", "country"}
    # Ensure columns and referenced columns are the same order
    col_mapping = dict(zip(refs[0]["columns"], refs[0]["referenced_columns"]))
    expected_col_mapping = {"parent_id": "id", "parent_country": "country"}
    assert col_mapping == expected_col_mapping

    assert refs[1] == {
        "columns": ["parent_2_id"],
        "referenced_table": "parent_2",
        "referenced_columns": ["id"],
    }

    # Composite foreign keys give one reference
    metadata = sa.MetaData()
    parent.to_metadata(metadata)
    child = sa.Table(
        "child",
        metadata,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("parent_id", sa.Integer),
        sa.Column("parent_country", sa.String),
        sa.ForeignKeyConstraint(["parent_id", "parent_country"], ["parent.id", "parent.country"]),
    )

    refs = get_table_references(child)
    assert len(refs) == 1
    assert refs[0]["referenced_table"] == "parent"
    col_mapping = dict(zip(refs[0]["columns"], refs[0]["referenced_columns"]))
    expected_col_mapping = {"parent_id": "id", "parent_country": "country"}
    assert col_mapping == expected_col_mapping

    # Foreign key to different schema is not resolved
    metadata = sa.MetaData()
    parent = parent.to_metadata(metadata, schema="first_schema")
    child = sa.Table(
        "child",
        metadata,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("parent_id", sa.Integer, sa.ForeignKey("first_schema.parent.id")),
    )

    refs = get_table_references(child)
    assert refs == []

0 comments on commit 195f88a

Please sign in to comment.