diff --git a/elementary/monitor/api/report/report.py b/elementary/monitor/api/report/report.py index a2a2629d6..b864a4bd3 100644 --- a/elementary/monitor/api/report/report.py +++ b/elementary/monitor/api/report/report.py @@ -33,6 +33,7 @@ def get_report_data( disable_samples: bool = False, filter: SelectorFilterSchema = SelectorFilterSchema(), env: Optional[str] = None, + warehouse_type: Optional[str] = None, ) -> Tuple[ReportDataSchema, Optional[Exception]]: try: tests_api = TestsAPI( @@ -121,6 +122,7 @@ def get_report_data( resources_latest_invocation=resources_latest_invocation, invocations_job_identification=invocations_job_identification, env=dict(project_name=project_name, env=env), + warehouse_type=warehouse_type, ) return report_data, None except Exception as error: diff --git a/elementary/monitor/api/report/schema.py b/elementary/monitor/api/report/schema.py index 0b63cd35b..bf04470b2 100644 --- a/elementary/monitor/api/report/schema.py +++ b/elementary/monitor/api/report/schema.py @@ -23,3 +23,4 @@ class ReportDataSchema(BaseModel): invocations_job_identification: dict = dict() env: dict = dict() tracking: Optional[dict] = None + warehouse_type: Optional[str] = None diff --git a/elementary/monitor/cli.py b/elementary/monitor/cli.py index 975340027..6f60ed4f5 100644 --- a/elementary/monitor/cli.py +++ b/elementary/monitor/cli.py @@ -275,9 +275,6 @@ def monitor( ) anonymous_tracking = AnonymousCommandLineTracking(config) anonymous_tracking.set_env("use_select", bool(select)) - anonymous_tracking.track_cli_start( - Command.MONITOR, get_cli_properties(), ctx.command.name - ) try: config.validate_monitor() data_monitoring = DataMonitoringAlerts( @@ -288,6 +285,9 @@ def monitor( disable_samples=disable_samples, filter=select, ) + anonymous_tracking.track_cli_start( + Command.MONITOR, get_cli_properties(), ctx.command.name + ) success = data_monitoring.run_alerts( days_back, full_refresh_dbt_package, dbt_vars=vars ) @@ -364,9 +364,6 @@ def report( ) anonymous_tracking = AnonymousCommandLineTracking(config) anonymous_tracking.set_env("use_select", bool(select)) - anonymous_tracking.track_cli_start( - Command.REPORT, get_cli_properties(), ctx.command.name - ) try: data_monitoring = DataMonitoringReport( config=config, @@ -375,6 +372,9 @@ def report( disable_samples=disable_samples, filter=select, ) + anonymous_tracking.track_cli_start( + Command.REPORT, get_cli_properties(), ctx.command.name + ) generated_report_successfully, _ = data_monitoring.generate_report( days_back=days_back, test_runs_amount=executions_limit, @@ -555,9 +555,6 @@ def send_report( ) anonymous_tracking = AnonymousCommandLineTracking(config) anonymous_tracking.set_env("use_select", bool(select)) - anonymous_tracking.track_cli_start( - Command.SEND_REPORT, get_cli_properties(), ctx.command.name - ) try: config.validate_send_report() # bucket-file-path determines the path of the report in the bucket. @@ -574,6 +571,9 @@ def send_report( disable_samples=disable_samples, filter=select, ) + anonymous_tracking.track_cli_start( + Command.SEND_REPORT, get_cli_properties(), ctx.command.name + ) sent_report_successfully = data_monitoring.send_report( days_back=days_back, test_runs_amount=executions_limit, diff --git a/elementary/monitor/data_monitoring/data_monitoring.py b/elementary/monitor/data_monitoring/data_monitoring.py index ba811626a..faeb681d8 100644 --- a/elementary/monitor/data_monitoring/data_monitoring.py +++ b/elementary/monitor/data_monitoring/data_monitoring.py @@ -7,9 +7,12 @@ from elementary.clients.slack.client import SlackClient from elementary.config.config import Config from elementary.monitor import dbt_project_utils +from elementary.monitor.data_monitoring.schema import WarehouseInfo from elementary.monitor.data_monitoring.selector_filter import SelectorFilter +from elementary.tracking.anonymous_tracking import AnonymousTracking from elementary.tracking.tracking_interface import Tracking from elementary.utils import package +from elementary.utils.hash import hash from elementary.utils.log import get_logger logger = get_logger(__name__) @@ -36,7 +39,14 @@ def __init__( latest_invocation = self.get_latest_invocation() self.project_name = latest_invocation.get("project_name") dbt_pkg_version = latest_invocation.get("elementary_version") + warehouse_info = self._get_warehouse_info( + hash_id=isinstance(tracking, AnonymousTracking) + ) + if warehouse_info: + self.warehouse_type = warehouse_info.type if tracking: + if warehouse_info: + tracking.register_warehouse_group(warehouse_info) tracking.set_env("target_name", latest_invocation.get("target_name")) tracking.set_env("dbt_orchestrator", latest_invocation.get("orchestrator")) tracking.set_env("dbt_version", latest_invocation.get("dbt_version")) @@ -160,3 +170,24 @@ def _check_dbt_package_compatibility(dbt_pkg_ver_str: str) -> None: logger.info( f"edr ({py_pkg_ver}) and Elementary's dbt package ({dbt_pkg_ver}) are compatible." ) + + def _get_warehouse_info(self, hash_id: bool = False) -> Optional[WarehouseInfo]: + dbt_runner = DbtRunner( + dbt_project_utils.PATH, + self.config.profiles_dir, + self.config.profile_target, + dbt_env_vars=self.config.dbt_env_vars, + ) + try: + warehouse_type, warehouse_unique_id = json.loads( + dbt_runner.run_operation("get_adapter_type_and_unique_id", quiet=True)[ + 0 + ] + ) + return WarehouseInfo( + id=warehouse_unique_id if not hash_id else hash(warehouse_unique_id), + type=warehouse_type, + ) + except Exception: + logger.debug("Could not get warehouse info.", exc_info=True) + return None diff --git a/elementary/monitor/data_monitoring/report/data_monitoring_report.py b/elementary/monitor/data_monitoring/report/data_monitoring_report.py index 52e8192cc..e433ca428 100644 --- a/elementary/monitor/data_monitoring/report/data_monitoring_report.py +++ b/elementary/monitor/data_monitoring/report/data_monitoring_report.py @@ -108,6 +108,7 @@ def get_report_data( project_name=project_name or self.project_name, filter=self.filter.get_filter(), env=self.config.env, + warehouse_type=self.warehouse_type, ) self._add_report_tracking(report_data, error) if error: @@ -147,8 +148,7 @@ def _add_report_tracking( report_data.tracking = dict( posthog_api_key=self.tracking.POSTHOG_PROJECT_API_KEY, report_generator_anonymous_user_id=self.tracking.anonymous_user_id, - anonymous_warehouse_id=self.tracking.anonymous_warehouse - and self.tracking.anonymous_warehouse.id, + anonymous_warehouse_id=self.tracking.anonymous_warehouse_id, ) def send_report( diff --git a/elementary/monitor/data_monitoring/schema.py b/elementary/monitor/data_monitoring/schema.py index 3f751ad48..2e43aa9b3 100644 --- a/elementary/monitor/data_monitoring/schema.py +++ b/elementary/monitor/data_monitoring/schema.py @@ -50,3 +50,8 @@ class DataMonitoringReportTestResultsSchema(BaseModel): class DataMonitoringReportTestRunsSchema(BaseModel): runs: Dict[Optional[str], List[TestRunSchema]] = dict() totals: Dict[Optional[str], TotalsSchema] = dict() + + +class WarehouseInfo(BaseModel): + id: str + type: str diff --git a/elementary/tracking/anonymous_tracking.py b/elementary/tracking/anonymous_tracking.py index 21c6fe7fe..c2e41c657 100644 --- a/elementary/tracking/anonymous_tracking.py +++ b/elementary/tracking/anonymous_tracking.py @@ -1,16 +1,11 @@ -import json import logging import uuid from pathlib import Path from typing import Any, Dict, List, Optional -from pydantic import BaseModel - import elementary.exceptions.exceptions import elementary.tracking.runner -from elementary.clients.dbt.dbt_runner import DbtRunner from elementary.config.config import Config -from elementary.monitor import dbt_project_utils from elementary.tracking.tracking_interface import Tracking from elementary.utils.log import get_logger @@ -18,11 +13,6 @@ logger = get_logger(__name__) -class AnonymousWarehouse(BaseModel): - id: str - type: str - - class AnonymousTracking(Tracking): _ANONYMOUS_USER_ID_FILE = ".user_id" _INTERNAL_EXCEPTIONS_LIMIT = 5 @@ -30,7 +20,7 @@ class AnonymousTracking(Tracking): def __init__(self, config: Config) -> None: super().__init__(config) self.anonymous_user_id = None - self.anonymous_warehouse = None + self.anonymous_warehouse_id = None self._do_not_track = config.anonymous_tracking_enabled is False self._run_id = str(uuid.uuid4()) @@ -47,7 +37,6 @@ def _init(self): try: self._props["env"] = elementary.tracking.runner.get_props() self.anonymous_user_id = self._get_anonymous_user_id() - self.anonymous_warehouse = self._get_anonymous_warehouse() except Exception: logger.debug("Unable to initialize anonymous tracking.", exc_info=True) @@ -84,10 +73,6 @@ def _send_anonymous_event( **self._props, **properties, }, - groups={ - "warehouse": self.anonymous_warehouse - and self.anonymous_warehouse.id - }, ) except Exception: logger.debug("Unable to send tracking event.", exc_info=True) @@ -104,33 +89,6 @@ def _get_exception_properties(exc: Exception): props.update(exc.anonymous_tracking_context) return props - def _get_anonymous_warehouse(self) -> Optional[AnonymousWarehouse]: - try: - dbt_runner = DbtRunner( - dbt_project_utils.PATH, - self._config.profiles_dir, - self._config.profile_target, - dbt_env_vars=self._config.dbt_env_vars, - ) - if not dbt_project_utils.is_dbt_package_up_to_date(): - logger.info("Downloading edr internal dbt package") - dbt_runner.deps(quiet=True) - - adapter_type, adapter_unique_id = json.loads( - dbt_runner.run_operation("get_adapter_type_and_unique_id", quiet=True)[ - 0 - ] - ) - anonymous_warehouse_id = self._hash(adapter_unique_id) - self._set_events_group( - "warehouse", - anonymous_warehouse_id, - {"id": anonymous_warehouse_id, "type": adapter_type}, - ) - return AnonymousWarehouse(id=anonymous_warehouse_id, type=adapter_type) - except Exception: - return None - class AnonymousCommandLineTracking(AnonymousTracking): def track_cli_start( diff --git a/elementary/tracking/tracking_interface.py b/elementary/tracking/tracking_interface.py index 1077217d1..283a3221f 100644 --- a/elementary/tracking/tracking_interface.py +++ b/elementary/tracking/tracking_interface.py @@ -5,6 +5,7 @@ import requests from elementary.config.config import Config +from elementary.monitor.data_monitoring.schema import WarehouseInfo from elementary.utils.hash import hash @@ -15,6 +16,7 @@ class BaseTracking(ABC): def __init__(self, config: Config): self._config = config self._props: Dict[str, Any] = {} + self.groups = {} @staticmethod def _hash(content: str): @@ -27,46 +29,52 @@ def set_env(self, key: str, value): self._props[key] = value @abstractmethod - def _set_events_group( - group_type: str, group_identifier: str, group_props: Optional[dict] = None + def _register_group( + self, group_type: str, group_identifier: str, group_props: Optional[dict] = None ) -> None: raise NotImplementedError @abstractmethod def _send_event( + self, distinct_id: str, event_name: str, properties: Optional[dict] = None, - groups: Optional[dict] = None, ) -> None: raise NotImplementedError + def register_warehouse_group(self, warehouse_info: WarehouseInfo) -> None: + self._register_group( + "warehouse", + warehouse_info.id, + warehouse_info.dict(), + ) + class Tracking(BaseTracking): def __init__(self, config: Config): super().__init__(config) posthog.project_api_key = self.POSTHOG_PROJECT_API_KEY - @staticmethod - def _set_events_group( - group_type: str, group_identifier: str, group_props: Optional[dict] = None - ) -> None: - posthog.group_identify(group_type, group_identifier, group_props) - - @staticmethod def _send_event( + self, distinct_id: str, event_name: str, properties: Optional[dict] = None, - groups: Optional[dict] = None, ) -> None: posthog.capture( distinct_id=distinct_id, event=event_name, properties=properties, - groups=groups, + groups=self.groups, ) + def _register_group( + self, group_type: str, group_identifier: str, group_props: Optional[dict] = None + ) -> None: + posthog.group_identify(group_type, group_identifier, group_props) + self.groups[group_type] = group_identifier + class TrackingAPI(BaseTracking): def __init__(self, config: Config): @@ -108,26 +116,27 @@ def _capture( api_key=self.POSTHOG_PROJECT_API_KEY, event=event_name, distinct_id=distinct_id, - properties={**properties, "$groups": groups}, + properties={**(properties or {}), "$groups": groups}, ), ) return response - def _set_events_group( + def _register_group( self, group_type: str, group_identifier: str, group_props: Optional[dict] = None ) -> requests.Response: - return self._group_identify(group_type, group_identifier, group_props) + resp = self._group_identify(group_type, group_identifier, group_props) + self.groups[group_type] = group_identifier + return resp def _send_event( self, distinct_id: str, event_name: str, properties: Optional[dict] = None, - groups: Optional[dict] = None, ) -> requests.Response: return self._capture( distinct_id=distinct_id, event_name=event_name, properties=properties, - groups=groups, + groups=self.groups, )