Skip to content

Commit

Permalink
Merge pull request #806 from dlt-hub/rfix/adds-exception-traces
Browse files Browse the repository at this point in the history
adds exception traces
  • Loading branch information
sh-rp authored Dec 6, 2023
2 parents 88ba90f + f60b935 commit df39b72
Show file tree
Hide file tree
Showing 14 changed files with 365 additions and 74 deletions.
38 changes: 24 additions & 14 deletions dlt/common/configuration/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import Any, Mapping, Type, Tuple, NamedTuple, Sequence
from typing import Any, Dict, Mapping, Type, Tuple, NamedTuple, Sequence

from dlt.common.exceptions import DltException, TerminalException
from dlt.common.utils import main_module_file_path
Expand Down Expand Up @@ -60,25 +60,35 @@ def __str__(self) -> str:
msg += f"\t\tIn {tr.provider} key {tr.key} was not found.\n"
# check if entry point is run with path. this is common problem so warn the user
main_path = main_module_file_path()
main_dir = os.path.dirname(main_path)
abs_main_dir = os.path.abspath(main_dir)
if abs_main_dir != os.getcwd():
# directory was specified
msg += (
"WARNING: dlt looks for .dlt folder in your current working directory and your cwd"
" (%s) is different from directory of your pipeline script (%s).\n"
% (os.getcwd(), abs_main_dir)
)
msg += (
"If you keep your secret files in the same folder as your pipeline script but run"
" your script from some other folder, secrets/configs will not be found\n"
)
if main_path:
main_dir = os.path.dirname(main_path)
abs_main_dir = os.path.abspath(main_dir)
if abs_main_dir != os.getcwd():
# directory was specified
msg += (
"WARNING: dlt looks for .dlt folder in your current working directory and your"
" cwd (%s) is different from directory of your pipeline script (%s).\n"
% (os.getcwd(), abs_main_dir)
)
msg += (
"If you keep your secret files in the same folder as your pipeline script but"
" run your script from some other folder, secrets/configs will not be found\n"
)
msg += (
"Please refer to https://dlthub.com/docs/general-usage/credentials for more"
" information\n"
)
return msg

def attrs(self) -> Dict[str, Any]:
    """Return public exception attributes with secret values scrubbed from lookup traces.

    Builds on the base ``attrs()`` but replaces the ``traces`` entry with
    sanitized copies in which every trace ``value`` is ``None``, so the result
    can be serialized/logged without leaking secrets.

    Unlike the previous in-place scrub, ``self.traces`` is left untouched:
    ``attrs()`` is a read accessor and must not destroy the live trace values.
    """
    attrs_ = super().attrs()
    if "traces" in attrs_:
        # drop all values as they may contain secrets — on copies, not in place
        attrs_["traces"] = {
            field: [trace._replace(value=None) for trace in traces]
            for field, traces in self.traces.items()
        }
    return attrs_


class UnmatchedConfigHintResolversException(ConfigurationException):
"""Raised when using `@resolve_type` on a field that doesn't exist in the spec"""
Expand Down
33 changes: 32 additions & 1 deletion dlt/common/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,42 @@
from typing import Any, AnyStr, List, Sequence, Optional, Iterable
from typing import Any, AnyStr, Dict, List, Sequence, Optional, Iterable, TypedDict


class ExceptionTrace(TypedDict, total=False):
    """Serializable snapshot of a single exception.

    NOTE: we intend to change it with an extended line by line trace with code snippets
    """

    message: str
    exception_type: str
    docstring: str
    stack_trace: List[str]
    # says if exception is terminal if happened to a job during load step
    is_terminal: bool
    # public attributes of an exception deriving from DltException (not starting with _)
    exception_attrs: Dict[str, Any]
    # load id if found in exception attributes
    load_id: str
    # pipeline name if found in exception attributes or in the active pipeline (Container)
    pipeline_name: str
    # source name if found in exception attributes or in Container
    source_name: str
    # resource name if found in exception attributes
    resource_name: str
    # job id if found in exception attributes
    job_id: str


class DltException(Exception):
    def __reduce__(self) -> Any:
        """Enables exceptions with parametrized constructor to be pickled"""
        cls = type(self)
        # re-create via __new__ with the original args, then restore __dict__
        return cls.__new__, (cls, *self.args), self.__dict__

    def attrs(self) -> Dict[str, Any]:
        """Returns "public" attributes of the DltException"""
        public: Dict[str, Any] = {}
        for name, value in vars(self).items():
            # skip private names, callables and anything shadowing a class attribute
            if name.startswith("_"):
                continue
            if callable(value) or hasattr(type(self), name):
                continue
            public[name] = value
        return public


class UnsupportedProcessStartMethodException(DltException):
def __init__(self, method: str) -> None:
Expand Down
10 changes: 7 additions & 3 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from dlt.common.exceptions import (
DestinationHasFailedJobs,
PipelineStateNotAvailable,
ResourceNameNotAvailable,
SourceSectionNotAvailable,
)
from dlt.common.schema import Schema
Expand Down Expand Up @@ -90,6 +89,7 @@ def asdict(self) -> DictStrAny:
d = self._asdict()
d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
d["load_packages"] = [package.asdict() for package in self.load_packages]
# TODO: transform and leave metrics when we have them implemented
d.pop("metrics")
d.pop("extract_data_info")
return d
Expand Down Expand Up @@ -252,6 +252,7 @@ class WithStepInfo(ABC, Generic[TStepMetrics, TStepInfo]):

def __init__(self) -> None:
self._load_id_metrics = {}
self._current_load_id = None

def _step_info_start_load_id(self, load_id: str) -> None:
self._current_load_id = load_id
Expand All @@ -268,6 +269,11 @@ def _step_info_complete_load_id(self, load_id: str, metrics: TStepMetrics) -> No
def _step_info_metrics(self, load_id: str) -> TStepMetrics:
return self._load_id_metrics[load_id]

@property
def current_load_id(self) -> str:
    """Returns currently processing load id.

    NOTE(review): `_current_load_id` is initialized to None in __init__ and only
    set by `_step_info_start_load_id`, so this returns None before the first
    load starts — the `str` annotation is optimistic; consider Optional[str].
    """
    return self._current_load_id

@abstractmethod
def get_step_info(
self,
Expand Down Expand Up @@ -575,8 +581,6 @@ def resource_state(
# backtrace to find the shallowest resource
if not resource_name:
resource_name = get_current_pipe_name()
if not resource_name:
raise ResourceNameNotAvailable()
return state_.setdefault("resources", {}).setdefault(resource_name, {}) # type: ignore


Expand Down
65 changes: 65 additions & 0 deletions dlt/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from functools import wraps
from os import environ
from types import ModuleType
import traceback
import zlib

from typing import (
Expand All @@ -28,6 +29,7 @@
)
from collections.abc import Mapping as C_Mapping

from dlt.common.exceptions import DltException, ExceptionTrace, TerminalException
from dlt.common.typing import AnyFun, StrAny, DictStrAny, StrStr, TAny, TFun


Expand Down Expand Up @@ -510,3 +512,66 @@ def maybe_context(manager: ContextManager[TAny]) -> Iterator[TAny]:
def without_none(d: Mapping[TKey, Optional[TValue]]) -> Mapping[TKey, TValue]:
"""Return a new dict with all `None` values removed"""
return {k: v for k, v in d.items() if v is not None}


def get_full_class_name(obj: Any) -> str:
    """Return the fully qualified class name of *obj*.

    For instances of built-in types (or classes with no module) only the bare
    class name is returned; otherwise the module path is prepended.
    """
    klass = type(obj)
    name: str = klass.__name__
    module = klass.__module__
    # exclude 'builtins' for built-in types.
    if module in (None, "builtins"):
        return name
    return f"{module}.{name}"


def get_exception_trace(exc: BaseException) -> ExceptionTrace:
    """Get exception trace and additional information for DltException(s)

    Builds an `ExceptionTrace` with the message, fully qualified exception type,
    formatted stack trace (when a traceback is attached) and the terminal flag.
    For `DltException` instances the docstring and the JSON-serializable public
    attributes are added as well; well-known attributes (load_id, pipeline_name,
    source_name, resource_name, job_id) are promoted to top-level trace keys.
    """
    trace: ExceptionTrace = {"message": str(exc), "exception_type": get_full_class_name(exc)}
    if exc.__traceback__:
        tb_extract = traceback.extract_tb(exc.__traceback__)
        trace["stack_trace"] = traceback.format_list(tb_extract)
    trace["is_terminal"] = isinstance(exc, TerminalException)

    # get attrs and other props
    if isinstance(exc, DltException):
        if exc.__doc__:
            trace["docstring"] = exc.__doc__
        # hoisted out of the loop: the import is loop-invariant and keeping it
        # inside the per-attribute `try` conflated an import failure with a
        # serialization failure (silently dropping every attribute)
        from dlt.common.json import json

        str_attrs = {}
        for k, v in exc.attrs().items():
            if v is None:
                continue
            try:
                # must be json serializable, other attrs are skipped
                if not isinstance(v, str):
                    json.dumps(v)
                str_attrs[k] = v
            except Exception:
                continue
            # extract special attrs
            if k in ("load_id", "pipeline_name", "source_name", "resource_name", "job_id"):
                trace[k] = v  # type: ignore[literal-required]

        trace["exception_attrs"] = str_attrs
    return trace


def get_exception_trace_chain(
    exc: BaseException,
    traces: Optional[List[ExceptionTrace]] = None,
    seen: Optional[Set[int]] = None,
) -> List[ExceptionTrace]:
    """Get traces for exception chain. The function will visit all __cause__ and
    __context__ exceptions. The top level exception trace is first on the list.

    Args:
        exc: the exception whose chain is traced
        traces: optional accumulator the traces are appended to; a new list is
            created when None (annotation fixed: the default is None, not a list)
        seen: optional set of already-visited exception ids, used to break cycles

    Returns:
        The list of `ExceptionTrace` dicts, outermost exception first.
    """
    # test `is None` (not falsiness) so a caller-provided empty accumulator is reused
    if traces is None:
        traces = []
    if seen is None:
        seen = set()
    current: Optional[BaseException] = exc
    # iterative walk — no recursion-limit risk on very long exception chains
    while current is not None and id(current) not in seen:
        # prevent cycles
        seen.add(id(current))
        traces.append(get_exception_trace(current))
        # __cause__ (explicit `raise ... from`) takes precedence over __context__
        current = current.__cause__ or current.__context__
    return traces
1 change: 0 additions & 1 deletion dlt/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,3 @@ def get_step_info(
started_at,
pipeline.first_run,
)
return ExtractInfo(None)
3 changes: 1 addition & 2 deletions dlt/normalize/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from dlt.common.pipeline import (
NormalizeInfo,
NormalizeMetrics,
StepInfo,
SupportsPipeline,
WithStepInfo,
)
Expand Down Expand Up @@ -194,6 +193,7 @@ def _get_items_normalizer(
columns = schema.get_table_columns(table_name)
load_storage.write_empty_file(load_id, schema.name, table_name, columns)
except Exception:
# TODO: raise a wrapper exception with job_id, load_id, line_no and schema name
logger.exception(
f"Exception when processing file {extracted_items_file}, line {line_no}"
)
Expand All @@ -202,7 +202,6 @@ def _get_items_normalizer(
load_storage.close_writers(load_id)

logger.info(f"Processed total {total_items} items in {len(extracted_items_files)} files")

return schema_updates, total_items, load_storage.closed_files(), row_counts

def update_table(self, schema: Schema, schema_updates: List[TSchemaUpdate]) -> None:
Expand Down
18 changes: 16 additions & 2 deletions dlt/pipeline/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, Dict
from dlt.common.exceptions import PipelineException
from dlt.common.pipeline import StepInfo, SupportsPipeline
from dlt.pipeline.typing import TPipelineStep
Expand Down Expand Up @@ -48,23 +48,37 @@ def __init__(self, pipeline_name: str, destination_name: str) -> None:


class PipelineStepFailed(PipelineException):
    """Raised by run, extract, normalize and load Pipeline methods."""

    def __init__(
        self,
        pipeline: SupportsPipeline,
        step: TPipelineStep,
        load_id: str,
        exception: BaseException,
        step_info: StepInfo = None,
    ) -> None:
        """Wrap `exception` raised in pipeline `step`.

        Args:
            pipeline: the pipeline on which the step failed
            step: the pipeline step name at which execution failed
            load_id: load package id being processed when the step failed;
                callers may pass None when no package is being processed
            exception: the original exception that caused the failure
            step_info: optional step info gathered up to the moment of failure
        """
        self.pipeline = pipeline
        self.step = step
        self.load_id = load_id
        self.exception = exception
        self.step_info = step_info

        package_str = f" when processing package {load_id}" if load_id else ""
        super().__init__(
            pipeline.pipeline_name,
            f"Pipeline execution failed at stage {step}{package_str} with"
            f" exception:\n\n{type(exception)}\n{exception}",
        )

    def attrs(self) -> Dict[str, Any]:
        """Return public attributes without non-serializable members."""
        # remove attrs that should not be published; defaulted pops keep this
        # robust if the base class filtering ever drops one of these keys
        attrs_ = super().attrs()
        for key in ("pipeline", "exception", "step_info"):
            attrs_.pop(key, None)
        return attrs_


class PipelineStateEngineNoUpgradePathException(PipelineException):
def __init__(
Expand Down
41 changes: 28 additions & 13 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,22 @@ def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> Any:
try:
if trace_step:
# if there was a step, finish it
end_trace_step(self._trace, trace_step, self, step_info, send_state)
self._trace = end_trace_step(
self._trace, trace_step, self, step_info, send_state
)
if is_new_trace:
assert (
trace is self._trace
), f"Messed up trace reference {id(self._trace)} vs {id(trace)}"
end_trace(trace, self, self._pipeline_storage.storage_path, send_state)
assert trace.transaction_id == self._trace.transaction_id, (
f"Messed up trace reference {self._trace.transaction_id} vs"
f" {trace.transaction_id}"
)
trace = end_trace(
trace, self, self._pipeline_storage.storage_path, send_state
)
finally:
# always end trace
if is_new_trace:
assert (
self._trace == trace
self._trace.transaction_id == trace.transaction_id
), f"Messed up trace reference {id(self._trace)} vs {id(trace)}"
# if we end new trace that had only 1 step, add it to previous trace
# this way we combine several separate calls to extract, normalize, load as single trace
Expand Down Expand Up @@ -392,7 +397,11 @@ def extract(
return self._get_step_info(extract_step)
except Exception as exc:
raise PipelineStepFailed(
self, "extract", exc, self._get_step_info(extract_step)
self,
"extract",
extract_step.current_load_id,
exc,
self._get_step_info(extract_step),
) from exc

@with_runtime_trace()
Expand Down Expand Up @@ -433,7 +442,11 @@ def normalize(
return self._get_step_info(normalize_step)
except Exception as n_ex:
raise PipelineStepFailed(
self, "normalize", n_ex, self._get_step_info(normalize_step)
self,
"normalize",
normalize_step.current_load_id,
n_ex,
self._get_step_info(normalize_step),
) from n_ex

@with_runtime_trace(send_state=True)
Expand Down Expand Up @@ -471,7 +484,7 @@ def load(
raise_on_failed_jobs=raise_on_failed_jobs,
_load_storage_config=self._load_storage_config,
)
load = Load(
load_step: Load = Load(
self.destination,
staging_destination=self.staging,
collector=self.collector,
Expand All @@ -482,12 +495,14 @@ def load(
)
try:
with signals.delayed_signals():
runner.run_pool(load.config, load)
info: LoadInfo = self._get_step_info(load)
runner.run_pool(load_step.config, load_step)
info: LoadInfo = self._get_step_info(load_step)
self.first_run = False
return info
except Exception as l_ex:
raise PipelineStepFailed(self, "load", l_ex, self._get_step_info(load)) from l_ex
raise PipelineStepFailed(
self, "load", load_step.current_load_id, l_ex, self._get_step_info(load_step)
) from l_ex

@with_runtime_trace()
@with_config_section(("run",))
Expand Down Expand Up @@ -705,7 +720,7 @@ def sync_destination(
bump_version_if_modified(state)
self._save_state(state)
except Exception as ex:
raise PipelineStepFailed(self, "run", ex, None) from ex
raise PipelineStepFailed(self, "sync", None, ex, None) from ex

def activate(self) -> None:
"""Activates the pipeline
Expand Down
Loading

0 comments on commit df39b72

Please sign in to comment.