diff --git a/elementary/monitor/api/groups/groups.py b/elementary/monitor/api/groups/groups.py index 7c3a2cc7f..083da6d78 100644 --- a/elementary/monitor/api/groups/groups.py +++ b/elementary/monitor/api/groups/groups.py @@ -4,12 +4,13 @@ from elementary.clients.api.api_client import APIClient from elementary.monitor.api.groups.schema import ( - DbtGroupSchema, GroupItemSchema, GroupsSchema, OwnersGroupSchema, TagsGroupSchema, + TreeGroupSchema, ) +from elementary.monitor.api.groups.tree_builder import TreeBuilder from elementary.monitor.api.models.schema import ( NormalizedExposureSchema, NormalizedModelSchema, @@ -35,38 +36,36 @@ def get_groups(self, artifacts: List[GROUPABLE_ARTIFACT]) -> GroupsSchema: dbt_group = self.get_dbt_group(artifacts) tags_group = self.get_tags_group(artifacts) owners_group = self.get_owners_group(artifacts) - return GroupsSchema(dbt=dbt_group, tags=tags_group, owners=owners_group) + dwh_group = self.get_dwh_group(artifacts) + return GroupsSchema( + dbt=dbt_group, dwh=dwh_group, tags=tags_group, owners=owners_group + ) def get_dbt_group( self, artifacts: List[GROUPABLE_ARTIFACT], - ) -> DbtGroupSchema: - group: DbtGroupSchema = dict() + ) -> TreeGroupSchema: + tree_builder = TreeBuilder[GroupItemSchema](separator=posixpath.sep) for artifact in artifacts: if artifact.unique_id is None: continue - self._update_dbt_group(dbt_group=group, artifact=artifact) - return group - - def _update_dbt_group( - self, - dbt_group: dict, - artifact: GROUPABLE_ARTIFACT, - ) -> None: - if artifact.unique_id is None or artifact.normalized_full_path is None: - return + tree_builder.add( + path=artifact.normalized_full_path, data=self._get_group_item(artifact) + ) + return tree_builder.get_tree() - artifact_full_path_split = artifact.normalized_full_path.split(posixpath.sep) - for part in artifact_full_path_split[:-1]: - if part not in dbt_group: - dbt_group[part] = {} - dbt_group = dbt_group[part] - - if FILES_GROUP_KEYWORD in dbt_group: - if artifact.unique_id not in dbt_group[FILES_GROUP_KEYWORD]: - dbt_group[FILES_GROUP_KEYWORD].append(self._get_group_item(artifact)) - else: - dbt_group[FILES_GROUP_KEYWORD] = [self._get_group_item(artifact)] + def get_dwh_group(self, artifacts: List[GROUPABLE_ARTIFACT]) -> TreeGroupSchema: + tree_builder = TreeBuilder[GroupItemSchema](separator=".") + relevant_artifacts = ( + artifact + for artifact in artifacts + if artifact.unique_id is not None + and artifact.fqn is not None + and isinstance(artifact, (NormalizedSourceSchema, NormalizedModelSchema)) + ) + for artifact in relevant_artifacts: + tree_builder.add(path=artifact.fqn, data=self._get_group_item(artifact)) + return tree_builder.get_tree() def get_tags_group( self, diff --git a/elementary/monitor/api/groups/schema.py b/elementary/monitor/api/groups/schema.py index d18e0f462..63581e19d 100644 --- a/elementary/monitor/api/groups/schema.py +++ b/elementary/monitor/api/groups/schema.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from elementary.utils.pydantic_shim import BaseModel @@ -8,12 +8,14 @@ class GroupItemSchema(BaseModel): resource_type: Optional[str] -DbtGroupSchema = Dict[str, dict] +TreeGroupSchema = Dict[str, Any] TagsGroupSchema = Dict[str, List[GroupItemSchema]] OwnersGroupSchema = Dict[str, List[GroupItemSchema]] class GroupsSchema(BaseModel): - dbt: DbtGroupSchema = dict() + dbt: TreeGroupSchema = dict() tags: TagsGroupSchema = dict() owners: OwnersGroupSchema = dict() + dwh: TreeGroupSchema = dict() + bi: Optional[TreeGroupSchema] = None diff --git a/elementary/monitor/api/groups/tree_builder.py b/elementary/monitor/api/groups/tree_builder.py new file mode 100644 index 000000000..ebae92909 --- /dev/null +++ b/elementary/monitor/api/groups/tree_builder.py @@ -0,0 +1,31 @@ +from typing import Dict, Generic, List, Optional, TypeVar, Union + +T = TypeVar("T") +TreeT = Dict[str, Union[List[T], "TreeT"]] + + +class TreeBuilder(Generic[T]): + def __init__(self, separator: str, files_keyword: str = "__files__") -> None: + self.separator = separator + self.files_keyword = files_keyword + self._tree: TreeT = {} + + def add(self, path: Optional[str], data: Optional[T]) -> None: + if path is None or data is None: + return + parts = path.split(self.separator) + current: dict = self._tree + for part in parts[:-1]: + if isinstance(current, list): + raise ValueError( + f"Path parts cannot contain files keyword: {self.files_keyword}" + ) + current = current.setdefault(part, {}) + if self.files_keyword in current: + if id not in current[self.files_keyword]: + current[self.files_keyword].append(data) + else: + current[self.files_keyword] = [data] + + def get_tree(self) -> TreeT: + return self._tree diff --git a/elementary/monitor/api/report/report.py b/elementary/monitor/api/report/report.py index 33cc299f7..02a747816 100644 --- a/elementary/monitor/api/report/report.py +++ b/elementary/monitor/api/report/report.py @@ -1,9 +1,10 @@ from collections import defaultdict -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, Iterable, List, Optional, Tuple, Union from elementary.clients.api.api_client import APIClient from elementary.monitor.api.filters.filters import FiltersAPI from elementary.monitor.api.groups.groups import GroupsAPI +from elementary.monitor.api.groups.schema import GroupsSchema from elementary.monitor.api.invocations.invocations import InvocationsAPI from elementary.monitor.api.lineage.lineage import LineageAPI from elementary.monitor.api.models.models import ModelsAPI @@ -30,10 +31,23 @@ from elementary.monitor.api.tests.tests import TestsAPI from elementary.monitor.api.totals_schema import TotalsSchema from elementary.monitor.data_monitoring.schema import SelectorFilterSchema +from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema from elementary.utils.time import get_now_utc_iso_format class ReportAPI(APIClient): + def _get_groups( + self, + models: Iterable[NormalizedModelSchema], + sources: Iterable[NormalizedSourceSchema], + exposures: Iterable[NormalizedExposureSchema], + singular_tests: Iterable[NormalizedTestSchema], + ) -> GroupsSchema: + groups_api = GroupsAPI(self.dbt_runner) + return groups_api.get_groups( + artifacts=[*models, *sources, *exposures, *singular_tests] + ) + def get_report_data( self, days_back: int = 7, @@ -59,7 +73,6 @@ def get_report_data( invocations_per_test=test_runs_amount, ) models_api = ModelsAPI(dbt_runner=self.dbt_runner) - groups_api = GroupsAPI(dbt_runner=self.dbt_runner) lineage_api = LineageAPI(dbt_runner=self.dbt_runner) filters_api = FiltersAPI(dbt_runner=self.dbt_runner) invocations_api = InvocationsAPI(dbt_runner=self.dbt_runner) @@ -73,13 +86,8 @@ def get_report_data( lineage_node_ids.extend(exposures.keys()) singular_tests = tests_api.get_singular_tests() - groups = groups_api.get_groups( - artifacts=[ - *models.values(), - *sources.values(), - *exposures.values(), - *singular_tests, - ] + groups = self._get_groups( + models.values(), sources.values(), exposures.values(), singular_tests ) models_runs = models_api.get_models_runs(