Skip to content

Commit

Permalink
uses path normalization for columns in arrow tables (#1947)
Browse files Browse the repository at this point in the history
* uses path normalization for columns in arrow tables

* adds sqlglot to pipeline dev group

* improves normalization tests, improves docstrings
  • Loading branch information
rudolfix authored Oct 11, 2024
1 parent bc13e1c commit b717627
Show file tree
Hide file tree
Showing 5 changed files with 12,646 additions and 6 deletions.
13 changes: 10 additions & 3 deletions dlt/common/libs/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ def should_normalize_arrow_schema(
naming: NamingConvention,
add_load_id: bool = False,
) -> Tuple[bool, Mapping[str, str], Dict[str, str], Dict[str, bool], bool, TTableSchemaColumns]:
"""Figure out if any of the normalization steps must be executed. This avoids
rewriting arrow tables when no changes are needed. Refer to `normalize_py_arrow_item`
for a list of normalizations. Note that `columns` must already be normalized.
"""
rename_mapping = get_normalized_arrow_fields_mapping(schema, naming)
# no clashes in rename ensured above
rev_mapping = {v: k for k, v in rename_mapping.items()}
nullable_mapping = {k: is_nullable_column(v) for k, v in columns.items()}
# All fields from arrow schema that have nullable set to different value than in columns
Expand Down Expand Up @@ -301,7 +306,8 @@ def normalize_py_arrow_item(
caps: DestinationCapabilitiesContext,
load_id: Optional[str] = None,
) -> TAnyArrowItem:
"""Normalize arrow `item` schema according to the `columns`.
"""Normalize arrow `item` schema according to the `columns`. Note that
columns must be already normalized.
1. arrow schema field names will be normalized according to `naming`
2. arrows columns will be reordered according to `columns`
Expand Down Expand Up @@ -366,13 +372,14 @@ def normalize_py_arrow_item(

def get_normalized_arrow_fields_mapping(schema: pyarrow.Schema, naming: NamingConvention) -> StrStr:
    """Normalize arrow schema field names and map each original name to its normalized form.

    Args:
        schema: Arrow schema whose fields (each exposing a `name`) are normalized.
        naming: Naming convention; its `normalize_path` is applied to every field name.

    Returns:
        Mapping from the original field name to the normalized field name.

    Raises:
        NameNormalizationCollision: If two or more distinct field names normalize
            to the same identifier.
    """
    # use normalize_path to be compatible with how regular columns are normalized in dlt.Schema
    norm_f = naming.normalize_path
    name_mapping = {n.name: norm_f(n.name) for n in schema}
    # verify if names uniquely normalize: fewer distinct normalized names than
    # original names means at least two originals collapsed into one
    normalized_names = set(name_mapping.values())
    if len(name_mapping) != len(normalized_names):
        raise NameNormalizationCollision(
            f"Arrow schema fields normalized from:\n{list(name_mapping.keys())}:\nto:\n"
            f" {list(normalized_names)}"
        )
    return name_mapping
Expand Down
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ pandas = [
{version = ">2.1", markers = "python_version >= '3.12'"},
{version = "<2.1", markers = "python_version < '3.12'"}
]
sqlglot = {version = ">=20.0.0"}

[tool.poetry.group.airflow]
optional = true
Expand Down
Loading

0 comments on commit b717627

Please sign in to comment.