Skip to content

Commit

Permalink
Merge pull request #1667 from elementary-data/ele-3527-add-new-groups…
Browse files Browse the repository at this point in the history
…-to-cli

Added new groups
  • Loading branch information
ofek1weiss committed Aug 11, 2024
2 parents 7495249 + a9d0dd9 commit 6382426
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 37 deletions.
49 changes: 24 additions & 25 deletions elementary/monitor/api/groups/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

from elementary.clients.api.api_client import APIClient
from elementary.monitor.api.groups.schema import (
DbtGroupSchema,
GroupItemSchema,
GroupsSchema,
OwnersGroupSchema,
TagsGroupSchema,
TreeGroupSchema,
)
from elementary.monitor.api.groups.tree_builder import TreeBuilder
from elementary.monitor.api.models.schema import (
NormalizedExposureSchema,
NormalizedModelSchema,
Expand All @@ -35,38 +36,36 @@ def get_groups(self, artifacts: List[GROUPABLE_ARTIFACT]) -> GroupsSchema:
dbt_group = self.get_dbt_group(artifacts)
tags_group = self.get_tags_group(artifacts)
owners_group = self.get_owners_group(artifacts)
return GroupsSchema(dbt=dbt_group, tags=tags_group, owners=owners_group)
dwh_group = self.get_dwh_group(artifacts)
return GroupsSchema(
dbt=dbt_group, dwh=dwh_group, tags=tags_group, owners=owners_group
)

def get_dbt_group(
self,
artifacts: List[GROUPABLE_ARTIFACT],
) -> DbtGroupSchema:
group: DbtGroupSchema = dict()
) -> TreeGroupSchema:
tree_builder = TreeBuilder[GroupItemSchema](separator=posixpath.sep)
for artifact in artifacts:
if artifact.unique_id is None:
continue
self._update_dbt_group(dbt_group=group, artifact=artifact)
return group

def _update_dbt_group(
self,
dbt_group: dict,
artifact: GROUPABLE_ARTIFACT,
) -> None:
if artifact.unique_id is None or artifact.normalized_full_path is None:
return
tree_builder.add(
path=artifact.normalized_full_path, data=self._get_group_item(artifact)
)
return tree_builder.get_tree()

artifact_full_path_split = artifact.normalized_full_path.split(posixpath.sep)
for part in artifact_full_path_split[:-1]:
if part not in dbt_group:
dbt_group[part] = {}
dbt_group = dbt_group[part]

if FILES_GROUP_KEYWORD in dbt_group:
if artifact.unique_id not in dbt_group[FILES_GROUP_KEYWORD]:
dbt_group[FILES_GROUP_KEYWORD].append(self._get_group_item(artifact))
else:
dbt_group[FILES_GROUP_KEYWORD] = [self._get_group_item(artifact)]
def get_dwh_group(self, artifacts: List[GROUPABLE_ARTIFACT]) -> TreeGroupSchema:
    """Build the ``dwh`` group tree by splitting each artifact's ``fqn`` on ".".

    Only source and model artifacts that have both a ``unique_id`` and an
    ``fqn`` are added to the tree; every other artifact is skipped.
    """
    dwh_tree = TreeBuilder[GroupItemSchema](separator=".")
    for candidate in artifacts:
        # Guard clauses mirror the original filter order: ids first, type last.
        if candidate.unique_id is None or candidate.fqn is None:
            continue
        if not isinstance(candidate, (NormalizedSourceSchema, NormalizedModelSchema)):
            continue
        dwh_tree.add(path=candidate.fqn, data=self._get_group_item(candidate))
    return dwh_tree.get_tree()

def get_tags_group(
self,
Expand Down
8 changes: 5 additions & 3 deletions elementary/monitor/api/groups/schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

from elementary.utils.pydantic_shim import BaseModel

Expand All @@ -8,12 +8,14 @@ class GroupItemSchema(BaseModel):
resource_type: Optional[str]


DbtGroupSchema = Dict[str, dict]
TreeGroupSchema = Dict[str, Any]
TagsGroupSchema = Dict[str, List[GroupItemSchema]]
OwnersGroupSchema = Dict[str, List[GroupItemSchema]]


class GroupsSchema(BaseModel):
dbt: DbtGroupSchema = dict()
dbt: TreeGroupSchema = dict()
tags: TagsGroupSchema = dict()
owners: OwnersGroupSchema = dict()
dwh: TreeGroupSchema = dict()
bi: Optional[TreeGroupSchema] = None
31 changes: 31 additions & 0 deletions elementary/monitor/api/groups/tree_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Dict, Generic, List, Optional, TypeVar, Union

T = TypeVar("T")
# A tree node maps a path part to a sub-tree, or the files keyword to the list
# of data items stored at that level.
TreeT = Dict[str, Union[List[T], "TreeT"]]


class TreeBuilder(Generic[T]):
    """Incrementally build a nested-dict tree from separator-delimited paths.

    Every path part except the last becomes a nested dict key. The data items
    of all paths that share the same parent are collected in a list stored
    under ``files_keyword`` inside that parent's dict.
    """

    def __init__(self, separator: str, files_keyword: str = "__files__") -> None:
        """Create an empty tree.

        Args:
            separator: String used to split paths into parts.
            files_keyword: Reserved key under which data items are listed.
        """
        self.separator = separator
        self.files_keyword = files_keyword
        self._tree: TreeT = {}

    def add(self, path: Optional[str], data: Optional[T]) -> None:
        """Attach ``data`` to the node addressed by the parent parts of ``path``.

        The last path part is not used as a key; the item is stored in the
        parent's files list. Calls with a ``None`` path or ``None`` data are
        ignored, and an item equal to one already stored at the same node is
        not added again.

        Raises:
            ValueError: If a path part resolves to an existing files list, or
                the files keyword is already used as a sub-tree at the target
                node.
        """
        if path is None or data is None:
            return
        # str.split always yields at least one part, so the unpacking is safe.
        *parents, _leaf = path.split(self.separator)
        current: dict = self._tree
        for part in parents:
            current = current.setdefault(part, {})
            # Validate right after descending so a collision on the *last*
            # parent part is reported too, instead of failing later with a
            # TypeError when the list is indexed by the files keyword.
            if not isinstance(current, dict):
                raise ValueError(
                    f"Path parts cannot contain files keyword: {self.files_keyword}"
                )
        items = current.setdefault(self.files_keyword, [])
        if not isinstance(items, list):
            raise ValueError(
                f"Path parts cannot contain files keyword: {self.files_keyword}"
            )
        # Compare against the item itself; the previous check tested the
        # builtin ``id`` function and therefore never detected duplicates.
        if data not in items:
            items.append(data)

    def get_tree(self) -> TreeT:
        """Return the tree built so far (the live internal dict, not a copy)."""
        return self._tree
26 changes: 17 additions & 9 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, Iterable, List, Optional, Tuple, Union

from elementary.clients.api.api_client import APIClient
from elementary.monitor.api.filters.filters import FiltersAPI
from elementary.monitor.api.groups.groups import GroupsAPI
from elementary.monitor.api.groups.schema import GroupsSchema
from elementary.monitor.api.invocations.invocations import InvocationsAPI
from elementary.monitor.api.lineage.lineage import LineageAPI
from elementary.monitor.api.models.models import ModelsAPI
Expand All @@ -30,10 +31,23 @@
from elementary.monitor.api.tests.tests import TestsAPI
from elementary.monitor.api.totals_schema import TotalsSchema
from elementary.monitor.data_monitoring.schema import SelectorFilterSchema
from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema
from elementary.utils.time import get_now_utc_iso_format


class ReportAPI(APIClient):
def _get_groups(
    self,
    models: Iterable[NormalizedModelSchema],
    sources: Iterable[NormalizedSourceSchema],
    exposures: Iterable[NormalizedExposureSchema],
    singular_tests: Iterable[NormalizedTestSchema],
) -> GroupsSchema:
    """Compute the groups for all given artifacts in a single call.

    The four iterables are flattened into one list, in the order models,
    sources, exposures, singular tests, and handed to ``GroupsAPI.get_groups``.
    """
    all_artifacts = [*models, *sources, *exposures, *singular_tests]
    return GroupsAPI(self.dbt_runner).get_groups(artifacts=all_artifacts)

def get_report_data(
self,
days_back: int = 7,
Expand All @@ -59,7 +73,6 @@ def get_report_data(
invocations_per_test=test_runs_amount,
)
models_api = ModelsAPI(dbt_runner=self.dbt_runner)
groups_api = GroupsAPI(dbt_runner=self.dbt_runner)
lineage_api = LineageAPI(dbt_runner=self.dbt_runner)
filters_api = FiltersAPI(dbt_runner=self.dbt_runner)
invocations_api = InvocationsAPI(dbt_runner=self.dbt_runner)
Expand All @@ -73,13 +86,8 @@ def get_report_data(
lineage_node_ids.extend(exposures.keys())
singular_tests = tests_api.get_singular_tests()

groups = groups_api.get_groups(
artifacts=[
*models.values(),
*sources.values(),
*exposures.values(),
*singular_tests,
]
groups = self._get_groups(
models.values(), sources.values(), exposures.values(), singular_tests
)

models_runs = models_api.get_models_runs(
Expand Down

0 comments on commit 6382426

Please sign in to comment.