Merge pull request #143 from dlt-hub/rfix/improves-naming
improves naming convention protocol
rudolfix authored Feb 21, 2023
2 parents 6336096 + 21e1f13 commit 6a35ac8
Showing 23 changed files with 9,674 additions and 149 deletions.
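
At its core, the commit replaces the loose bundle of normalization functions that previously hung off Schema (normalize_table_name, normalize_column_name, normalize_schema_name, normalize_make_path, ...) with a single naming attribute typed as the new NamingConvention protocol. A minimal before/after sketch of a call site (illustrative names and values, not literal repository code):

# before this commit: normalization functions attached directly to the Schema
column = schema.normalize_column_name("UserID")             # -> "user_id"
path = schema.normalize_make_path("users", column)          # -> "users__user_id"

# after this commit: the same operations go through the naming protocol object
column = schema.naming.normalize_identifier("UserID")       # -> "user_id"
path = schema.naming.normalize_make_path("users", column)   # -> "users__user_id"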
4 changes: 2 additions & 2 deletions dlt/cli/init_command.py
@@ -11,7 +11,7 @@
from dlt.common.configuration import is_secret_hint, DOT_DLT, make_dot_dlt_path
from dlt.common.configuration.providers import CONFIG_TOML, SECRETS_TOML, ConfigTomlProvider, SecretsTomlProvider
from dlt.version import DLT_PKG_NAME, __version__
from dlt.common.normalizers.names.snake_case import normalize_schema_name
from dlt.common.normalizers.names.snake_case import normalize_identifier
from dlt.common.destination import DestinationReference
from dlt.common.reflection.utils import creates_func_def_name_node, rewrite_python_script
from dlt.common.schema.exceptions import InvalidSchemaName
@@ -131,7 +131,7 @@ def init_command(pipeline_name: str, destination_name: str, use_generic_template
init_script_name = pipeline_script

# normalize source name
norm_source_name = normalize_schema_name(pipeline_name)
norm_source_name = normalize_identifier(pipeline_name)
if norm_source_name != pipeline_name:
raise InvalidSchemaName(pipeline_name, norm_source_name)
dest_pipeline_script = norm_source_name + ".py"
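
The dlt init behavior is unchanged, only routed through the renamed function: a pipeline name that is not already a valid normalized identifier is rejected instead of silently renamed. For example (hypothetical input):

norm_source_name = normalize_identifier("MyPipeline")  # -> "my_pipeline"
# "my_pipeline" != "MyPipeline", so init_command raises InvalidSchemaName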
12 changes: 6 additions & 6 deletions dlt/common/normalizers/json/relational.py
@@ -63,8 +63,8 @@ def _flatten(schema: Schema, table: str, dict_row: TDataItemRow, _r_lvl: int) ->

def norm_row_dicts(dict_row: StrAny, __r_lvl: int, parent_name: Optional[str]) -> None:
for k, v in dict_row.items():
corrected_k = schema.normalize_column_name(k)
child_name = corrected_k if not parent_name else schema.normalize_make_path(parent_name, corrected_k)
corrected_k = schema.naming.normalize_identifier(k)
child_name = corrected_k if not parent_name else schema.naming.normalize_make_path(parent_name, corrected_k)
# for lists and dicts we must check if type is possibly complex
if isinstance(v, (dict, list)):
if not _is_complex_type(schema, table, child_name, __r_lvl):
@@ -143,7 +143,7 @@ def _normalize_list(
yield from _normalize_row(schema, v, extend, table, parent_table, parent_row_id, idx, _r_lvl)
elif isinstance(v, list):
# normalize lists of lists, we assume all lists in the list have the same type so they should go to the same table
list_table_name = schema.normalize_make_path(table, "list")
list_table_name = schema.naming.normalize_make_path(table, "list")
yield from _normalize_list(schema, v, extend, list_table_name, parent_table, parent_row_id, _r_lvl + 1)
else:
# list of simple types
@@ -197,11 +197,11 @@ def _normalize_row(

# normalize and yield lists
for k, list_content in lists.items():
yield from _normalize_list(schema, list_content, extend, schema.normalize_make_path(table, k), table, row_id, _r_lvl + 1)
yield from _normalize_list(schema, list_content, extend, schema.naming.normalize_make_path(table, k), table, row_id, _r_lvl + 1)


def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConfig) -> None:
validate_dict(RelationalNormalizerConfig, config, "./normalizers/json/config", validator_f=column_name_validator(schema.normalize_column_name))
validate_dict(RelationalNormalizerConfig, config, "./normalizers/json/config", validator_f=column_name_validator(schema.naming))


def update_normalizer_config(schema: Schema, config: RelationalNormalizerConfig) -> None:
@@ -247,4 +247,4 @@ def normalize_data_item(schema: Schema, item: TDataItem, load_id: str, table_nam
row = cast(TDataItemRowRoot, item)
# identify load id if loaded data must be processed after loading incrementally
row["_dlt_load_id"] = load_id
yield from _normalize_row(schema, cast(TDataItemRowChild, row), {}, schema.normalize_table_name(table_name))
yield from _normalize_row(schema, cast(TDataItemRowChild, row), {}, schema.naming.normalize_identifier(table_name))
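
The relational normalizer builds nested column and table names by joining path components with the naming convention's PATH_SEPARATOR ("__" for snake_case). A minimal sketch of the flattening idea; it omits the identifier normalization, list handling and complex-type checks that the real _flatten performs:

PATH_SEPARATOR = "__"

def flatten(row: dict, parent: str = "") -> dict:
    # nested dicts become columns whose names join their parents with PATH_SEPARATOR
    out = {}
    for key, value in row.items():
        name = f"{parent}{PATH_SEPARATOR}{key}" if parent else key
        if isinstance(value, dict):
            out.update(flatten(value, name))
        else:
            out[name] = value
    return out

row = {"name": "Ada", "address": {"city": "London", "geo": {"lat": 51.5}}}
assert flatten(row) == {"name": "Ada", "address__city": "London", "address__geo__lat": 51.5}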
8 changes: 1 addition & 7 deletions dlt/common/normalizers/names/__init__.py
@@ -1,8 +1,2 @@
from typing import Callable, Sequence
from .typing import NamingConvention

# function signature to normalize names
TNormalizeNameFunc = Callable[[str], str]
# function signature to make paths
TNormalizeMakePath = Callable[..., str]
# function signature to break path into components
TNormalizeBreakPath = Callable[[str], Sequence[str]]
50 changes: 24 additions & 26 deletions dlt/common/normalizers/names/snake_case.py
@@ -18,43 +18,41 @@
PATH_SEPARATOR = "__"


# fix a name so it's acceptable as database table name
def camel_to_snake(name: str) -> str:
name = SNAKE_CASE_BREAK_1.sub(r'\1_\2', name)
return SNAKE_CASE_BREAK_2.sub(r'\1_\2', name).lower()


@lru_cache(maxsize=None)
def normalize_table_name(name: str) -> str:
if not name:
raise ValueError(name)
def normalize_path(path: str) -> str:
"""Breaks path into identifiers using PATH_SEPARATOR, normalizes components and reconstitutes the path"""
return normalize_make_path(*map(normalize_identifier, normalize_break_path(path)))

def camel_to_snake(name: str) -> str:
name = SNAKE_CASE_BREAK_1.sub(r'\1_\2', name)
return SNAKE_CASE_BREAK_2.sub(r'\1_\2', name).lower()

# fix a name so it's an acceptable name for a database column
@lru_cache(maxsize=None)
def normalize_identifier(name: str) -> str:
"""Normalizes the identifier according to naming convention represented by this function"""
if not name:
raise ValueError(name)
# all characters that are not letters digits or a few special chars are replaced with underscore
# then convert to snake case
name = camel_to_snake(RE_NON_ALPHANUMERIC.sub("_", name))
# leading digits will be prefixed
if RE_LEADING_DIGITS.match(name):
name = "_" + name
# max 2 consecutive underscores are allowed
return RE_DOUBLE_UNDERSCORES.sub("__", name)


# fix a name so it's an acceptable name for a database column
@lru_cache(maxsize=None)
def normalize_column_name(name: str) -> str:
# replace consecutive underscores with single one to prevent name clashes with PATH_SEPARATOR
return RE_UNDERSCORES.sub("_", normalize_table_name(name))


# fix a name so it is acceptable as schema name
def normalize_schema_name(name: str) -> str:
return normalize_column_name(name)
return RE_UNDERSCORES.sub("_", name)


# build full db dataset (dataset) name out of (normalized) default dataset and schema name
def normalize_make_dataset_name(dataset_name: str, default_schema_name: str, schema_name: str) -> str:
"""Builds full db dataset (dataset) name out of (normalized) default dataset and schema name"""
if not schema_name:
raise ValueError("schema_name is None")
norm_name = normalize_schema_name(dataset_name)
raise ValueError("schema_name is None or empty")
if not dataset_name:
raise ValueError("dataset_name is None or empty")
norm_name = normalize_identifier(dataset_name)
if norm_name != dataset_name:
raise InvalidDatasetName(dataset_name, norm_name)
# if default schema is None then suffix is not added
Expand All @@ -64,13 +62,13 @@ def normalize_make_dataset_name(dataset_name: str, default_schema_name: str, sch
return norm_name


# this function builds path out of path elements using PATH_SEPARATOR
def normalize_make_path(*elems: Any) -> str:
return PATH_SEPARATOR.join(elems)
def normalize_make_path(*identifiers: Any) -> str:
"""Builds path out of path identifiers using PATH_SEPARATOR. Identifiers are not normalized"""
return PATH_SEPARATOR.join(identifiers)


# this function break path into elements
def normalize_break_path(path: str) -> Sequence[str]:
"""Breaks path into sequence of identifiers"""
return path.split(PATH_SEPARATOR)
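
Assuming the module-level regexes above this hunk (RE_NON_ALPHANUMERIC, RE_LEADING_DIGITS, RE_UNDERSCORES) behave as their names and the inline comments suggest, the reorganized functions act roughly like this:

from dlt.common.normalizers.names.snake_case import normalize_identifier, normalize_path

normalize_identifier("CamelCaseName")  # -> "camel_case_name"
normalize_identifier("weird!char")     # -> "weird_char"  (non-alphanumerics become "_")
normalize_identifier("1column")        # -> "_1column"    (leading digits get a prefix)
normalize_identifier("a__b")           # -> "a_b"         (underscore runs collapse, so a
                                       #    single identifier cannot clash with PATH_SEPARATOR)

# normalize_path, by contrast, preserves the "__" separators and normalizes
# each path component on its own:
normalize_path("Parent__ChildTable")   # -> "parent__child_table"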


25 changes: 25 additions & 0 deletions dlt/common/normalizers/names/typing.py
@@ -0,0 +1,25 @@
from typing import Any, Protocol, Sequence


class NamingConvention(Protocol):
PATH_SEPARATOR: str

def normalize_identifier(self, name: str) -> str:
"""Normalizes the identifier according to naming convention represented by this function"""
...

def normalize_path(self, path: str) -> str:
"""Breaks path into identifiers using PATH_SEPARATOR, normalizes components and reconstitutes the path"""
...

def normalize_make_path(self, *identifiers: Any) -> str:
"""Builds path out of path identifiers using PATH_SEPARATOR. Identifiers are not normalized"""
...

def normalize_break_path(self, path: str) -> Sequence[str]:
"""Breaks path into sequence of identifiers"""
...

def normalize_make_dataset_name(self, dataset_name: str, default_schema_name: str, schema_name: str) -> str:
"""Builds full db dataset (dataset) name out of (normalized) default dataset and schema name"""
pass
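
Because NamingConvention is a typing.Protocol, anything that exposes these attributes satisfies it structurally, including a plain module; that is exactly how Schema._configure_normalizers assigns the imported naming module to schema.naming in the diff below. A minimal sketch of a custom convention module that a type checker accepting modules as protocol implementations would admit (a hypothetical pass-through convention; normalize_make_dataset_name omitted for brevity):

from typing import Any, Sequence

PATH_SEPARATOR = "__"

def normalize_identifier(name: str) -> str:
    # pass identifiers through unchanged
    if not name:
        raise ValueError(name)
    return name

def normalize_make_path(*identifiers: Any) -> str:
    return PATH_SEPARATOR.join(identifiers)

def normalize_break_path(path: str) -> Sequence[str]:
    return path.split(PATH_SEPARATOR)

def normalize_path(path: str) -> str:
    return normalize_make_path(*map(normalize_identifier, normalize_break_path(path)))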
41 changes: 13 additions & 28 deletions dlt/common/schema/schema.py
@@ -4,7 +4,7 @@
from dlt.common import json

from dlt.common.typing import DictStrAny, StrAny, REPattern, SupportsVariant, VARIANT_FIELD_FORMAT
from dlt.common.normalizers.names import TNormalizeBreakPath, TNormalizeMakePath, TNormalizeNameFunc
from dlt.common.normalizers.names import NamingConvention
from dlt.common.normalizers.json import TNormalizeJSONFunc
from dlt.common.schema.typing import (SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, VERSION_TABLE_NAME, TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema,
TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TDataType,
Expand All @@ -19,12 +19,7 @@ class Schema:
ENGINE_VERSION: ClassVar[int] = SCHEMA_ENGINE_VERSION

# name normalization functions
normalize_table_name: TNormalizeNameFunc
normalize_column_name: TNormalizeNameFunc
normalize_schema_name: TNormalizeNameFunc
normalize_make_dataset_name: TNormalizeMakePath
normalize_make_path: TNormalizeMakePath
normalize_break_path: TNormalizeBreakPath
naming: NamingConvention
# json normalization function
normalize_data_item: TNormalizeJSONFunc

@@ -117,18 +112,18 @@ def _exclude(path: str, excludes: Sequence[REPattern], includes: Sequence[REPatt
return is_excluded and not is_included

# break table name in components
branch = self.normalize_break_path(table_name)
branch = self.naming.normalize_break_path(table_name)

# check if any of the rows is excluded by rules in any of the tables
for i in range(len(branch), 0, -1): # stop is exclusive in `range`
# start at the top level table
c_t = self.normalize_make_path(*branch[:i])
c_t = self.naming.normalize_make_path(*branch[:i])
excludes = self._compiled_excludes.get(c_t)
# only if there's possibility to exclude, continue
if excludes:
includes = self._compiled_includes.get(c_t) or []
for field_name in list(row.keys()):
path = self.normalize_make_path(*branch[i:], field_name)
path = self.naming.normalize_make_path(*branch[i:], field_name)
if _exclude(path, excludes, includes):
# TODO: copy to new instance
del row[field_name] # type: ignore
@@ -228,14 +223,14 @@ def merge_hints(self, new_hints: Mapping[TColumnHint, Sequence[TSimpleRegex]]) -

def normalize_table_identifiers(self, table: TTableSchema) -> TTableSchema:
# normalize all identifiers in table according to name normalizer of the schema
table["name"] = self.normalize_table_name(table["name"])
table["name"] = self.naming.normalize_path(table["name"])
parent = table.get("parent")
if parent:
table["parent"] = self.normalize_table_name(parent)
table["parent"] = self.naming.normalize_path(parent)
columns = table.get("columns")
if columns:
for c in columns.values():
c["name"] = self.normalize_column_name(c["name"])
c["name"] = self.naming.normalize_path(c["name"])
# re-index columns as the name changed
table["columns"] = {c["name"]:c for c in columns.values()}
return table
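
A subtle consequence of switching normalize_table_identifiers from the old whole-name functions to naming.normalize_path: names are normalized component by component, so the "__" separators of child tables and nested columns survive. A hedged sketch with a hypothetical stored table:

table = {
    "name": "Users__EmailAddresses",
    "parent": "Users",
    "columns": {"Value": {"name": "Value", "data_type": "text"}},
}
table = schema.normalize_table_identifiers(table)
# table["name"]   == "users__email_addresses"  (separator preserved)
# table["parent"] == "users"
# table["columns"] is re-indexed under the normalized key "value"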
@@ -353,7 +348,7 @@ def _coerce_non_null_value(self, table_columns: TTableSchemaColumns, table_name:
# otherwise we must create variant extension to the table
# pass final=True so no more auto-variants can be created recursively
# TODO: generate callback so DLT user can decide what to do
variant_col_name = self.normalize_make_path(col_name, VARIANT_FIELD_FORMAT % py_type)
variant_col_name = self.naming.normalize_make_path(col_name, VARIANT_FIELD_FORMAT % py_type)
return self._coerce_non_null_value(table_columns, table_name, variant_col_name, v, final=True)

# if coerced value is variant, then extract variant value
Expand All @@ -362,7 +357,7 @@ def _coerce_non_null_value(self, table_columns: TTableSchemaColumns, table_name:
coerced_v = coerced_v()
if isinstance(coerced_v, tuple):
# variant recovered so call recursively with variant column name and variant value
variant_col_name = self.normalize_make_path(col_name, VARIANT_FIELD_FORMAT % coerced_v[0])
variant_col_name = self.naming.normalize_make_path(col_name, VARIANT_FIELD_FORMAT % coerced_v[0])
return self._coerce_non_null_value(table_columns, table_name, variant_col_name, coerced_v[1], final=True)

if not existing_column:
@@ -408,12 +403,7 @@ def _configure_normalizers(self) -> None:
# import desired modules
naming_module, json_module = utils.import_normalizers(self._normalizers_config)
# name normalization functions
self.normalize_table_name = naming_module.normalize_table_name
self.normalize_column_name = naming_module.normalize_column_name
self.normalize_schema_name = naming_module.normalize_schema_name
self.normalize_make_dataset_name = naming_module.normalize_make_dataset_name
self.normalize_make_path = naming_module.normalize_make_path
self.normalize_break_path = naming_module.normalize_break_path
self.naming = naming_module
# data item normalization function
self.normalize_data_item = json_module.normalize_data_item
json_module.extend_schema(self)
Expand All @@ -434,12 +424,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None, norma
self._type_detections: Sequence[TTypeDetections] = None

self._normalizers_config: TNormalizersConfig = normalizers
self.normalize_table_name: TNormalizeNameFunc = None
self.normalize_column_name: TNormalizeNameFunc = None
self.normalize_schema_name: TNormalizeNameFunc = None
self.normalize_make_dataset_name: TNormalizeMakePath = None
self.normalize_make_path: TNormalizeMakePath = None
self.normalize_break_path: TNormalizeBreakPath = None
self.naming = None
# json normalization function
self.normalize_data_item: TNormalizeJSONFunc = None

@@ -470,7 +455,7 @@ def _from_stored_schema(self, stored_schema: TStoredSchema) -> None:
self._compile_settings()

def _set_schema_name(self, name: str, normalize_name: bool) -> None:
normalized_name = self.normalize_schema_name(name)
normalized_name = self.naming.normalize_identifier(name)
if name != normalized_name:
if normalize_name:
name = normalized_name
11 changes: 7 additions & 4 deletions dlt/common/schema/utils.py
@@ -15,7 +15,7 @@
from dlt.common.json._simplejson import custom_encode as json_custom_encode
from dlt.common.arithmetics import InvalidOperation
from dlt.common.exceptions import DictValidationException
from dlt.common.normalizers.names import TNormalizeNameFunc
from dlt.common.normalizers.names import NamingConvention
from dlt.common.typing import DictStrAny, REPattern
from dlt.common.time import parse_iso_like_datetime
from dlt.common.utils import map_nested_in_place, str2bool
@@ -129,13 +129,16 @@ def simple_regex_validator(path: str, pk: str, pv: Any, t: Any) -> bool:
return False


def column_name_validator(normalize_func: TNormalizeNameFunc) -> TCustomValidator:
def column_name_validator(naming: NamingConvention) -> TCustomValidator:

def validator(path: str, pk: str, pv: Any, t: Any) -> bool:
if t is TColumnName:
if not isinstance(pv, str):
raise DictValidationException(f"In {path}: field {pk} value {pv} has invalid type {type(pv).__name__} while str is expected", path, pk, pv)
if normalize_func(pv) != pv:
try:
if naming.normalize_path(pv) != pv:
raise DictValidationException(f"In {path}: field {pk}: {pv} is not a valid column name", path, pk, pv)
except ValueError:
raise DictValidationException(f"In {path}: field {pk}: {pv} is not a valid column name", path, pk, pv)
return True
else:
@@ -597,7 +600,7 @@ def default_normalizers() -> TNormalizersConfig:
}


def import_normalizers(normalizers_config: TNormalizersConfig) -> Tuple[ModuleType, ModuleType]:
def import_normalizers(normalizers_config: TNormalizersConfig) -> Tuple[NamingConvention, ModuleType]:
# TODO: type the modules with protocols
naming_module = import_module(normalizers_config["names"])
json_module = import_module(normalizers_config["json"]["module"])
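
Two changes in this file work together: column_name_validator now receives the whole naming object and compares against normalize_path, so "__"-joined column paths validate as long as every component is already normalized, and the new try/except converts a ValueError raised for an empty path component into the same DictValidationException. Roughly, under the snake_case convention:

validator = column_name_validator(schema.naming)
# "user_id"  passes: normalize_path("user_id") == "user_id"
# "UserID"   fails:  it normalizes to "user_id", which differs
# "a____b"   fails:  normalize_break_path yields an empty component, so
#                    normalize_identifier raises ValueError, reported as
#                    DictValidationException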
2 changes: 1 addition & 1 deletion dlt/destinations/bigquery/bigquery.py
@@ -93,7 +93,7 @@ class BigQueryClient(SqlJobClientBase):

def __init__(self, schema: Schema, config: BigQueryClientConfiguration) -> None:
sql_client = BigQuerySqlClient(
schema.normalize_make_dataset_name(config.dataset_name, config.default_schema_name, schema.name),
schema.naming.normalize_make_dataset_name(config.dataset_name, config.default_schema_name, schema.name),
config.credentials
)
super().__init__(schema, config, sql_client)
2 changes: 1 addition & 1 deletion dlt/destinations/duckdb/duck.py
@@ -44,7 +44,7 @@ class DuckDbClient(InsertValuesJobClient):

def __init__(self, schema: Schema, config: DuckDbClientConfiguration) -> None:
sql_client = DuckDbSqlClient(
schema.normalize_make_dataset_name(config.dataset_name, config.default_schema_name, schema.name),
schema.naming.normalize_make_dataset_name(config.dataset_name, config.default_schema_name, schema.name),
config.credentials
)
super().__init__(schema, config, sql_client)
2 changes: 1 addition & 1 deletion dlt/destinations/postgres/postgres.py
@@ -54,7 +54,7 @@ class PostgresClient(InsertValuesJobClient):

def __init__(self, schema: Schema, config: PostgresClientConfiguration) -> None:
sql_client = Psycopg2SqlClient(
schema.normalize_make_dataset_name(config.dataset_name, config.default_schema_name, schema.name),
schema.naming.normalize_make_dataset_name(config.dataset_name, config.default_schema_name, schema.name),
config.credentials
)
super().__init__(schema, config, sql_client)
2 changes: 1 addition & 1 deletion dlt/destinations/redshift/redshift.py
@@ -74,7 +74,7 @@ class RedshiftClient(InsertValuesJobClient):

def __init__(self, schema: Schema, config: RedshiftClientConfiguration) -> None:
sql_client = RedshiftSqlClient (
schema.normalize_make_dataset_name(config.dataset_name, config.default_schema_name, schema.name),
schema.naming.normalize_make_dataset_name(config.dataset_name, config.default_schema_name, schema.name),
config.credentials
)
super().__init__(schema, config, sql_client)
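
All four destination clients shown here (BigQuery, DuckDB, Postgres, Redshift) now resolve the physical dataset name through schema.naming. Per the snake_case diff earlier: both names must be non-empty, the dataset name must already be normalized (otherwise InvalidDatasetName), and, assuming dlt's usual suffix rule that sits in the truncated part of that hunk, a schema other than the default one is appended as a suffix:

naming = schema.naming  # the snake_case module under the default config

naming.normalize_make_dataset_name("analytics", "main", "main")
# -> "analytics"         (default schema: no suffix added)
naming.normalize_make_dataset_name("analytics", "main", "events")
# -> "analytics_events"  (non-default schema appended; assumed suffix rule)
naming.normalize_make_dataset_name("Analytics", "main", "main")
# raises InvalidDatasetName: "Analytics" normalizes to "analytics"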
2 changes: 1 addition & 1 deletion dlt/extract/extract.py
@@ -58,7 +58,7 @@ def _write_item(table_name: str, item: TDataItems) -> None:
# note: normalize function should be cached so there's almost no penalty on frequent calling
# note: column schema is not required for jsonl writer used here
# event.pop(DLT_METADATA_FIELD, None) # type: ignore
storage.write_data_item(extract_id, schema.name, schema.normalize_table_name(table_name), item, None)
storage.write_data_item(extract_id, schema.name, schema.naming.normalize_identifier(table_name), item, None)

def _write_dynamic_table(resource: DltResource, item: TDataItem) -> None:
table_name = resource._table_name_hint_fun(item)