Skip to content

Commit

Permalink
add snapshot to the report - models, lineage, and groups
Browse files Browse the repository at this point in the history
  • Loading branch information
NoyaArie committed Sep 15, 2024
1 parent 9ac1d91 commit 22d0ac0
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 6 deletions.
2 changes: 2 additions & 0 deletions elementary/monitor/api/groups/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema
Expand All @@ -30,6 +31,7 @@
NormalizedExposureSchema,
NormalizedTestSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
]


Expand Down
2 changes: 1 addition & 1 deletion elementary/monitor/api/lineage/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from elementary.utils.pydantic_shim import BaseModel, validator

# Alias for the identifier type of a node (a plain string).
NodeUniqueIdType = str
# Allowed values for a node's type.
# NOTE(review): NodeType is assigned twice in this view (the old and the new line of the
# diff); the later assignment, which adds "snapshot", is the one that takes effect.
NodeType = Literal["seed", "model", "source", "exposure"]
NodeType = Literal["snapshot", "seed", "model", "source", "exposure"]
# Allowed values for a node's sub type.
NodeSubType = Literal["table", "view"]


Expand Down
32 changes: 30 additions & 2 deletions elementary/monitor/api/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
TotalsModelRunsSchema,
)
Expand All @@ -26,6 +27,7 @@
from elementary.monitor.fetchers.models.schema import (
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand All @@ -36,6 +38,7 @@
class ModelsAPI(APIClient):
_ARTIFACT_TYPE_DIR_MAP = {
SeedSchema: "seeds",
SnapshotSchema: "snapshots",
SourceSchema: "sources",
ModelSchema: "models",
ExposureSchema: "exposures",
Expand Down Expand Up @@ -133,6 +136,18 @@ def get_seeds(self) -> Dict[str, NormalizedSeedSchema]:
seeds[seed_unique_id] = normalized_seed
return seeds

def get_snapshots(self) -> Dict[str, NormalizedSnapshotSchema]:
    """Fetch the dbt snapshots and return them normalized, keyed by unique id.

    Each snapshot returned by ``self.models_fetcher.get_snapshots()`` is passed
    through ``self._normalize_dbt_artifact_dict`` and stored under its
    ``unique_id``.

    Returns:
        A dict mapping each snapshot's unique id to its normalized schema.
        The dict is empty when the fetcher returns no snapshots.
    """
    snapshots: Dict[str, NormalizedSnapshotSchema] = {}
    # `or []` keeps the original tolerance for a falsy (empty / None) fetcher result.
    for snapshot_result in self.models_fetcher.get_snapshots() or []:
        normalized_snapshot = self._normalize_dbt_artifact_dict(snapshot_result)
        # unique_id is cast to str so it can be used as the dict key type.
        snapshot_unique_id = cast(str, normalized_snapshot.unique_id)
        snapshots[snapshot_unique_id] = normalized_snapshot
    return snapshots

def get_models(
self, exclude_elementary_models: bool = False
) -> Dict[str, NormalizedModelSchema]:
Expand Down Expand Up @@ -239,6 +254,12 @@ def _normalize_dbt_artifact_dict(
) -> NormalizedSeedSchema:
...

# Typing-only overload: a SnapshotSchema argument is declared to return a
# NormalizedSnapshotSchema. The body is intentionally just `...`.
@overload
def _normalize_dbt_artifact_dict(
self, artifact: SnapshotSchema
) -> NormalizedSnapshotSchema:
...

@overload
def _normalize_dbt_artifact_dict(
self, artifact: ModelSchema
Expand All @@ -258,15 +279,20 @@ def _normalize_dbt_artifact_dict(
...

def _normalize_dbt_artifact_dict(
self, artifact: Union[SeedSchema, ModelSchema, ExposureSchema, SourceSchema]
self,
artifact: Union[
SeedSchema, SnapshotSchema, ModelSchema, ExposureSchema, SourceSchema
],
) -> Union[
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedModelSchema,
NormalizedExposureSchema,
NormalizedSourceSchema,
]:
schema_to_normalized_schema_map = {
SeedSchema: NormalizedSeedSchema,
SnapshotSchema: NormalizedSnapshotSchema,
ExposureSchema: NormalizedExposureSchema,
ModelSchema: NormalizedModelSchema,
SourceSchema: NormalizedSourceSchema,
Expand Down Expand Up @@ -308,7 +334,9 @@ def _normalize_artifact_path(cls, artifact: ArtifactSchemaType, fqn: str) -> str
@classmethod
def _fqn(
cls,
artifact: Union[ModelSchema, ExposureSchema, SourceSchema, SeedSchema],
artifact: Union[
ModelSchema, ExposureSchema, SourceSchema, SeedSchema, SnapshotSchema
],
) -> str:
if isinstance(artifact, ExposureSchema):
path = (artifact.meta or {}).get("path")
Expand Down
6 changes: 6 additions & 0 deletions elementary/monitor/api/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ExposureSchema,
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.pydantic_shim import BaseModel, Field, validator
Expand Down Expand Up @@ -41,6 +42,11 @@ class NormalizedSeedSchema(NormalizedArtifactSchema, SeedSchema):
artifact_type: str = Field("seed", const=True) # type: ignore # noqa


# Snapshot artifact schema combined with NormalizedArtifactSchema.
# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedSnapshotSchema(NormalizedArtifactSchema, SnapshotSchema):
# artifact_type defaults to "snapshot" and is declared const, so it identifies
# this artifact kind and is not meant to carry any other value.
artifact_type: str = Field("snapshot", const=True) # type: ignore # noqa


# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedModelSchema(NormalizedArtifactSchema, ModelSchema):
artifact_type: str = Field("model", const=True) # type: ignore # noqa
Expand Down
19 changes: 16 additions & 3 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.api.report.schema import ReportDataEnvSchema, ReportDataSchema
Expand Down Expand Up @@ -47,11 +48,19 @@ def _get_groups(
sources: Iterable[NormalizedSourceSchema],
exposures: Iterable[NormalizedExposureSchema],
seeds: Iterable[NormalizedSeedSchema],
snapshots: Iterable[NormalizedSnapshotSchema],
singular_tests: Iterable[NormalizedTestSchema],
) -> GroupsSchema:
groups_api = GroupsAPI(self.dbt_runner)
return groups_api.get_groups(
artifacts=[*models, *sources, *exposures, *seeds, *singular_tests]
artifacts=[
*models,
*sources,
*exposures,
*seeds,
*snapshots,
*singular_tests,
]
)

def get_report_data(
Expand Down Expand Up @@ -86,6 +95,8 @@ def get_report_data(
lineage_node_ids: List[str] = []
seeds = models_api.get_seeds()
lineage_node_ids.extend(seeds.keys())
snapshots = models_api.get_snapshots()
lineage_node_ids.extend(snapshots.keys())
models = models_api.get_models(exclude_elementary_models)
lineage_node_ids.extend(models.keys())
sources = models_api.get_sources()
Expand All @@ -99,6 +110,7 @@ def get_report_data(
sources.values(),
exposures.values(),
seeds.values(),
snapshots.values(),
singular_tests,
)

Expand Down Expand Up @@ -147,7 +159,7 @@ def get_report_data(

serializable_groups = groups.dict()
serializable_models = self._serialize_models(
models, sources, exposures, seeds
models, sources, exposures, seeds, snapshots
)
serializable_model_runs = self._serialize_models_runs(models_runs.runs)
serializable_model_runs_totals = models_runs.dict(include={"totals"})[
Expand Down Expand Up @@ -209,8 +221,9 @@ def _serialize_models(
sources: Dict[str, NormalizedSourceSchema],
exposures: Dict[str, NormalizedExposureSchema],
seeds: Dict[str, NormalizedSeedSchema],
snapshots: Dict[str, NormalizedSnapshotSchema],
) -> Dict[str, dict]:
nodes = dict(**models, **sources, **exposures, **seeds)
nodes = dict(**models, **sources, **exposures, **seeds, **snapshots)
serializable_nodes = dict()
for key in nodes.keys():
serializable_nodes[key] = dict(nodes[key])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
'seed' as type
from {{ ref('elementary', 'dbt_seeds') }}
union all
select
unique_id,
null as depends_on_nodes,
null as materialization,
'snapshot' as type
from {{ ref('elementary', 'dbt_snapshots') }}
union all
select
unique_id,
depends_on_nodes,
Expand Down
24 changes: 24 additions & 0 deletions elementary/monitor/dbt_project/macros/get_snapshots.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{#
  get_snapshots: query elementary's dbt_snapshots artifact model and return its rows.

  Selected columns: name, unique_id, database_name, schema_name, table_name
  (the alias when it is not null, otherwise the name), owners (from owner), tags,
  package_name, description and full_path (from original_path).

  Returns the query result passed through elementary.agate_to_dicts
  (presumably a list of dicts, one per snapshot; confirm against that macro).
#}
{% macro get_snapshots() %}
{% set get_snapshots_query %}
with dbt_artifacts_snapshots as (
select
name,
unique_id,
database_name,
schema_name,
case when alias is not null then alias
else name end as table_name,
owner as owners,
tags,
package_name,
description,
original_path as full_path
from {{ ref('elementary', 'dbt_snapshots') }}
)

select * from dbt_artifacts_snapshots
{% endset %}

{# Run the query built above and hand the converted result back to the caller. #}
{% set snapshots_agate = run_query(get_snapshots_query) %}
{% do return(elementary.agate_to_dicts(snapshots_agate)) %}
{% endmacro %}
11 changes: 11 additions & 0 deletions elementary/monitor/fetchers/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ModelSchema,
ModelTestCoverage,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand Down Expand Up @@ -43,6 +44,16 @@ def get_seeds(self) -> List[SeedSchema]:
seeds = [SeedSchema(**seed) for seed in seeds]
return seeds

def get_snapshots(self) -> List[SnapshotSchema]:
    """Run the ``elementary_cli.get_snapshots`` macro and parse its output.

    The first element of the run-operation response is decoded as JSON and
    each decoded entry is expanded into a ``SnapshotSchema``.

    Returns:
        One ``SnapshotSchema`` per decoded entry; an empty list when the
        run-operation response is empty.
    """
    response = self.dbt_runner.run_operation(
        macro_name="elementary_cli.get_snapshots"
    )
    # An empty/falsy response means there is nothing to decode.
    if response:
        raw_snapshots = json.loads(response[0])
    else:
        raw_snapshots = []
    return [SnapshotSchema(**raw_snapshot) for raw_snapshot in raw_snapshots]

def get_models(self, exclude_elementary_models: bool = False) -> List[ModelSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_models",
Expand Down
6 changes: 6 additions & 0 deletions elementary/monitor/fetchers/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ def load_meta(cls, meta):
ArtifactSchemaType = TypeVar("ArtifactSchemaType", bound=ArtifactSchema)


# Schema of a dbt snapshot artifact: ArtifactSchema plus the fields that locate
# the snapshot's relation (database, schema, table).
class SnapshotSchema(ArtifactSchema):
# Optional; None when no database name is provided.
database_name: Optional[str] = None
# Required.
schema_name: str
# Required.
table_name: str


class SeedSchema(ArtifactSchema):
database_name: Optional[str] = None
schema_name: str
Expand Down

0 comments on commit 22d0ac0

Please sign in to comment.