From 309c54135c7eb25c4b2526ebccb28d61ddb4b375 Mon Sep 17 00:00:00 2001 From: Elon Gliksberg Date: Thu, 5 Sep 2024 12:04:09 +0300 Subject: [PATCH] Added seeds to lineage. --- elementary/monitor/api/groups/groups.py | 2 + elementary/monitor/api/lineage/schema.py | 17 +- elementary/monitor/api/models/models.py | 45 +- elementary/monitor/api/models/schema.py | 6 + elementary/monitor/api/report/report.py | 19 +- .../monitor/data_monitoring/report/index.html | 832 +++++++++--------- .../macros/alerts/population/model_alerts.sql | 4 +- .../macros/get_nodes_depends_on_nodes.sql | 7 + .../monitor/dbt_project/macros/get_seeds.sql | 24 + .../dbt_project/models/alerts/alerts_v2.sql | 2 + elementary/monitor/fetchers/models/models.py | 17 +- elementary/monitor/fetchers/models/schema.py | 6 + 12 files changed, 529 insertions(+), 452 deletions(-) create mode 100644 elementary/monitor/dbt_project/macros/get_seeds.sql diff --git a/elementary/monitor/api/groups/groups.py b/elementary/monitor/api/groups/groups.py index 083da6d78..ba1d8a804 100644 --- a/elementary/monitor/api/groups/groups.py +++ b/elementary/monitor/api/groups/groups.py @@ -14,6 +14,7 @@ from elementary.monitor.api.models.schema import ( NormalizedExposureSchema, NormalizedModelSchema, + NormalizedSeedSchema, NormalizedSourceSchema, ) from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema @@ -28,6 +29,7 @@ NormalizedSourceSchema, NormalizedExposureSchema, NormalizedTestSchema, + NormalizedSeedSchema, ] diff --git a/elementary/monitor/api/lineage/schema.py b/elementary/monitor/api/lineage/schema.py index b24270ad4..e8c341bbf 100644 --- a/elementary/monitor/api/lineage/schema.py +++ b/elementary/monitor/api/lineage/schema.py @@ -1,4 +1,3 @@ -import re from typing import List, Literal, Optional, Tuple import networkx as nx @@ -6,13 +5,10 @@ from elementary.utils.pydantic_shim import BaseModel, validator NodeUniqueIdType = str -NodeType = Literal["model", "source", "exposure"] +NodeType = Literal["seed", "model", "source", "exposure"] NodeSubType = Literal["table", "view"] -_SEED_PATH_PATTERN = re.compile(r"^seed\.") - - class LineageNodeSchema(BaseModel): id: NodeUniqueIdType type: NodeType @@ -51,15 +47,4 @@ class NodeDependsOnNodesSchema(BaseModel): @validator("depends_on_nodes", pre=True, always=True) def set_depends_on_nodes(cls, depends_on_nodes): formatted_depends_on = depends_on_nodes or [] - formatted_depends_on = [ - cls._format_node_id(node_id) for node_id in formatted_depends_on - ] return [node_id for node_id in formatted_depends_on if node_id] - - @classmethod - def _format_node_id(cls, node_id: str): - # Currently we don't save seeds in our artifacts. - # We remove seeds from the lineage graph (as long as we don't support them). - if re.search(_SEED_PATH_PATTERN, node_id): - return None - return node_id diff --git a/elementary/monitor/api/models/models.py b/elementary/monitor/api/models/models.py index 8fd4f7b7b..5d9c54d15 100644 --- a/elementary/monitor/api/models/models.py +++ b/elementary/monitor/api/models/models.py @@ -2,7 +2,7 @@ import os import statistics from collections import defaultdict -from typing import Dict, List, Optional, Set, Union, overload +from typing import Dict, List, Optional, Set, Union, cast, overload from elementary.clients.api.api_client import APIClient from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner @@ -13,6 +13,7 @@ ModelRunsWithTotalsSchema, NormalizedExposureSchema, NormalizedModelSchema, + NormalizedSeedSchema, NormalizedSourceSchema, TotalsModelRunsSchema, ) @@ -22,7 +23,11 @@ from elementary.monitor.fetchers.models.schema import ( ModelRunSchema as FetcherModelRunSchema, ) -from elementary.monitor.fetchers.models.schema import ModelSchema, SourceSchema +from elementary.monitor.fetchers.models.schema import ( + ModelSchema, + SeedSchema, + SourceSchema, +) from elementary.utils.log import get_logger logger = get_logger(__name__) @@ -30,6 +35,7 @@ class ModelsAPI(APIClient): _ARTIFACT_TYPE_DIR_MAP = { + SeedSchema: "seeds", SourceSchema: "sources", ModelSchema: "models", ExposureSchema: "exposures", @@ -117,6 +123,16 @@ def _get_model_runs_totals( success_runs = len([run for run in runs if run.status == "success"]) return TotalsModelRunsSchema(errors=error_runs, success=success_runs) + def get_seeds(self) -> Dict[str, NormalizedSeedSchema]: + seed_results = self.models_fetcher.get_seeds() + seeds = dict() + if seed_results: + for seed_result in seed_results: + normalized_seed = self._normalize_dbt_artifact_dict(seed_result) + seed_unique_id = cast(str, normalized_seed.unique_id) + seeds[seed_unique_id] = normalized_seed + return seeds + def get_models( self, exclude_elementary_models: bool = False ) -> Dict[str, NormalizedModelSchema]: @@ -127,12 +143,7 @@ def get_models( if models_results: for model_result in models_results: normalized_model = self._normalize_dbt_artifact_dict(model_result) - - model_unique_id = normalized_model.unique_id - if model_unique_id is None: - # Shouldn't happen, but handling this case for mypy - continue - + model_unique_id = cast(str, normalized_model.unique_id) models[model_unique_id] = normalized_model return models @@ -222,6 +233,12 @@ def _exposure_has_upstream_node( for dep in exposure.depends_on_nodes ) + @overload + def _normalize_dbt_artifact_dict( + self, artifact: SeedSchema + ) -> NormalizedSeedSchema: + ... + @overload def _normalize_dbt_artifact_dict( self, artifact: ModelSchema @@ -241,9 +258,15 @@ def _normalize_dbt_artifact_dict( ... def _normalize_dbt_artifact_dict( - self, artifact: Union[ModelSchema, ExposureSchema, SourceSchema] - ) -> Union[NormalizedModelSchema, NormalizedExposureSchema, NormalizedSourceSchema]: + self, artifact: Union[SeedSchema, ModelSchema, ExposureSchema, SourceSchema] + ) -> Union[ + NormalizedSeedSchema, + NormalizedModelSchema, + NormalizedExposureSchema, + NormalizedSourceSchema, + ]: schema_to_normalized_schema_map = { + SeedSchema: NormalizedSeedSchema, ExposureSchema: NormalizedExposureSchema, ModelSchema: NormalizedModelSchema, SourceSchema: NormalizedSourceSchema, @@ -285,7 +308,7 @@ def _normalize_artifact_path(cls, artifact: ArtifactSchemaType, fqn: str) -> str @classmethod def _fqn( cls, - artifact: Union[ModelSchema, ExposureSchema, SourceSchema], + artifact: Union[ModelSchema, ExposureSchema, SourceSchema, SeedSchema], ) -> str: if isinstance(artifact, ExposureSchema): path = (artifact.meta or {}).get("path") diff --git a/elementary/monitor/api/models/schema.py b/elementary/monitor/api/models/schema.py index c1bc1c694..f9bbf9f19 100644 --- a/elementary/monitor/api/models/schema.py +++ b/elementary/monitor/api/models/schema.py @@ -6,6 +6,7 @@ from elementary.monitor.fetchers.models.schema import ( ExposureSchema, ModelSchema, + SeedSchema, SourceSchema, ) from elementary.utils.pydantic_shim import BaseModel, Field, validator @@ -35,6 +36,11 @@ def format_normalized_full_path_sep(cls, normalized_full_path: str) -> str: return posixpath.sep.join(normalized_full_path.split(os.path.sep)) +# NormalizedArtifactSchema must be first in the inheritance order +class NormalizedSeedSchema(NormalizedArtifactSchema, SeedSchema): + artifact_type: str = Field("seed", const=True) # type: ignore # noqa + + # NormalizedArtifactSchema must be first in the inheritance order class NormalizedModelSchema(NormalizedArtifactSchema, ModelSchema): artifact_type: str = Field("model", const=True) # type: ignore # noqa diff --git a/elementary/monitor/api/report/report.py b/elementary/monitor/api/report/report.py index 02a747816..df09cb410 100644 --- a/elementary/monitor/api/report/report.py +++ b/elementary/monitor/api/report/report.py @@ -13,6 +13,7 @@ ModelRunsSchema, NormalizedExposureSchema, NormalizedModelSchema, + NormalizedSeedSchema, NormalizedSourceSchema, ) from elementary.monitor.api.report.schema import ReportDataEnvSchema, ReportDataSchema @@ -41,11 +42,12 @@ def _get_groups( models: Iterable[NormalizedModelSchema], sources: Iterable[NormalizedSourceSchema], exposures: Iterable[NormalizedExposureSchema], + seeds: Iterable[NormalizedSeedSchema], singular_tests: Iterable[NormalizedTestSchema], ) -> GroupsSchema: groups_api = GroupsAPI(self.dbt_runner) return groups_api.get_groups( - artifacts=[*models, *sources, *exposures, *singular_tests] + artifacts=[*models, *sources, *exposures, *seeds, *singular_tests] ) def get_report_data( @@ -78,6 +80,8 @@ def get_report_data( invocations_api = InvocationsAPI(dbt_runner=self.dbt_runner) lineage_node_ids: List[str] = [] + seeds = models_api.get_seeds() + lineage_node_ids.extend(seeds.keys()) models = models_api.get_models(exclude_elementary_models) lineage_node_ids.extend(models.keys()) sources = models_api.get_sources() @@ -87,7 +91,11 @@ def get_report_data( singular_tests = tests_api.get_singular_tests() groups = self._get_groups( - models.values(), sources.values(), exposures.values(), singular_tests + models.values(), + sources.values(), + exposures.values(), + seeds.values(), + singular_tests, ) models_runs = models_api.get_models_runs( @@ -133,7 +141,9 @@ def get_report_data( ) serializable_groups = groups.dict() - serializable_models = self._serialize_models(models, sources, exposures) + serializable_models = self._serialize_models( + models, sources, exposures, seeds + ) serializable_model_runs = self._serialize_models_runs(models_runs.runs) serializable_model_runs_totals = models_runs.dict(include={"totals"})[ "totals" @@ -191,8 +201,9 @@ def _serialize_models( models: Dict[str, NormalizedModelSchema], sources: Dict[str, NormalizedSourceSchema], exposures: Dict[str, NormalizedExposureSchema], + seeds: Dict[str, NormalizedSeedSchema], ) -> Dict[str, dict]: - nodes = dict(**models, **sources, **exposures) + nodes = dict(**models, **sources, **exposures, **seeds) serializable_nodes = dict() for key in nodes.keys(): serializable_nodes[key] = dict(nodes[key]) diff --git a/elementary/monitor/data_monitoring/report/index.html b/elementary/monitor/data_monitoring/report/index.html index 462ab1af5..04d502768 100644 --- a/elementary/monitor/data_monitoring/report/index.html +++ b/elementary/monitor/data_monitoring/report/index.html @@ -30,7 +30,7 @@