Merge pull request #105 from dlt-hub/rfix/usability-fixes-2
usability fixes 2
rudolfix authored Dec 4, 2022
2 parents 92515b9 + c02fb25 commit cfcc6a6
Showing 43 changed files with 1,137 additions and 405 deletions.
2 changes: 1 addition & 1 deletion dlt/cli/deploy_command.py
@@ -57,7 +57,7 @@ def deploy_command(pipeline_script_path: str, deployment_method: str, schedule:
# full_refresh = False
pipelines_dir: str = None
pipeline_name: str = None
# restore_from_destination = False

if n.PIPELINE in visitor.known_calls:
for call_args in visitor.known_calls[n.PIPELINE]:
f_r_node = call_args.arguments.get("full_refresh")
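
The hunk above reads keyword arguments off pipeline() calls that a script visitor collected from the user's pipeline script. A minimal sketch of that idea using only the standard ast module — the helper name and matching rules here are illustrative, not dlt's actual PipelineScriptVisitor:

    import ast

    def find_call_kwarg(source: str, func_name: str, kwarg: str) -> list:
        """Collect the AST value nodes of `kwarg` for every call to `func_name`."""
        found = []
        for node in ast.walk(ast.parse(source)):
            if isinstance(node, ast.Call):
                callee = node.func
                # match both bare `pipeline(...)` and attribute `dlt.pipeline(...)` calls
                name = callee.attr if isinstance(callee, ast.Attribute) else getattr(callee, "id", None)
                if name == func_name:
                    found.extend(kw.value for kw in node.keywords if kw.arg == kwarg)
        return found

    script = "import dlt\np = dlt.pipeline('demo', full_refresh=False)\n"
    for value in find_call_kwarg(script, "pipeline", "full_refresh"):
        if isinstance(value, ast.Constant):  # literal values can be checked statically
            print("full_refresh =", value.value)  # -> full_refresh = False
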
12 changes: 9 additions & 3 deletions dlt/common/destination.py
@@ -48,8 +48,10 @@ def __init__(self, destination_name: str = None, credentials: Optional[Credentia

@configspec(init=True)
class DestinationClientDwhConfiguration(DestinationClientConfiguration):
dataset_name: str = None # dataset name in the destination to load data to, for schemas that are not default schema, it is used as dataset prefix
default_schema_name: Optional[str] = None # name of default schema to be used to name effective dataset to load data to
dataset_name: str = None
"""dataset name in the destination to load data to, for schemas that are not default schema, it is used as dataset prefix"""
default_schema_name: Optional[str] = None
"""name of default schema to be used to name effective dataset to load data to"""

if TYPE_CHECKING:
def __init__(
@@ -101,7 +103,11 @@ def __init__(self, schema: Schema, config: DestinationClientConfiguration) -> No
self.config = config

@abstractmethod
def initialize_storage(self, wipe_data: bool = False) -> None:
def initialize_storage(self) -> None:
pass

@abstractmethod
def is_storage_initialized(self) -> bool:
pass

@abstractmethod
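
The second hunk replaces the old wipe_data flag on initialize_storage with a separate is_storage_initialized probe, so callers can check storage state without mutating it. A hedged sketch of how a concrete client might satisfy the new contract — the in-memory storage and class names below are illustrative, not dlt's real destination implementation:

    from abc import ABC, abstractmethod

    class JobClientBase(ABC):
        """Shape of the contract from the diff: create vs. probe are now separate."""

        @abstractmethod
        def initialize_storage(self) -> None: ...

        @abstractmethod
        def is_storage_initialized(self) -> bool: ...

    STORAGE: dict = {}  # stand-in for a warehouse's datasets

    class InMemoryClient(JobClientBase):
        def __init__(self, dataset_name: str) -> None:
            self.dataset_name = dataset_name

        def initialize_storage(self) -> None:
            # idempotent create; nothing is ever wiped here
            STORAGE.setdefault(self.dataset_name, {})

        def is_storage_initialized(self) -> bool:
            # pure probe: never creates or destroys anything
            return self.dataset_name in STORAGE

    client = InMemoryClient("my_dataset")
    if not client.is_storage_initialized():
        client.initialize_storage()
    assert client.is_storage_initialized()
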
3 changes: 2 additions & 1 deletion dlt/common/normalizers/names/snake_case.py
@@ -56,7 +56,8 @@ def normalize_make_dataset_name(dataset_name: str, default_schema_name: str, sch
norm_name = normalize_schema_name(dataset_name)
if norm_name != dataset_name:
raise InvalidDatasetName(dataset_name, norm_name)
if default_schema_name is None or schema_name != default_schema_name:
# if default schema is None then suffix is not added
if default_schema_name is not None and schema_name != default_schema_name:
norm_name += "_" + schema_name

return norm_name
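
The condition flip matters: previously a None default schema made the `schema_name != default_schema_name` test true and a suffix was appended; now a None default means no suffix at all. A small self-contained sketch of the fixed rule, with normalization reduced to lower-casing for brevity:

    def make_dataset_name(dataset_name: str, default_schema_name: str, schema_name: str) -> str:
        norm_name = dataset_name.lower()  # stand-in for the real snake_case normalizer
        # if default schema is None then the suffix is not added
        if default_schema_name is not None and schema_name != default_schema_name:
            norm_name += "_" + schema_name
        return norm_name

    assert make_dataset_name("data", None, "events") == "data"           # no suffix now
    assert make_dataset_name("data", "main", "events") == "data_events"  # non-default schema
    assert make_dataset_name("data", "main", "main") == "data"           # default schema
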
14 changes: 11 additions & 3 deletions dlt/common/pipeline.py
@@ -28,6 +28,7 @@ class LoadInfo(NamedTuple):
dataset_name: str
loads_ids: Dict[str, bool]
failed_jobs: Dict[str, Sequence[Tuple[str, str]]]
first_run: bool

def __str__(self) -> str:
msg = f"{len(self.loads_ids)} load package(s) were loaded to destination {self.destination_name} and into dataset {self.dataset_name}\n"
@@ -45,6 +46,14 @@ def __str__(self) -> str:
return msg



class TPipelineLocalState(TypedDict, total=False):
first_run: bool
"""Indicates a first run of the pipeline, where run ends with successful loading of data"""
_last_extracted_at: datetime.datetime
"""Timestamp indicating when the state was synced with the destination. Lack of timestamp means not synced state."""


class TPipelineState(TypedDict, total=False):
"""Schema for a pipeline state that is stored within the pipeline working directory"""
pipeline_name: str
@@ -58,9 +67,8 @@ class TPipelineState(TypedDict, total=False):
# properties starting with _ are not automatically applied to pipeline object when state is restored
_state_version: int
_state_engine_version: int
_last_extracted_at: datetime.datetime
"""Timestamp indicating when the state was synced with the destination. Lack of timestamp means not synced state."""

_local: TPipelineLocalState
"""A section of state that is not synchronized with the destination and does not participate in change merging and version control"""

class SupportsPipeline(Protocol):
"""A protocol with core pipeline operations that lets high level abstractions ie. sources to access pipeline methods and properties"""
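
The `_local` section carves out state (first_run, _last_extracted_at) that should never be compared with or synced to the destination. A hedged sketch of why that split helps: a version hash can be computed over everything except `_local`, so local-only changes do not bump the state version. The hashing scheme below is an assumption for illustration, not dlt's actual one:

    import hashlib, json

    def state_version_hash(state: dict) -> str:
        # drop the local section so machine-specific values don't affect the hash
        shared = {k: v for k, v in state.items() if k != "_local"}
        return hashlib.sha256(json.dumps(shared, sort_keys=True, default=str).encode()).hexdigest()

    state = {"pipeline_name": "demo", "_state_version": 3,
             "_local": {"first_run": False}}
    h1 = state_version_hash(state)
    state["_local"]["first_run"] = True     # local-only mutation
    assert state_version_hash(state) == h1  # hash (and thus version) unchanged
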
145 changes: 89 additions & 56 deletions dlt/common/schema/schema.py
@@ -18,54 +18,44 @@
class Schema:
ENGINE_VERSION: ClassVar[int] = 5

def __init__(self, name: str, normalizers: TNormalizersConfig = None, normalize_name: bool = False) -> None:
self._schema_tables: TSchemaTables = {}
self._schema_name: str = None
self._stored_version = 1 # version at load/creation time
self._stored_version_hash: str = None # version hash at load/creation time
self._imported_version_hash: str = None # version hash of recently imported schema
self._schema_description: str = None # optional schema description
# schema settings to hold default hints, preferred types and other settings
self._settings: TSchemaSettings = {}

# list of preferred types: map regex on columns into types
self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = []
# compiled default hints
self._compiled_hints: Dict[TColumnHint, Sequence[REPattern]] = {}
# compiled exclude filters per table
self._compiled_excludes: Dict[str, Sequence[REPattern]] = {}
# compiled include filters per table
self._compiled_includes: Dict[str, Sequence[REPattern]] = {}
# name normalization functions
normalize_table_name: TNormalizeNameFunc
normalize_column_name: TNormalizeNameFunc
normalize_schema_name: TNormalizeNameFunc
normalize_make_dataset_name: TNormalizeMakePath
normalize_make_path: TNormalizeMakePath
normalize_break_path: TNormalizeBreakPath
# json normalization function
normalize_data_item: TNormalizeJSONFunc

_schema_tables: TSchemaTables
_schema_name: str
_stored_version: int # version at load/creation time
_stored_version_hash: str # version hash at load/creation time
_imported_version_hash: str # version hash of recently imported schema
_schema_description: str # optional schema description
# schema settings to hold default hints, preferred types and other settings
_settings: TSchemaSettings

# list of preferred types: map regex on columns into types
_compiled_preferred_types: List[Tuple[REPattern, TDataType]]
# compiled default hints
_compiled_hints: Dict[TColumnHint, Sequence[REPattern]]
# compiled exclude filters per table
_compiled_excludes: Dict[str, Sequence[REPattern]]
# compiled include filters per table
_compiled_includes: Dict[str, Sequence[REPattern]]

# normalizers config
_normalizers_config: TNormalizersConfig

# normalizers config
self._normalizers_config: TNormalizersConfig = normalizers
# name normalization functions
self.normalize_table_name: TNormalizeNameFunc = None
self.normalize_column_name: TNormalizeNameFunc = None
self.normalize_schema_name: TNormalizeNameFunc = None
self.normalize_make_dataset_name: TNormalizeMakePath = None
self.normalize_make_path: TNormalizeMakePath = None
self.normalize_break_path: TNormalizeBreakPath = None
# json normalization function
self.normalize_data_item: TNormalizeJSONFunc = None

# add version tables
self._add_standard_tables()
# add standard hints
self._add_standard_hints()
# configure normalizers, including custom config if present
self._configure_normalizers()
# verify schema name after configuring normalizers
self._set_schema_name(name, normalize_name)
# compile all known regexes
self._compile_regexes()
# set initial version hash
self._stored_version_hash = self.version_hash
def __init__(self, name: str, normalizers: TNormalizersConfig = None, normalize_name: bool = False) -> None:
self._reset_schema(name, normalizers, normalize_name)

@classmethod
def from_dict(cls, d: DictStrAny) -> "Schema":
# upgrade engine if needed
stored_schema = utils.upgrade_engine_version(d, d["engine_version"], cls.ENGINE_VERSION)
stored_schema = utils.migrate_schema(d, d["engine_version"], cls.ENGINE_VERSION)
# verify schema
utils.validate_stored_schema(stored_schema)
# add defaults
@@ -79,21 +69,13 @@ def from_dict(cls, d: DictStrAny) -> "Schema":
def from_stored_schema(cls, stored_schema: TStoredSchema) -> "Schema":
# create new instance from dict
self: Schema = cls(stored_schema["name"], normalizers=stored_schema.get("normalizers", None))
self._schema_tables = stored_schema.get("tables") or {}
if VERSION_TABLE_NAME not in self._schema_tables:
raise SchemaCorruptedException(f"Schema must contain table {VERSION_TABLE_NAME}")
if LOADS_TABLE_NAME not in self._schema_tables:
raise SchemaCorruptedException(f"Schema must contain table {LOADS_TABLE_NAME}")
self._stored_version = stored_schema["version"]
self._stored_version_hash = stored_schema["version_hash"]
self._imported_version_hash = stored_schema.get("imported_version_hash")
self._schema_description = stored_schema.get("description")
self._settings = stored_schema.get("settings") or {}
# compile regexes
self._compile_regexes()

self._from_stored_schema(stored_schema)
return self

def replace_schema_content(self, schema: "Schema") -> None:
self._reset_schema(schema.name, schema._normalizers_config)
self._from_stored_schema(schema.to_dict())

def to_dict(self, remove_defaults: bool = False) -> TStoredSchema:
stored_schema: TStoredSchema = {
"version": self._stored_version,
@@ -439,6 +421,57 @@ def _configure_normalizers(self) -> None:
self.normalize_data_item = json_module.normalize_data_item
json_module.extend_schema(self)

def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None, normalize_name: bool = False) -> None:
self._schema_tables: TSchemaTables = {}
self._schema_name: str = None
self._stored_version = 1
self._stored_version_hash: str = None
self._imported_version_hash: str = None
self._schema_description: str = None

self._settings: TSchemaSettings = {}
self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = []
self._compiled_hints: Dict[TColumnHint, Sequence[REPattern]] = {}
self._compiled_excludes: Dict[str, Sequence[REPattern]] = {}
self._compiled_includes: Dict[str, Sequence[REPattern]] = {}

self._normalizers_config: TNormalizersConfig = normalizers
self.normalize_table_name: TNormalizeNameFunc = None
self.normalize_column_name: TNormalizeNameFunc = None
self.normalize_schema_name: TNormalizeNameFunc = None
self.normalize_make_dataset_name: TNormalizeMakePath = None
self.normalize_make_path: TNormalizeMakePath = None
self.normalize_break_path: TNormalizeBreakPath = None
# json normalization function
self.normalize_data_item: TNormalizeJSONFunc = None

# add version tables
self._add_standard_tables()
# add standard hints
self._add_standard_hints()
# configure normalizers, including custom config if present
self._configure_normalizers()
# verify schema name after configuring normalizers
self._set_schema_name(name, normalize_name)
# compile all known regexes
self._compile_regexes()
# set initial version hash
self._stored_version_hash = self.version_hash

def _from_stored_schema(self, stored_schema: TStoredSchema) -> None:
self._schema_tables = stored_schema.get("tables") or {}
if VERSION_TABLE_NAME not in self._schema_tables:
raise SchemaCorruptedException(f"Schema must contain table {VERSION_TABLE_NAME}")
if LOADS_TABLE_NAME not in self._schema_tables:
raise SchemaCorruptedException(f"Schema must contain table {LOADS_TABLE_NAME}")
self._stored_version = stored_schema["version"]
self._stored_version_hash = stored_schema["version_hash"]
self._imported_version_hash = stored_schema.get("imported_version_hash")
self._schema_description = stored_schema.get("description")
self._settings = stored_schema.get("settings") or {}
# compile regexes
self._compile_regexes()

def _set_schema_name(self, name: str, normalize_name: bool) -> None:
normalized_name = self.normalize_schema_name(name)
if name != normalized_name:
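
The refactor pulls the whole of __init__ into _reset_schema and the stored-dict loading into _from_stored_schema, so replace_schema_content can swap one schema's content for another by resetting and then round-tripping through the serialized form. A minimal sketch of that reset-then-load pattern on a toy class — the method names mirror the diff, but the body is illustrative:

    class Document:
        def __init__(self, name: str) -> None:
            self._reset(name)

        def _reset(self, name: str) -> None:
            # single place that defines the pristine state
            self.name = name
            self.tables: dict = {}
            self.version = 1

        def _from_stored(self, stored: dict) -> None:
            self.tables = stored.get("tables", {})
            self.version = stored["version"]

        def to_dict(self) -> dict:
            return {"name": self.name, "tables": self.tables, "version": self.version}

        def replace_content(self, other: "Document") -> None:
            # reset to defaults, then load the other instance's serialized state
            self._reset(other.name)
            self._from_stored(other.to_dict())

    a, b = Document("a"), Document("b")
    b.tables["events"] = {"columns": []}
    a.replace_content(b)
    assert a.name == "b" and "events" in a.tables
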
15 changes: 10 additions & 5 deletions dlt/common/schema/utils.py
@@ -161,12 +161,11 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None:
raise ParentTableNotFoundException(table_name, parent_table_name)


def upgrade_engine_version(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema:
def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema:
if from_engine == to_engine:
return cast(TStoredSchema, schema_dict)

if from_engine == 1 and to_engine > 1:
schema_dict["engine_version"] = 2
schema_dict["includes"] = []
schema_dict["excludes"] = []
from_engine = 2
@@ -233,20 +232,21 @@ def migrate_filters(group: str, filters: List[str]) -> None:
migrate_filters("includes", includes)

# upgraded
schema_dict["engine_version"] = 3
from_engine = 3
if from_engine == 3 and to_engine > 3:
# set empty version hash to pass validation, in engine 4 this hash is mandatory
schema_dict.setdefault("version_hash", "")
schema_dict["engine_version"] = 4
from_engine = 4
if from_engine == 4 and to_engine > 4:
# replace schema versions table
schema_dict["tables"][VERSION_TABLE_NAME] = version_table()
schema_dict["engine_version"] = 5
schema_dict["tables"][LOADS_TABLE_NAME] = load_table()
from_engine = 5

schema_dict["engine_version"] = from_engine
if from_engine != to_engine:
raise SchemaEngineNoUpgradePathException(schema_dict["name"], schema_dict["engine_version"], from_engine, to_engine)

return cast(TStoredSchema, schema_dict)


@@ -554,6 +554,11 @@ def load_table() -> TTableSchema:
"data_type": "text",
"nullable": False
}),
add_missing_hints({
"name": "schema_name",
"data_type": "text",
"nullable": True
}),
add_missing_hints({
"name": "status",
"data_type": "bigint",
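
The renamed migrate_schema now stamps engine_version once, from the final from_engine, instead of inside every step; each `if from_engine == N and to_engine > N` block mutates the dict and advances from_engine, so migrations chain v1 → v2 → … → v5 in a single pass. A condensed sketch of the pattern, with the migration steps themselves invented for illustration:

    def migrate(doc: dict, from_v: int, to_v: int) -> dict:
        if from_v == to_v:
            return doc
        if from_v == 1 and to_v > 1:
            doc["includes"] = []                # step 1 -> 2
            from_v = 2
        if from_v == 2 and to_v > 2:
            doc.setdefault("version_hash", "")  # step 2 -> 3
            from_v = 3
        # stamp the final version once, after all applicable steps ran
        doc["engine_version"] = from_v
        if from_v != to_v:
            raise ValueError(f"no upgrade path from engine {from_v} to {to_v}")
        return doc

    doc = {"engine_version": 1}
    print(migrate(doc, 1, 3))  # {'engine_version': 3, 'includes': [], 'version_hash': ''}
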
18 changes: 17 additions & 1 deletion dlt/common/signals.py
@@ -1,6 +1,7 @@
import signal
from contextlib import contextmanager
from threading import Event
from typing import Any, Callable, TYPE_CHECKING
from typing import Any, TYPE_CHECKING, Iterator

if not TYPE_CHECKING:
from dlt.common import logger
@@ -9,6 +10,7 @@
from dlt.common.exceptions import SignalReceivedException

_received_signal: int = 0
_raise_immediately: bool = False
exit_event = Event()


@@ -27,6 +29,9 @@ def signal_receiver(sig: int, frame: Any) -> None:

logger.info("Sleeping threads signalled")

if _raise_immediately and _received_signal == signal.SIGINT:
raise_if_signalled()


def raise_if_signalled() -> None:
if _received_signal:
@@ -36,3 +41,14 @@ def raise_if_signalled() -> None:
def register_signals() -> None:
signal.signal(signal.SIGINT, signal_receiver)
signal.signal(signal.SIGTERM, signal_receiver)


@contextmanager
def raise_immediately() -> Iterator[None]:
global _raise_immediately

try:
_raise_immediately = True
yield
finally:
_raise_immediately = False
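
raise_immediately flips a module-level flag so that, inside the guarded block, a SIGINT raises SignalReceivedException from the handler at once instead of only being recorded for sleeping threads to pick up later via raise_if_signalled(). A hedged usage sketch — register_signals and the module path come from the diff, while the long-running step is a hypothetical placeholder:

    import time
    from dlt.common import signals

    signals.register_signals()  # install the SIGINT/SIGTERM handler shown above

    def run_long_extraction() -> None:  # hypothetical long-running step
        time.sleep(60)

    # Ctrl-C inside this block raises SignalReceivedException immediately
    with signals.raise_immediately():
        run_long_extraction()

    # outside the block, a signal is only recorded; code must check for it
    # cooperatively by calling signals.raise_if_signalled()
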