Skip to content

Commit

Permalink
Merge pull request #801 from dlt-hub/rfix/step-info-refactor
Browse files Browse the repository at this point in the history
step info (extract, normalize, load) refactor
  • Loading branch information
sh-rp authored Dec 5, 2023
2 parents fcb7d5e + c6b1916 commit 88ba90f
Show file tree
Hide file tree
Showing 22 changed files with 662 additions and 412 deletions.
139 changes: 122 additions & 17 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from abc import ABC, abstractmethod
import os
import datetime # noqa: 251
import humanize
Expand All @@ -7,19 +8,21 @@
Callable,
ClassVar,
Dict,
Generic,
List,
NamedTuple,
Optional,
Protocol,
Sequence,
TYPE_CHECKING,
Tuple,
TypeVar,
TypedDict,
Mapping,
)
from typing_extensions import NotRequired

from dlt.common import pendulum, logger
from dlt.common import pendulum
from dlt.common.configuration import configspec
from dlt.common.configuration import known_sections
from dlt.common.configuration.container import Container
Expand All @@ -28,7 +31,7 @@
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
from dlt.common.configuration.paths import get_dlt_data_dir
from dlt.common.configuration.specs import RunConfiguration
from dlt.common.destination import Destination, TDestinationReferenceArg, TDestination
from dlt.common.destination import TDestinationReferenceArg, TDestination
from dlt.common.exceptions import (
DestinationHasFailedJobs,
PipelineStateNotAvailable,
Expand All @@ -39,41 +42,103 @@
from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition, TSchemaContract
from dlt.common.source import get_current_pipe_name
from dlt.common.storages.load_storage import LoadPackageInfo
from dlt.common.typing import DictStrAny, REPattern
from dlt.common.typing import DictStrAny, REPattern, SupportsHumanize
from dlt.common.jsonpath import delete_matches, TAnyJsonPath
from dlt.common.data_writers.writers import TLoaderFileFormat
from dlt.common.utils import RowCounts, merge_row_counts


class StepInfo(SupportsHumanize):
    """Base class for pipeline step results (extract / normalize / load).

    Subclasses mix this into a NamedTuple that carries the actual fields
    (see `ExtractInfo`, `NormalizeInfo`, `LoadInfo` below) and provide
    `asstr` -- `__str__` below delegates to it.
    """

    pipeline: "SupportsPipeline"
    loads_ids: List[str]
    """ids of the loaded packages"""
    load_packages: List[LoadPackageInfo]
    """Information on loaded packages"""
    # wall-clock time the step started -- presumably UTC, TODO confirm against pipeline clock
    started_at: datetime.datetime
    # True when the step belongs to the first run of the pipeline -- see TPipelineLocalState.first_run
    first_run: bool

    def __str__(self) -> str:
        # least verbose human-readable rendering
        return self.asstr(verbosity=0)


# One entry per data item passed to `extract`: the item's name and the
# detected type of the data. Functional TypedDict form (equivalent to the
# class-based declaration, total=True by default).
ExtractDataInfo = TypedDict("ExtractDataInfo", {"name": str, "data_type": str})


class ExtractInfo(NamedTuple):
"""A tuple holding information on extracted data items. Returned by pipeline `extract` method."""
# Metrics recorded by the extract step. Functional TypedDict form
# (equivalent to the class-based declaration).
ExtractMetrics = TypedDict("ExtractMetrics", {"schema_name": str})


class _ExtractInfo(NamedTuple):
    """Field layout for `ExtractInfo` (kept separate so `ExtractInfo` can also mix in `StepInfo`)."""

    pipeline: "SupportsPipeline"
    # presumably keyed by load id, as in _NormalizeInfo.metrics -- TODO confirm
    metrics: Dict[str, ExtractMetrics]
    # one entry per extracted data item (name + detected data type)
    extract_data_info: List[ExtractDataInfo]
    loads_ids: List[str]
    """ids of the loaded packages"""
    load_packages: List[LoadPackageInfo]
    """Information on loaded packages"""
    started_at: datetime.datetime
    first_run: bool


class ExtractInfo(StepInfo, _ExtractInfo):
    """A tuple holding information on extracted data items. Returned by pipeline `extract` method."""

    def asdict(self) -> DictStrAny:
        """A dictionary representation of ExtractInfo that can be loaded with `dlt`"""
        # NOTE: the scraped diff contained a stray `return {}` before this body,
        # which made the method a no-op with unreachable code; removed.
        d = self._asdict()
        # replace the pipeline object with a serializable reference to it
        d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
        d["load_packages"] = [package.asdict() for package in self.load_packages]
        # metrics and extract_data_info are dropped from the dict representation
        d.pop("metrics")
        d.pop("extract_data_info")
        return d

    def asstr(self, verbosity: int = 0) -> str:
        # no human-readable rendering for the extract step (StepInfo.__str__
        # delegates here and therefore renders an empty string)
        return ""

class NormalizeMetrics(TypedDict):
    """Metrics collected by the normalize step for a single load id."""

    # rows written per counter name (typically a table name) -- see NormalizeInfo.asdict
    row_counts: RowCounts


class NormalizeInfo(NamedTuple):
class _NormalizeInfo(NamedTuple):
    """Field layout for `NormalizeInfo` (kept separate so `NormalizeInfo` can also mix in `StepInfo`)."""

    pipeline: "SupportsPipeline"
    # normalize metrics keyed by load id -- iterated as such in NormalizeInfo.asdict
    metrics: Dict[str, NormalizeMetrics]
    loads_ids: List[str]
    """ids of the loaded packages"""
    load_packages: List[LoadPackageInfo]
    """Information on loaded packages"""
    started_at: datetime.datetime
    first_run: bool


class NormalizeInfo(StepInfo, _NormalizeInfo):
"""A tuple holding information on normalized data items. Returned by pipeline `normalize` method."""

@property
def row_counts(self) -> RowCounts:
    """Row counts aggregated across all load ids (counter name -> total count).

    Returns an empty mapping when no metrics were collected.
    """
    # NOTE: removed the stale `row_counts: Dict[str, int] = {}` class attribute
    # (diff residue) that both shadowed this property and was a mutable class default.
    if not self.metrics:
        return {}
    counts: RowCounts = {}
    for metrics in self.metrics.values():
        # merge_row_counts only touches counters present in the per-load metrics
        merge_row_counts(counts, metrics.get("row_counts", {}))
    return counts

def asdict(self) -> DictStrAny:
    """A dictionary representation of NormalizeInfo that can be loaded with `dlt`"""
    d = self._asdict()
    # replace the pipeline object with a serializable reference to it
    d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
    d["load_packages"] = [package.asdict() for package in self.load_packages]
    # list representation creates a nice table; removed a dead store of the
    # pre-refactor `row_counts` list (diff residue) that was immediately overwritten
    d["row_counts"] = []
    for load_id, metrics in self.metrics.items():
        d["row_counts"].extend(
            [
                {"load_id": load_id, "table_name": k, "count": v}
                for k, v in metrics["row_counts"].items()
            ]
        )
    return d

def asstr(self, verbosity: int = 0) -> str:
Expand All @@ -85,13 +150,8 @@ def asstr(self, verbosity: int = 0) -> str:
msg = "No data found to normalize"
return msg

def __str__(self) -> str:
return self.asstr(verbosity=0)


class LoadInfo(NamedTuple):
"""A tuple holding the information on recently loaded packages. Returned by pipeline `run` and `load` methods"""

class _LoadInfo(NamedTuple):
pipeline: "SupportsPipeline"
destination_type: str
destination_displayable_credentials: str
Expand All @@ -109,6 +169,10 @@ class LoadInfo(NamedTuple):
started_at: datetime.datetime
first_run: bool


class LoadInfo(StepInfo, _LoadInfo):
"""A tuple holding the information on recently loaded packages. Returned by pipeline `run` and `load` methods"""

def asdict(self) -> DictStrAny:
"""A dictionary representation of LoadInfo that can be loaded with `dlt`"""
d = self._asdict()
Expand Down Expand Up @@ -175,6 +239,46 @@ def __str__(self) -> str:
return self.asstr(verbosity=1)


TStepMetrics = TypeVar("TStepMetrics")
TStepInfo = TypeVar("TStepInfo", bound=StepInfo)


class WithStepInfo(ABC, Generic[TStepMetrics, TStepInfo]):
    """Implemented by classes that generate StepInfo with metrics and package infos"""

    # load id currently being processed; None when no package is in flight
    # (fixed annotation: the original said `str` but the code stores None)
    _current_load_id: Optional[str]
    _load_id_metrics: Dict[str, Optional[TStepMetrics]]
    """Completed load ids metrics (value is None while a load id is still in progress)"""

    def __init__(self) -> None:
        self._load_id_metrics = {}
        # initialize explicitly so _step_info_complete_load_id cannot hit an
        # AttributeError when called before any load id was started
        self._current_load_id = None

    def _step_info_start_load_id(self, load_id: str) -> None:
        """Marks `load_id` as the package currently being processed; its metrics are pending (None)."""
        self._current_load_id = load_id
        self._load_id_metrics[load_id] = None

    def _step_info_complete_load_id(self, load_id: str, metrics: TStepMetrics) -> None:
        """Stores the final `metrics` for `load_id` and clears the current load id.

        Asserts that `load_id` is the one previously passed to `_step_info_start_load_id`.
        """
        assert self._current_load_id == load_id, (
            f"Current load id mismatch {self._current_load_id} != {load_id} when completing step"
            " info"
        )
        self._load_id_metrics[load_id] = metrics
        self._current_load_id = None

    def _step_info_metrics(self, load_id: str) -> TStepMetrics:
        """Returns metrics for `load_id`; raises KeyError when the id was never started."""
        return self._load_id_metrics[load_id]

    @abstractmethod
    def get_step_info(
        self,
        pipeline: "SupportsPipeline",
        started_at: Optional[datetime.datetime] = None,
        completed_at: Optional[datetime.datetime] = None,
    ) -> TStepInfo:
        """Returns an instance of StepInfo with metrics and package infos"""
        pass


class TPipelineLocalState(TypedDict, total=False):
first_run: bool
"""Indicates a first run of the pipeline, where run ends with successful loading of data"""
Expand Down Expand Up @@ -281,6 +385,7 @@ def __call__(
columns: Sequence[TColumnSchema] = None,
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
) -> LoadInfo: ...


Expand Down
25 changes: 18 additions & 7 deletions dlt/common/storages/load_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from dlt.common.schema.typing import TStoredSchema, TTableSchemaColumns
from dlt.common.storages import FileStorage
from dlt.common.storages.exceptions import LoadPackageNotFound
from dlt.common.typing import DictStrAny, StrAny
from dlt.common.typing import DictStrAny, StrAny, SupportsHumanize
from dlt.common.utils import flatten_list_or_items

# folders to manage load jobs in a single load package
Expand Down Expand Up @@ -103,20 +103,31 @@ def __str__(self) -> str:
return self.asstr(verbosity=0)


class _LoadPackageInfo(NamedTuple):
    """Field layout for `LoadPackageInfo`.

    `schema_name` / `schema_hash` are no longer stored fields -- the subclass
    derives them from the attached `schema` (the stale field declarations left
    over from the pre-refactor version were removed; the constructor call site
    in `get_load_package_info` passes exactly these seven values).
    """

    load_id: str
    package_path: str
    state: TLoadPackageState
    schema: Schema
    schema_update: TSchemaTables
    completed_at: datetime.datetime
    jobs: Dict[TJobState, List[LoadJobInfo]]


class LoadPackageInfo(SupportsHumanize, _LoadPackageInfo):
@property
def schema_name(self) -> str:
    # derived from the attached schema rather than stored as a separate field
    return self.schema.name

@property
def schema_hash(self) -> str:
    # version hash of the attached schema; re-exported into asdict() output
    return self.schema.stored_version_hash

def asdict(self) -> DictStrAny:
d = self._asdict()
# job as list
d["jobs"] = [job.asdict() for job in flatten_list_or_items(iter(self.jobs.values()))] # type: ignore
d["schema_hash"] = self.schema_hash
d["schema_name"] = self.schema_name
# flatten update into list of columns
tables: List[DictStrAny] = deepcopy(list(self.schema_update.values())) # type: ignore
for table in tables:
Expand All @@ -131,8 +142,9 @@ def asdict(self) -> DictStrAny:
columns.append(column)
table["columns"] = columns
d.pop("schema_update")
d.pop("schema")
d["tables"] = tables
d["schema_hash"] = self.schema_hash

return d

def asstr(self, verbosity: int = 0) -> str:
Expand Down Expand Up @@ -402,8 +414,7 @@ def get_load_package_info(self, load_id: str) -> LoadPackageInfo:
load_id,
self.storage.make_full_path(package_path),
package_state,
schema.name,
schema.version_hash,
schema,
applied_update,
package_created_at,
all_jobs,
Expand Down
14 changes: 7 additions & 7 deletions dlt/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
TValue = TypeVar("TValue")

# row counts
TRowCount = Dict[str, int]
# mapping of counter name (typically a table name) to a row count
RowCounts = Dict[str, int]


def chunks(seq: Sequence[T], n: int) -> Iterator[Sequence[T]]:
Expand Down Expand Up @@ -477,15 +477,15 @@ def identity(x: TAny) -> TAny:
return x


def increase_row_count(row_counts: RowCounts, counter_name: str, count: int) -> None:
    """Adds `count` to the counter `counter_name` in `row_counts`, starting at 0 when missing.

    Mutates `row_counts` in place and returns None. (The pre-refactor
    duplicate of this function, interleaved in the scraped diff, was removed.)
    """
    row_counts[counter_name] = row_counts.get(counter_name, 0) + count


def merge_row_counts(row_counts_1: RowCounts, row_counts_2: RowCounts) -> None:
    """Merges `row_counts_2` into `row_counts_1` in place.

    Counters present in both mappings are summed; counters only in
    `row_counts_2` are inserted. Counters present only in `row_counts_1`
    are left untouched. (The pre-refactor body that also rewrote keys
    missing from `row_counts_2`, interleaved in the scraped diff, was removed.)
    """
    # only keys present in row_counts_2 are modified
    for counter_name in row_counts_2.keys():
        row_counts_1[counter_name] = row_counts_1.get(counter_name, 0) + row_counts_2[counter_name]


def extend_list_deduplicated(original_list: List[Any], extending_list: Iterable[Any]) -> List[Any]:
Expand Down
Loading

0 comments on commit 88ba90f

Please sign in to comment.