Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test models in the report #1697

Merged
merged 14 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test-warehouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ jobs:
--aws-access-key-id "$AWS_ACCESS_KEY_ID"
--aws-secret-access-key "$AWS_SECRET_ACCESS_KEY"
--s3-bucket-name elementary-ci-artifacts
--google-service-account-path /tmp/gcs_keyfile.json
--gcs-bucket-name elementary_ci_artifacts
# --google-service-account-path /tmp/gcs_keyfile.json
# --gcs-bucket-name elementary_ci_artifacts
--azure-connection-string "$AZURE_CONNECTION_STRING"
--azure-container-name reports
--update-bucket-website true
Expand Down
15 changes: 14 additions & 1 deletion elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
from elementary.monitor.api.source_freshnesses.source_freshnesses import (
SourceFreshnessesAPI,
)
from elementary.monitor.api.tests.schema import TestResultSchema, TestRunSchema
from elementary.monitor.api.tests.schema import (
TestResultSchema,
TestRunSchema,
TestSchema,
)
from elementary.monitor.api.tests.tests import TestsAPI
from elementary.monitor.api.totals_schema import TotalsSchema
from elementary.monitor.data_monitoring.schema import SelectorFilterSchema
Expand Down Expand Up @@ -103,6 +107,7 @@ def get_report_data(
)
coverages = models_api.get_test_coverages()

tests = tests_api.get_tests()
test_invocation = invocations_api.get_test_invocation_from_filter(filter)

test_results = tests_api.get_test_results(
Expand Down Expand Up @@ -149,6 +154,7 @@ def get_report_data(
"totals"
]
serializable_models_coverages = self._serialize_coverages(coverages)
serializable_tests = self._serialize_tests(tests)
serializable_test_results = self._serialize_test_results(union_test_results)
serializable_test_results_totals = self._serialize_totals(
test_results_totals
Expand All @@ -175,6 +181,7 @@ def get_report_data(
days_back=days_back,
models=serializable_models,
groups=serializable_groups,
tests=serializable_tests,
invocation=serializable_invocation,
test_results=serializable_test_results,
test_results_totals=serializable_test_results_totals,
Expand Down Expand Up @@ -230,6 +237,12 @@ def _serialize_test_results(
)
return serializable_test_results

def _serialize_tests(self, tests: Dict[str, TestSchema]) -> Dict[str, dict]:
serializable_tests = dict()
for key, test in tests.items():
serializable_tests[key] = test.dict()
return serializable_tests

def _serialize_test_runs(
self,
test_runs: Dict[str, List[Union[TestRunSchema, SourceFreshnessRunSchema]]],
Expand Down
1 change: 1 addition & 0 deletions elementary/monitor/api/report/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ReportDataSchema(BaseModel):
models: dict = dict()
groups: dict = dict()
invocation: dict = dict()
tests: dict = dict()
test_results: dict = dict()
test_results_totals: dict = dict()
test_runs: dict = dict()
Expand Down
25 changes: 25 additions & 0 deletions elementary/monitor/api/tests/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,31 @@ class TestMetadataSchema(BaseModel):
normalized_full_path: Optional[str] = None


class TestSchema(BaseModel):
unique_id: str
model_unique_id: Optional[str] = None
table_unique_id: Optional[str] = None
database_name: Optional[str] = None
schema_name: str
table_name: Optional[str] = None
column_name: Optional[str] = None
name: str
display_name: str
original_path: Optional[str] = None
type: str
test_type: Optional[str] = None
test_sub_type: Optional[str] = None
test_params: dict
description: Optional[str] = None
configuration: dict
tags: List[str] = Field(default_factory=list)
normalized_full_path: Optional[str] = None
created_at: Optional[str] = None
latest_run_time: Optional[str] = None
latest_run_time_utc: Optional[str] = None
latest_run_status: Optional[str] = None


class TestResultSchema(BaseModel):
metadata: TestMetadataSchema
test_results: Union[DbtTestResultSchema, ElementaryTestResultSchema]
Expand Down
111 changes: 77 additions & 34 deletions elementary/monitor/api/tests/tests.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
import statistics
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Optional, Union, cast
from typing import DefaultDict, Dict, List, Optional, Union, cast

from dateutil import tz

Expand All @@ -16,11 +16,19 @@
TestResultSchema,
TestResultSummarySchema,
TestRunSchema,
TestSchema,
)
from elementary.monitor.api.tests.utils import (
get_display_name,
get_normalized_full_path,
get_table_full_name,
get_test_configuration,
)
from elementary.monitor.api.totals_schema import TotalsSchema
from elementary.monitor.data_monitoring.schema import SelectorFilterSchema
from elementary.monitor.fetchers.tests.schema import (
NormalizedTestSchema,
TestDBRowSchema,
TestResultDBRowSchema,
)
from elementary.monitor.fetchers.tests.tests import TestsFetcher
Expand Down Expand Up @@ -130,6 +138,13 @@ def _get_test_subscribers(test_meta: dict, model_meta: dict) -> List[str]:
def get_singular_tests(self) -> List[NormalizedTestSchema]:
return self.tests_fetcher.get_singular_tests()

def get_tests(self) -> Dict[str, TestSchema]:
tests_db_rows = self.tests_fetcher.get_tests()
return {
test_db_row.unique_id: self._parse_test_db_row(test_db_row)
for test_db_row in tests_db_rows
}

def get_test_results(
self,
invocation_id: Optional[str],
Expand Down Expand Up @@ -304,26 +319,17 @@ def _parse_affected_row(results_description: str) -> Optional[int]:
def _get_test_metadata_from_test_result_db_row(
test_result_db_row: TestResultDBRowSchema,
) -> TestMetadataSchema:
test_display_name = (
test_result_db_row.test_name.replace("_", " ").title()
if test_result_db_row.test_name
else ""
)
test_display_name = get_display_name(test_result_db_row.test_name)
detected_at_datetime = convert_utc_iso_format_to_datetime(
test_result_db_row.detected_at
)
detected_at_utc = detected_at_datetime
detected_at = detected_at_datetime.astimezone(tz.tzlocal())
table_full_name_parts = [
name
for name in [
test_result_db_row.database_name,
test_result_db_row.schema_name,
test_result_db_row.table_name,
]
if name
]
table_full_name = ".".join(table_full_name_parts).lower()
table_full_name = get_table_full_name(
test_result_db_row.database_name,
test_result_db_row.schema_name,
test_result_db_row.table_name,
)
test_params = test_result_db_row.test_params
test_query = (
test_result_db_row.test_results_query.strip()
Expand All @@ -336,23 +342,11 @@ def _get_test_metadata_from_test_result_db_row(
result_query=test_query,
)

configuration: Dict[str, Any]
if test_result_db_row.test_type == "dbt_test":
configuration = dict(
test_name=test_result_db_row.test_name,
test_params=test_params,
)
else:
time_bucket_configuration = test_params.get("time_bucket", {})
time_bucket_count = time_bucket_configuration.get("count", 1)
time_bucket_period = time_bucket_configuration.get("period", "day")
configuration = dict(
test_name=test_result_db_row.test_name,
timestamp_column=test_params.get("timestamp_column"),
testing_timeframe=f"{time_bucket_count} {time_bucket_period}{'s' if time_bucket_count > 1 else ''}",
anomaly_threshold=test_params.get("sensitivity")
or test_params.get("anomaly_sensitivity"),
)
configuration = get_test_configuration(
test_type=test_result_db_row.test_type,
name=test_result_db_row.test_name,
test_params=test_params,
)

return TestMetadataSchema(
test_unique_id=test_result_db_row.test_unique_id,
Expand All @@ -378,7 +372,56 @@ def _get_test_metadata_from_test_result_db_row(
result=result,
configuration=configuration,
test_tags=test_result_db_row.test_tags,
normalized_full_path=test_result_db_row.normalized_full_path,
normalized_full_path=get_normalized_full_path(
test_result_db_row.package_name, test_result_db_row.original_path
),
)

@classmethod
def _parse_test_db_row(cls, test_db_row: TestDBRowSchema) -> TestSchema:
latest_run_datetime = (
convert_utc_iso_format_to_datetime(test_db_row.latest_run_time)
if test_db_row.latest_run_time
else None
)

return TestSchema(
unique_id=test_db_row.unique_id,
model_unique_id=test_db_row.model_unique_id,
table_unique_id=get_table_full_name(
test_db_row.database_name,
test_db_row.schema_name,
test_db_row.table_name,
),
database_name=test_db_row.database_name,
schema_name=test_db_row.schema_name,
table_name=test_db_row.table_name,
column_name=test_db_row.column_name,
name=test_db_row.name,
display_name=get_display_name(test_db_row.name),
original_path=test_db_row.original_path,
type=test_db_row.type,
test_type=test_db_row.test_type,
test_sub_type=test_db_row.test_sub_type,
test_params=test_db_row.test_params,
description=test_db_row.meta.get("description"),
configuration=get_test_configuration(
test_db_row.test_type, test_db_row.name, test_db_row.test_params
),
tags=list(set(test_db_row.tags + test_db_row.model_tags)),
normalized_full_path=get_normalized_full_path(
test_db_row.package_name, test_db_row.original_path
),
created_at=test_db_row.created_at if test_db_row.created_at else None,
latest_run_time=latest_run_datetime.isoformat()
if latest_run_datetime
else None,
latest_run_time_utc=latest_run_datetime.astimezone(tz.tzlocal()).isoformat()
if latest_run_datetime
else None,
latest_run_status=test_db_row.latest_run_status
if test_db_row.latest_run_status
else None,
)

@staticmethod
Expand Down
59 changes: 59 additions & 0 deletions elementary/monitor/api/tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from typing import Any, Dict, Optional


def get_table_full_name(
database_name: Optional[str],
schema_name: Optional[str],
table_name: Optional[str],
) -> str:
if not table_name:
return ""

table_full_name_parts = [
name
for name in [
database_name,
schema_name,
table_name,
]
if name
]
table_full_name = ".".join(table_full_name_parts).lower()
return table_full_name


def get_display_name(name: str) -> str:
return name.replace("_", " ").title()


def get_test_configuration(
test_type: Optional[str], name: str, test_params: Dict
) -> Dict[str, Any]:
if test_type is None:
return dict()
if test_type == "dbt_test":
return dict(
test_name=name,
test_params=test_params,
)
else:
time_bucket_configuration = test_params.get("time_bucket", {})
time_bucket_count = time_bucket_configuration.get("count", 1)
time_bucket_period = time_bucket_configuration.get("period", "day")
return dict(
test_name=name,
timestamp_column=test_params.get("timestamp_column"),
testing_timeframe=f"{time_bucket_count} {time_bucket_period}{'s' if time_bucket_count > 1 else ''}",
anomaly_threshold=test_params.get("sensitivity")
or test_params.get("anomaly_sensitivity"),
)


def get_normalized_full_path(
package_name: Optional[str], original_path: Optional[str]
) -> Optional[str]:
if not original_path:
return None
if package_name:
return f"{package_name}/{original_path}"
return original_path
56 changes: 0 additions & 56 deletions elementary/monitor/dbt_project/macros/base_queries/tests.sql

This file was deleted.

Loading
Loading