From eea7d6d64ef1419d5420f354eb181a53cf4e5a0e Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Tue, 5 Dec 2023 23:42:19 +0100 Subject: [PATCH 1/3] adds utils to generate exception traces --- dlt/common/configuration/exceptions.py | 11 +++- dlt/common/exceptions.py | 33 +++++++++- dlt/common/pipeline.py | 9 ++- dlt/common/utils.py | 65 +++++++++++++++++++ .../configuration/test_configuration.py | 29 ++++++++- tests/common/test_utils.py | 53 +++++++++++++++ 6 files changed, 194 insertions(+), 6 deletions(-) diff --git a/dlt/common/configuration/exceptions.py b/dlt/common/configuration/exceptions.py index 91aa3e7ad3..a3927102c3 100644 --- a/dlt/common/configuration/exceptions.py +++ b/dlt/common/configuration/exceptions.py @@ -1,5 +1,5 @@ import os -from typing import Any, Mapping, Type, Tuple, NamedTuple, Sequence +from typing import Any, Dict, Mapping, Type, Tuple, NamedTuple, Sequence from dlt.common.exceptions import DltException, TerminalException from dlt.common.utils import main_module_file_path @@ -79,6 +79,15 @@ def __str__(self) -> str: ) return msg + def attrs(self) -> Dict[str, Any]: + attrs_ = super().attrs() + if "traces" in attrs_: + for _, traces in self.traces.items(): + for idx, trace in enumerate(traces): + # drop all values as they may contain secrets + traces[idx] = trace._replace(value=None) # type: ignore[index] + return attrs_ + class UnmatchedConfigHintResolversException(ConfigurationException): """Raised when using `@resolve_type` on a field that doesn't exist in the spec""" diff --git a/dlt/common/exceptions.py b/dlt/common/exceptions.py index 36e98d8c00..c14a743f33 100644 --- a/dlt/common/exceptions.py +++ b/dlt/common/exceptions.py @@ -1,4 +1,27 @@ -from typing import Any, AnyStr, List, Sequence, Optional, Iterable +from typing import Any, AnyStr, Dict, List, Sequence, Optional, Iterable, TypedDict + + +class ExceptionTrace(TypedDict, total=False): + """Exception trace. 
NOTE: we intend to change it with an extended line by line trace with code snippets""" + + message: str + exception_type: str + docstring: str + stack_trace: List[str] + is_terminal: bool + """Says if exception is terminal if happened to a job during load step""" + exception_attrs: Dict[str, Any] + """Public attributes of an exception deriving from DltException (not starting with _)""" + load_id: str + """Load id if found in exception attributes""" + pipeline_name: str + """Pipeline name if found in exception attributes or in the active pipeline (Container)""" + source_name: str + """Source name if found in exception attributes or in Container""" + resource_name: str + """Resource name if found in exception attributes""" + job_id: str + """Job id if found in exception attributes""" class DltException(Exception): @@ -6,6 +29,14 @@ def __reduce__(self) -> Any: """Enables exceptions with parametrized constructor to be pickled""" return type(self).__new__, (type(self), *self.args), self.__dict__ + def attrs(self) -> Dict[str, Any]: + """Returns "public" attributes of the DltException""" + return { + k: v + for k, v in vars(self).items() + if not k.startswith("_") and not callable(v) and not hasattr(self.__class__, k) + } + class UnsupportedProcessStartMethodException(DltException): def __init__(self, method: str) -> None: diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 972ca67816..3bbff3a218 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -35,7 +35,6 @@ from dlt.common.exceptions import ( DestinationHasFailedJobs, PipelineStateNotAvailable, - ResourceNameNotAvailable, SourceSectionNotAvailable, ) from dlt.common.schema import Schema @@ -90,6 +89,7 @@ def asdict(self) -> DictStrAny: d = self._asdict() d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name} d["load_packages"] = [package.asdict() for package in self.load_packages] + # TODO: transform and leave metrics when we have them implemented d.pop("metrics") 
d.pop("extract_data_info") return d @@ -268,6 +268,11 @@ def _step_info_complete_load_id(self, load_id: str, metrics: TStepMetrics) -> No def _step_info_metrics(self, load_id: str) -> TStepMetrics: return self._load_id_metrics[load_id] + @property + def current_load_id(self) -> str: + """Returns currently processing load id""" + return self._current_load_id + @abstractmethod def get_step_info( self, @@ -575,8 +580,6 @@ def resource_state( # backtrace to find the shallowest resource if not resource_name: resource_name = get_current_pipe_name() - if not resource_name: - raise ResourceNameNotAvailable() return state_.setdefault("resources", {}).setdefault(resource_name, {}) # type: ignore diff --git a/dlt/common/utils.py b/dlt/common/utils.py index 0cd91d3390..72fee608a8 100644 --- a/dlt/common/utils.py +++ b/dlt/common/utils.py @@ -8,6 +8,7 @@ from functools import wraps from os import environ from types import ModuleType +import traceback import zlib from typing import ( @@ -28,6 +29,7 @@ ) from collections.abc import Mapping as C_Mapping +from dlt.common.exceptions import DltException, ExceptionTrace, TerminalException from dlt.common.typing import AnyFun, StrAny, DictStrAny, StrStr, TAny, TFun @@ -510,3 +512,66 @@ def maybe_context(manager: ContextManager[TAny]) -> Iterator[TAny]: def without_none(d: Mapping[TKey, Optional[TValue]]) -> Mapping[TKey, TValue]: """Return a new dict with all `None` values removed""" return {k: v for k, v in d.items() if v is not None} + + +def get_full_class_name(obj: Any) -> str: + cls = obj.__class__ + module = cls.__module__ + # exclude 'builtins' for built-in types. + if module is None or module == "builtins": + return cls.__name__ # type: ignore[no-any-return] + return module + "." 
+ cls.__name__ # type: ignore[no-any-return] + + +def get_exception_trace(exc: BaseException) -> ExceptionTrace: + """Get exception trace and additional information for DltException(s)""" + trace: ExceptionTrace = {"message": str(exc), "exception_type": get_full_class_name(exc)} + if exc.__traceback__: + tb_extract = traceback.extract_tb(exc.__traceback__) + trace["stack_trace"] = traceback.format_list(tb_extract) + trace["is_terminal"] = isinstance(exc, TerminalException) + + # get attrs and other props + if isinstance(exc, DltException): + if exc.__doc__: + trace["docstring"] = exc.__doc__ + attrs = exc.attrs() + str_attrs = {} + for k, v in attrs.items(): + if v is None: + continue + try: + from dlt.common.json import json + + # must be json serializable, other attrs are skipped + if not isinstance(v, str): + json.dumps(v) + str_attrs[k] = v + except Exception: + continue + # extract special attrs + if k in ["load_id", "pipeline_name", "source_name", "resource_name", "job_id"]: + trace[k] = v # type: ignore[literal-required] + + trace["exception_attrs"] = str_attrs + return trace + + +def get_exception_trace_chain( + exc: BaseException, traces: List[ExceptionTrace] = None, seen: Set[int] = None +) -> List[ExceptionTrace]: + """Get traces for exception chain. The function will recursively visit all __cause__ and __context__ exceptions. 
The top level + exception trace is first on the list + """ + traces = traces or [] + seen = seen or set() + # prevent cycles + if id(exc) in seen: + return traces + seen.add(id(exc)) + traces.append(get_exception_trace(exc)) + if exc.__cause__: + return get_exception_trace_chain(exc.__cause__, traces, seen) + elif exc.__context__: + return get_exception_trace_chain(exc.__context__, traces, seen) + return traces diff --git a/tests/common/configuration/test_configuration.py b/tests/common/configuration/test_configuration.py index 6bce289966..e6091b3c70 100644 --- a/tests/common/configuration/test_configuration.py +++ b/tests/common/configuration/test_configuration.py @@ -20,7 +20,7 @@ from dlt.common.configuration.specs.gcp_credentials import ( GcpServiceAccountCredentialsWithoutDefaults, ) -from dlt.common.utils import custom_environ +from dlt.common.utils import custom_environ, get_exception_trace, get_exception_trace_chain from dlt.common.typing import AnyType, DictStrAny, StrAny, TSecretValue, extract_inner_type from dlt.common.configuration.exceptions import ( ConfigFieldMissingTypeHintException, @@ -664,12 +664,24 @@ def test_raises_on_unresolved_field(environment: Any, env_provider: ConfigProvid # toml providers were empty and are not returned in trace # assert trace[1] == LookupTrace("secrets.toml", [], "NoneConfigVar", None) # assert trace[2] == LookupTrace("config.toml", [], "NoneConfigVar", None) + # check the exception trace + exception_traces = get_exception_trace_chain(cf_missing_exc.value) + assert len(exception_traces) == 1 + exception_trace = exception_traces[0] + assert exception_trace["docstring"] == ConfigFieldMissingException.__doc__ + # serialized traces + assert "NoneConfigVar" in exception_trace["exception_attrs"]["traces"] + assert exception_trace["exception_attrs"]["spec_name"] == "WrongConfiguration" + assert exception_trace["exception_attrs"]["fields"] == ["NoneConfigVar"] def test_raises_on_many_unresolved_fields(environment: Any, 
env_provider: ConfigProvider) -> None: # via make configuration with pytest.raises(ConfigFieldMissingException) as cf_missing_exc: resolve.resolve_configuration(CoercionTestConfiguration()) + # check the exception trace + exception_trace = get_exception_trace(cf_missing_exc.value) + assert cf_missing_exc.value.spec_name == "CoercionTestConfiguration" # get all fields that must be set val_fields = [ @@ -685,10 +697,25 @@ def test_raises_on_many_unresolved_fields(environment: Any, env_provider: Config environ_provider.EnvironProvider.get_key_name(exp_field), None, ) + # field must be in exception trace + assert tr_field in exception_trace["exception_attrs"]["fields"] + assert tr_field in exception_trace["exception_attrs"]["traces"] # assert traces[tr_field][1] == LookupTrace("secrets.toml", [], toml.TomlFileProvider.get_key_name(exp_field), None) # assert traces[tr_field][2] == LookupTrace("config.toml", [], toml.TomlFileProvider.get_key_name(exp_field), None) +def test_removes_trace_value_from_exception_trace_attrs( + environment: Any, env_provider: ConfigProvider +) -> None: + with pytest.raises(ConfigFieldMissingException) as cf_missing_exc: + resolve.resolve_configuration(CoercionTestConfiguration()) + cf_missing_exc.value.traces["str_val"][0] = cf_missing_exc.value.traces["str_val"][0]._replace(value="SECRET") # type: ignore[index] + assert cf_missing_exc.value.traces["str_val"][0].value == "SECRET" + attrs_ = cf_missing_exc.value.attrs() + # values got cleared up + assert attrs_["traces"]["str_val"][0].value is None + + def test_accepts_optional_missing_fields(environment: Any) -> None: # ConfigurationWithOptionalTypes has values for all non optional fields present C = ConfigurationWithOptionalTypes() diff --git a/tests/common/test_utils.py b/tests/common/test_utils.py index b65052f149..7cd8e9f1a2 100644 --- a/tests/common/test_utils.py +++ b/tests/common/test_utils.py @@ -3,6 +3,7 @@ import binascii import pytest from typing import Dict +from 
dlt.common.exceptions import IdentifierTooLongException, PipelineException, TerminalValueError from dlt.common.runners import Venv from dlt.common.utils import ( @@ -18,6 +19,8 @@ increase_row_count, merge_row_counts, extend_list_deduplicated, + get_exception_trace, + get_exception_trace_chain, ) @@ -224,3 +227,53 @@ def test_extend_list_deduplicated() -> None: "three", ] assert extend_list_deduplicated([], ["one", "two", "three"]) == ["one", "two", "three"] + + +def test_exception_traces() -> None: + # bare exception without stack trace + trace = get_exception_trace(Exception("Message")) + assert trace["message"] == "Message" + assert trace["exception_type"] == "Exception" + assert "stack_trace" not in trace + assert trace["is_terminal"] is False + + # dlt exception with traceback + try: + raise IdentifierTooLongException("postgres", "table", "too_long_table", 8) + except Exception as exc: + trace = get_exception_trace(exc) + assert trace["exception_type"] == "dlt.common.exceptions.IdentifierTooLongException" + assert isinstance(trace["stack_trace"], list) + assert trace["exception_attrs"] == { + "destination_name": "postgres", + "identifier_type": "table", + "identifier_name": "too_long_table", + "max_identifier_length": 8, + } + assert trace["is_terminal"] is True + + # dlt exception with additional props + try: + raise PipelineException("test_pipeline", "Message") + except Exception as exc: + trace = get_exception_trace(exc) + assert trace["pipeline_name"] == "test_pipeline" + + +def test_exception_trace_chain() -> None: + try: + raise TerminalValueError("Val") + except Exception: + try: + raise IdentifierTooLongException("postgres", "table", "too_long_table", 8) + except Exception as exc: + try: + # explicit cause + raise PipelineException("test_pipeline", "Message") from exc + except Exception as exc: + traces = get_exception_trace_chain(exc) + # outer exception first + assert len(traces) == 3 + assert traces[0]["exception_type"] == 
"dlt.common.exceptions.PipelineException" + assert traces[1]["exception_type"] == "dlt.common.exceptions.IdentifierTooLongException" + assert traces[2]["exception_type"] == "dlt.common.exceptions.TerminalValueError" From 7593676e34424137ba372a95070ba0d73f825bfb Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Tue, 5 Dec 2023 23:43:08 +0100 Subject: [PATCH 2/3] adds extended exception trace to step trace --- dlt/common/pipeline.py | 1 + dlt/extract/extract.py | 1 - dlt/normalize/normalize.py | 3 +- dlt/pipeline/exceptions.py | 18 +++- dlt/pipeline/pipeline.py | 41 +++++--- dlt/pipeline/trace.py | 134 +++++++++++++++++++------- dlt/pipeline/typing.py | 2 +- tests/pipeline/test_pipeline.py | 7 +- tests/pipeline/test_pipeline_trace.py | 5 +- 9 files changed, 157 insertions(+), 55 deletions(-) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 3bbff3a218..6feee2a812 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -252,6 +252,7 @@ class WithStepInfo(ABC, Generic[TStepMetrics, TStepInfo]): def __init__(self) -> None: self._load_id_metrics = {} + self._current_load_id = None def _step_info_start_load_id(self, load_id: str) -> None: self._current_load_id = load_id diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index 9815e47440..9dd9cca5b7 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -304,4 +304,3 @@ def get_step_info( started_at, pipeline.first_run, ) - return ExtractInfo(None) diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py index 64ef0f11fb..e03278acbf 100644 --- a/dlt/normalize/normalize.py +++ b/dlt/normalize/normalize.py @@ -26,7 +26,6 @@ from dlt.common.pipeline import ( NormalizeInfo, NormalizeMetrics, - StepInfo, SupportsPipeline, WithStepInfo, ) @@ -194,6 +193,7 @@ def _get_items_normalizer( columns = schema.get_table_columns(table_name) load_storage.write_empty_file(load_id, schema.name, table_name, columns) except Exception: + # TODO: raise a wrapper exception with 
job_id, load_id, line_no and schema name logger.exception( f"Exception when processing file {extracted_items_file}, line {line_no}" ) @@ -202,7 +202,6 @@ def _get_items_normalizer( load_storage.close_writers(load_id) logger.info(f"Processed total {total_items} items in {len(extracted_items_files)} files") - return schema_updates, total_items, load_storage.closed_files(), row_counts def update_table(self, schema: Schema, schema_updates: List[TSchemaUpdate]) -> None: diff --git a/dlt/pipeline/exceptions.py b/dlt/pipeline/exceptions.py index 8872933299..4071b10f64 100644 --- a/dlt/pipeline/exceptions.py +++ b/dlt/pipeline/exceptions.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Dict from dlt.common.exceptions import PipelineException from dlt.common.pipeline import StepInfo, SupportsPipeline from dlt.pipeline.typing import TPipelineStep @@ -48,23 +48,37 @@ def __init__(self, pipeline_name: str, destination_name: str) -> None: class PipelineStepFailed(PipelineException): + """Raised by run, extract, normalize and load Pipeline methods.""" + def __init__( self, pipeline: SupportsPipeline, step: TPipelineStep, + load_id: str, exception: BaseException, step_info: StepInfo = None, ) -> None: self.pipeline = pipeline self.step = step + self.load_id = load_id self.exception = exception self.step_info = step_info + + package_str = f" when processing package {load_id}" if load_id else "" super().__init__( pipeline.pipeline_name, - f"Pipeline execution failed at stage {step} with" + f"Pipeline execution failed at stage {step}{package_str} with" f" exception:\n\n{type(exception)}\n{exception}", ) + def attrs(self) -> Dict[str, Any]: + # remove attr that should not be published + attrs_ = super().attrs() + attrs_.pop("pipeline") + attrs_.pop("exception") + attrs_.pop("step_info") + return attrs_ + class PipelineStateEngineNoUpgradePathException(PipelineException): def __init__( diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 
f34d24e75b..b70a0057c7 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -197,17 +197,22 @@ def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> Any: try: if trace_step: # if there was a step, finish it - end_trace_step(self._trace, trace_step, self, step_info, send_state) + self._trace = end_trace_step( + self._trace, trace_step, self, step_info, send_state + ) if is_new_trace: - assert ( - trace is self._trace - ), f"Messed up trace reference {id(self._trace)} vs {id(trace)}" - end_trace(trace, self, self._pipeline_storage.storage_path, send_state) + assert trace.transaction_id == self._trace.transaction_id, ( + f"Messed up trace reference {self._trace.transaction_id} vs" + f" {trace.transaction_id}" + ) + trace = end_trace( + trace, self, self._pipeline_storage.storage_path, send_state + ) finally: # always end trace if is_new_trace: assert ( - self._trace == trace + self._trace.transaction_id == trace.transaction_id ), f"Messed up trace reference {id(self._trace)} vs {id(trace)}" # if we end new trace that had only 1 step, add it to previous trace # this way we combine several separate calls to extract, normalize, load as single trace @@ -392,7 +397,11 @@ def extract( return self._get_step_info(extract_step) except Exception as exc: raise PipelineStepFailed( - self, "extract", exc, self._get_step_info(extract_step) + self, + "extract", + extract_step.current_load_id, + exc, + self._get_step_info(extract_step), ) from exc @with_runtime_trace() @@ -433,7 +442,11 @@ def normalize( return self._get_step_info(normalize_step) except Exception as n_ex: raise PipelineStepFailed( - self, "normalize", n_ex, self._get_step_info(normalize_step) + self, + "normalize", + normalize_step.current_load_id, + n_ex, + self._get_step_info(normalize_step), ) from n_ex @with_runtime_trace(send_state=True) @@ -471,7 +484,7 @@ def load( raise_on_failed_jobs=raise_on_failed_jobs, _load_storage_config=self._load_storage_config, ) - load = Load( + load_step: 
Load = Load( self.destination, staging_destination=self.staging, collector=self.collector, @@ -482,12 +495,14 @@ def load( ) try: with signals.delayed_signals(): - runner.run_pool(load.config, load) - info: LoadInfo = self._get_step_info(load) + runner.run_pool(load_step.config, load_step) + info: LoadInfo = self._get_step_info(load_step) self.first_run = False return info except Exception as l_ex: - raise PipelineStepFailed(self, "load", l_ex, self._get_step_info(load)) from l_ex + raise PipelineStepFailed( + self, "load", load_step.current_load_id, l_ex, self._get_step_info(load_step) + ) from l_ex @with_runtime_trace() @with_config_section(("run",)) @@ -705,7 +720,7 @@ def sync_destination( bump_version_if_modified(state) self._save_state(state) except Exception as ex: - raise PipelineStepFailed(self, "run", ex, None) from ex + raise PipelineStepFailed(self, "sync", None, ex, None) from ex def activate(self) -> None: """Activates the pipeline diff --git a/dlt/pipeline/trace.py b/dlt/pipeline/trace.py index 11d6386aff..0f672c4a4b 100644 --- a/dlt/pipeline/trace.py +++ b/dlt/pipeline/trace.py @@ -1,3 +1,5 @@ +import contextlib +from copy import copy import os import pickle import datetime # noqa: 251 @@ -5,20 +7,26 @@ from typing import Any, List, NamedTuple, Optional, Protocol, Sequence import humanize -from dlt.common import pendulum -from dlt.common.runtime.logger import suppress_and_warn -from dlt.common.runtime.exec_info import TExecutionContext, get_execution_context +from dlt.common import pendulum, json from dlt.common.configuration import is_secret_hint +from dlt.common.configuration.exceptions import ContextDefaultCannotBeCreated +from dlt.common.configuration.specs.config_section_context import ConfigSectionContext from dlt.common.configuration.utils import _RESOLVED_TRACES +from dlt.common.configuration.container import Container +from dlt.common.exceptions import ExceptionTrace, ResourceNameNotAvailable +from dlt.common.runtime.logger import 
suppress_and_warn +from dlt.common.runtime.exec_info import TExecutionContext, get_execution_context from dlt.common.pipeline import ( ExtractInfo, LoadInfo, NormalizeInfo, + PipelineContext, StepInfo, SupportsPipeline, ) -from dlt.common.typing import DictStrAny, StrAny -from dlt.common.utils import uniq_id +from dlt.common.source import get_current_pipe_name +from dlt.common.typing import DictStrAny, StrAny, SupportsHumanize +from dlt.common.utils import uniq_id, get_exception_trace_chain from dlt.pipeline.typing import TPipelineStep from dlt.pipeline.exceptions import PipelineStepFailed @@ -51,8 +59,7 @@ def __str__(self) -> str: return self.asstr(verbosity=0) -@dataclasses.dataclass(init=True) -class _PipelineStepTrace: +class _PipelineStepTrace(NamedTuple): span_id: str step: TPipelineStep started_at: datetime.datetime @@ -61,6 +68,12 @@ class _PipelineStepTrace: """A step outcome info ie. LoadInfo""" step_exception: Optional[str] = None """For failing steps contains exception string""" + exception_traces: List[ExceptionTrace] = None + """For failing steps contains traces of exception chain causing it""" + + +class PipelineStepTrace(SupportsHumanize, _PipelineStepTrace): + """Trace of particular pipeline step, contains timing information, the step outcome info or exception in case of failing step with custom asdict()""" def asstr(self, verbosity: int = 0) -> str: completed_str = "FAILED" if self.step_exception else "COMPLETED" @@ -80,24 +93,30 @@ def asstr(self, verbosity: int = 0) -> str: msg += f"\nspan id: {self.span_id}" return msg - def __str__(self) -> str: - return self.asstr(verbosity=0) - - -class PipelineStepTrace(_PipelineStepTrace): - """Trace of particular pipeline step, contains timing information, the step outcome info or exception in case of failing step with custom asdict()""" - def asdict(self) -> DictStrAny: """A dictionary representation of PipelineStepTrace that can be loaded with `dlt`""" - d = dataclasses.asdict(self) + d = 
self._asdict() if self.step_info: # name property depending on step name - generates nicer data d[f"{self.step}_info"] = d.pop("step_info") + # replace the attributes in exception traces with json dumps + if self.exception_traces: + # do not modify original traces + d["exception_traces"] = copy(d["exception_traces"]) + traces: List[ExceptionTrace] = d["exception_traces"] + for idx in range(len(traces)): + if traces[idx].get("exception_attrs"): + # trace: ExceptionTrace + trace = traces[idx] = copy(traces[idx]) + trace["exception_attrs"] = str(trace["exception_attrs"]) # type: ignore[typeddict-item] + return d + def __str__(self) -> str: + return self.asstr(verbosity=0) + -@dataclasses.dataclass(init=True) -class PipelineTrace: +class _PipelineTrace(NamedTuple): """Pipeline runtime trace containing data on "extract", "normalize" and "load" steps and resolved config and secret values.""" transaction_id: str @@ -111,6 +130,8 @@ class PipelineTrace: """A list of resolved config values""" engine_version: int = TRACE_ENGINE_VERSION + +class PipelineTrace(SupportsHumanize, _PipelineTrace): def asstr(self, verbosity: int = 0) -> str: last_step = self.steps[-1] completed_str = "FAILED" if last_step.step_exception else "COMPLETED" @@ -139,7 +160,9 @@ def last_pipeline_step_trace(self, step_name: TPipelineStep) -> PipelineStepTrac def asdict(self) -> DictStrAny: """A dictionary representation of PipelineTrace that can be loaded with `dlt`""" - return dataclasses.asdict(self) + d = self._asdict() + d["steps"] = [step.asdict() for step in self.steps] + return d @property def last_extract_info(self) -> ExtractInfo: @@ -195,7 +218,12 @@ def on_end_trace( def start_trace(step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineTrace: trace = PipelineTrace( - uniq_id(), pipeline.pipeline_name, get_execution_context(), pendulum.now(), steps=[] + uniq_id(), + pipeline.pipeline_name, + get_execution_context(), + pendulum.now(), + steps=[], + resolved_config_values=[], ) for module 
in TRACKING_MODULES: with suppress_and_warn(): @@ -219,24 +247,29 @@ def end_trace_step( pipeline: SupportsPipeline, step_info: Any, send_state: bool, -) -> None: +) -> PipelineTrace: # saves runtime trace of the pipeline if isinstance(step_info, PipelineStepFailed): + exception_traces = get_exception_traces(step_info) step_exception = str(step_info) step_info = step_info.step_info elif isinstance(step_info, Exception): + exception_traces = get_exception_traces(step_info) step_exception = str(step_info) if step_info.__context__: step_exception += "caused by: " + str(step_info.__context__) step_info = None else: step_info = step_info + exception_traces = None step_exception = None - step.finished_at = pendulum.now() - step.step_exception = step_exception - step.step_info = step_info - + step = step._replace( + finished_at=pendulum.now(), + step_exception=step_exception, + exception_traces=exception_traces, + step_info=step_info, + ) resolved_values = map( lambda v: SerializableResolvedValueTrace( v.key, @@ -250,22 +283,24 @@ def end_trace_step( _RESOLVED_TRACES.values(), ) - trace.resolved_config_values = list(resolved_values) + trace.resolved_config_values[:] = list(resolved_values) trace.steps.append(step) for module in TRACKING_MODULES: with suppress_and_warn(): module.on_end_trace_step(trace, step, pipeline, step_info, send_state) + return trace def end_trace( trace: PipelineTrace, pipeline: SupportsPipeline, trace_path: str, send_state: bool -) -> None: - trace.finished_at = pendulum.now() +) -> PipelineTrace: + trace = trace._replace(finished_at=pendulum.now()) if trace_path: save_trace(trace_path, trace) for module in TRACKING_MODULES: with suppress_and_warn(): module.on_end_trace(trace, pipeline, send_state) + return trace def merge_traces(last_trace: PipelineTrace, new_trace: PipelineTrace) -> PipelineTrace: @@ -274,13 +309,12 @@ def merge_traces(last_trace: PipelineTrace, new_trace: PipelineTrace) -> Pipelin return new_trace 
last_trace.steps.extend(new_trace.steps) - # remember only last 100 steps - last_trace.steps = last_trace.steps[-100:] - # keep the finished up from previous trace - last_trace.finished_at = new_trace.finished_at - last_trace.resolved_config_values = new_trace.resolved_config_values - - return last_trace + # remember only last 100 steps and keep the finished up from previous trace + return last_trace._replace( + steps=last_trace.steps[-100:], + finished_at=new_trace.finished_at, + resolved_config_values=new_trace.resolved_config_values, + ) def save_trace(trace_path: str, trace: PipelineTrace) -> None: @@ -295,3 +329,35 @@ def load_trace(trace_path: str) -> PipelineTrace: except (AttributeError, FileNotFoundError): # on incompatible pickling / file not found return no trace return None + + +def get_exception_traces(exc: BaseException, container: Container = None) -> List[ExceptionTrace]: + """Gets exception trace chain and extend it with data available in Container context""" + traces = get_exception_trace_chain(exc) + container = container or Container() + + # get resource name + resource_name: str = None + with contextlib.suppress(ResourceNameNotAvailable): + resource_name = get_current_pipe_name() + # get source name + source_name: str = None + with contextlib.suppress(ContextDefaultCannotBeCreated): + sections_context = container[ConfigSectionContext] + source_name = sections_context.source_state_key + # get pipeline name + proxy = container[PipelineContext] + if proxy.is_active(): + pipeline_name = proxy.pipeline().pipeline_name + else: + pipeline_name = None + + # apply context to trace + for trace in traces: + # only to dlt exceptions + if "exception_attrs" in trace: + trace.setdefault("resource_name", resource_name) + trace.setdefault("pipeline_name", pipeline_name) + trace.setdefault("source_name", source_name) + + return traces diff --git a/dlt/pipeline/typing.py b/dlt/pipeline/typing.py index 5cd2b2b503..f0192a504d 100644 --- a/dlt/pipeline/typing.py +++ 
b/dlt/pipeline/typing.py @@ -1,3 +1,3 @@ from typing import Literal -TPipelineStep = Literal["run", "extract", "normalize", "load"] +TPipelineStep = Literal["sync", "extract", "normalize", "load"] diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 15f82655f0..3ebd4f53a2 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -534,8 +534,13 @@ def data_piece_2(): yield [6, 7, 8] raise NotImplementedError() - with pytest.raises(PipelineStepFailed): + with pytest.raises(PipelineStepFailed) as pip_ex: p.run([data_piece_1, data_piece_2], write_disposition="replace") + # make sure that exception has right step info + assert pip_ex.value.load_id in pip_ex.value.step_info.loads_ids + # print(pip_ex.value.load_id) + # print(pip_ex.value.step_info.asdict()) + # print(p._last_trace.last_pipeline_step_trace("extract").exception_traces) # first run didn't really happen assert p.first_run is True diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py index d78be702cd..8840c0a26b 100644 --- a/tests/pipeline/test_pipeline_trace.py +++ b/tests/pipeline/test_pipeline_trace.py @@ -415,7 +415,10 @@ def assert_trace_printable(trace: PipelineTrace) -> None: str(trace) trace.asstr(0) trace.asstr(1) - json.dumps(trace) + trace.asdict() with io.BytesIO() as b: json.typed_dump(trace, b) b.getvalue() + json.dumps(trace) + # assert trace_dict == json.loads(trace_str) + # print(trace_dict) From f60b9353828955bb04886d3afd844f58a2c3ae5c Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 6 Dec 2023 11:53:56 +0100 Subject: [PATCH 3/3] skips config location checks for unknown modules --- dlt/common/configuration/exceptions.py | 27 +++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/dlt/common/configuration/exceptions.py b/dlt/common/configuration/exceptions.py index a3927102c3..1d8423057f 100644 --- a/dlt/common/configuration/exceptions.py +++
b/dlt/common/configuration/exceptions.py @@ -60,19 +60,20 @@ def __str__(self) -> str: msg += f"\t\tIn {tr.provider} key {tr.key} was not found.\n" # check if entry point is run with path. this is common problem so warn the user main_path = main_module_file_path() - main_dir = os.path.dirname(main_path) - abs_main_dir = os.path.abspath(main_dir) - if abs_main_dir != os.getcwd(): - # directory was specified - msg += ( - "WARNING: dlt looks for .dlt folder in your current working directory and your cwd" - " (%s) is different from directory of your pipeline script (%s).\n" - % (os.getcwd(), abs_main_dir) - ) - msg += ( - "If you keep your secret files in the same folder as your pipeline script but run" - " your script from some other folder, secrets/configs will not be found\n" - ) + if main_path: + main_dir = os.path.dirname(main_path) + abs_main_dir = os.path.abspath(main_dir) + if abs_main_dir != os.getcwd(): + # directory was specified + msg += ( + "WARNING: dlt looks for .dlt folder in your current working directory and your" + " cwd (%s) is different from directory of your pipeline script (%s).\n" + % (os.getcwd(), abs_main_dir) + ) + msg += ( + "If you keep your secret files in the same folder as your pipeline script but" + " run your script from some other folder, secrets/configs will not be found\n" + ) msg += ( "Please refer to https://dlthub.com/docs/general-usage/credentials for more" " information\n"