Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added new groups #1667

Merged
merged 2 commits into from
Aug 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 24 additions & 25 deletions elementary/monitor/api/groups/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

from elementary.clients.api.api_client import APIClient
from elementary.monitor.api.groups.schema import (
DbtGroupSchema,
GroupItemSchema,
GroupsSchema,
OwnersGroupSchema,
TagsGroupSchema,
TreeGroupSchema,
)
from elementary.monitor.api.groups.tree_builder import TreeBuilder
from elementary.monitor.api.models.schema import (
NormalizedExposureSchema,
NormalizedModelSchema,
Expand All @@ -35,38 +36,36 @@ def get_groups(self, artifacts: List[GROUPABLE_ARTIFACT]) -> GroupsSchema:
dbt_group = self.get_dbt_group(artifacts)
tags_group = self.get_tags_group(artifacts)
owners_group = self.get_owners_group(artifacts)
return GroupsSchema(dbt=dbt_group, tags=tags_group, owners=owners_group)
dwh_group = self.get_dwh_group(artifacts)
return GroupsSchema(
dbt=dbt_group, dwh=dwh_group, tags=tags_group, owners=owners_group
)

def get_dbt_group(
self,
artifacts: List[GROUPABLE_ARTIFACT],
) -> DbtGroupSchema:
group: DbtGroupSchema = dict()
) -> TreeGroupSchema:
tree_builder = TreeBuilder[GroupItemSchema](seperator=posixpath.sep)
for artifact in artifacts:
if artifact.unique_id is None:
continue
self._update_dbt_group(dbt_group=group, artifact=artifact)
return group

def _update_dbt_group(
self,
dbt_group: dict,
artifact: GROUPABLE_ARTIFACT,
) -> None:
if artifact.unique_id is None or artifact.normalized_full_path is None:
return
tree_builder.add(
path=artifact.normalized_full_path, data=self._get_group_item(artifact)
)
return tree_builder.get_tree()

artifact_full_path_split = artifact.normalized_full_path.split(posixpath.sep)
for part in artifact_full_path_split[:-1]:
if part not in dbt_group:
dbt_group[part] = {}
dbt_group = dbt_group[part]

if FILES_GROUP_KEYWORD in dbt_group:
if artifact.unique_id not in dbt_group[FILES_GROUP_KEYWORD]:
dbt_group[FILES_GROUP_KEYWORD].append(self._get_group_item(artifact))
else:
dbt_group[FILES_GROUP_KEYWORD] = [self._get_group_item(artifact)]
def get_dwh_group(self, artifacts: List[GROUPABLE_ARTIFACT]) -> TreeGroupSchema:
tree_builder = TreeBuilder[GroupItemSchema](seperator=".")
relevant_artifacts = (
artifact
for artifact in artifacts
if artifact.unique_id is not None
and artifact.fqn is not None
and isinstance(artifact, (NormalizedSourceSchema, NormalizedModelSchema))
)
for artifact in relevant_artifacts:
tree_builder.add(path=artifact.fqn, data=self._get_group_item(artifact))
return tree_builder.get_tree()

def get_tags_group(
self,
Expand Down
8 changes: 5 additions & 3 deletions elementary/monitor/api/groups/schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

from elementary.utils.pydantic_shim import BaseModel

Expand All @@ -8,12 +8,14 @@ class GroupItemSchema(BaseModel):
resource_type: Optional[str]


DbtGroupSchema = Dict[str, dict]
TreeGroupSchema = Dict[str, Any]
TagsGroupSchema = Dict[str, List[GroupItemSchema]]
OwnersGroupSchema = Dict[str, List[GroupItemSchema]]


class GroupsSchema(BaseModel):
dbt: DbtGroupSchema = dict()
dbt: TreeGroupSchema = dict()
tags: TagsGroupSchema = dict()
owners: OwnersGroupSchema = dict()
dwh: TreeGroupSchema = dict()
bi: Optional[TreeGroupSchema] = None
31 changes: 31 additions & 0 deletions elementary/monitor/api/groups/tree_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Dict, Generic, List, Optional, TypeVar, Union

T = TypeVar("T")
TreeT = Dict[str, Union[List[T], "TreeT"]]


class TreeBuilder(Generic[T]):
def __init__(self, seperator: str, files_keywork: str = "__files__") -> None:
self.seperator = seperator
self.files_keywork = files_keywork
self._tree: TreeT = {}

def add(self, path: Optional[str], data: Optional[T]) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically the logic from _update_dbt_group extracted to a generic class

if path is None or data is None:
return
parts = path.split(self.seperator)
current: dict = self._tree
for part in parts[:-1]:
if isinstance(current, list):
raise ValueError(
f"Path parts cannot contain files keyword: {self.files_keywork}"
)
current = current.setdefault(part, {})
if self.files_keywork in current:
if id not in current[self.files_keywork]:
current[self.files_keywork].append(data)
else:
current[self.files_keywork] = [data]

def get_tree(self) -> TreeT:
return self._tree
26 changes: 17 additions & 9 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, Iterable, List, Optional, Tuple, Union

from elementary.clients.api.api_client import APIClient
from elementary.monitor.api.filters.filters import FiltersAPI
from elementary.monitor.api.groups.groups import GroupsAPI
from elementary.monitor.api.groups.schema import GroupsSchema
from elementary.monitor.api.invocations.invocations import InvocationsAPI
from elementary.monitor.api.lineage.lineage import LineageAPI
from elementary.monitor.api.models.models import ModelsAPI
Expand All @@ -30,10 +31,23 @@
from elementary.monitor.api.tests.tests import TestsAPI
from elementary.monitor.api.totals_schema import TotalsSchema
from elementary.monitor.data_monitoring.schema import SelectorFilterSchema
from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema
from elementary.utils.time import get_now_utc_iso_format


class ReportAPI(APIClient):
def _get_groups(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separated into a function to make it possible to easily overwrite in cloud

self,
models: Iterable[NormalizedModelSchema],
sources: Iterable[NormalizedSourceSchema],
exposures: Iterable[NormalizedExposureSchema],
singular_tests: Iterable[NormalizedTestSchema],
) -> GroupsSchema:
groups_api = GroupsAPI(self.dbt_runner)
return groups_api.get_groups(
artifacts=[*models, *sources, *exposures, *singular_tests]
)

def get_report_data(
self,
days_back: int = 7,
Expand All @@ -59,7 +73,6 @@ def get_report_data(
invocations_per_test=test_runs_amount,
)
models_api = ModelsAPI(dbt_runner=self.dbt_runner)
groups_api = GroupsAPI(dbt_runner=self.dbt_runner)
lineage_api = LineageAPI(dbt_runner=self.dbt_runner)
filters_api = FiltersAPI(dbt_runner=self.dbt_runner)
invocations_api = InvocationsAPI(dbt_runner=self.dbt_runner)
Expand All @@ -73,13 +86,8 @@ def get_report_data(
lineage_node_ids.extend(exposures.keys())
singular_tests = tests_api.get_singular_tests()

groups = groups_api.get_groups(
artifacts=[
*models.values(),
*sources.values(),
*exposures.values(),
*singular_tests,
]
groups = self._get_groups(
models.values(), sources.values(), exposures.values(), singular_tests
)

models_runs = models_api.get_models_runs(
Expand Down
Loading