Skip to content

Commit

Permalink
Pass warehouse type to FE.
Browse files Browse the repository at this point in the history
  • Loading branch information
elongl committed Jun 21, 2023
1 parent 5ffa907 commit abbfdf5
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 71 deletions.
2 changes: 2 additions & 0 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def get_report_data(
disable_samples: bool = False,
filter: SelectorFilterSchema = SelectorFilterSchema(),
env: Optional[str] = None,
warehouse_type: Optional[str] = None,
) -> Tuple[ReportDataSchema, Optional[Exception]]:
try:
tests_api = TestsAPI(
Expand Down Expand Up @@ -121,6 +122,7 @@ def get_report_data(
resources_latest_invocation=resources_latest_invocation,
invocations_job_identification=invocations_job_identification,
env=dict(project_name=project_name, env=env),
warehouse_type=warehouse_type,
)
return report_data, None
except Exception as error:
Expand Down
1 change: 1 addition & 0 deletions elementary/monitor/api/report/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ class ReportDataSchema(BaseModel):
invocations_job_identification: dict = dict()
env: dict = dict()
tracking: Optional[dict] = None
warehouse_type: Optional[str] = None
18 changes: 9 additions & 9 deletions elementary/monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,6 @@ def monitor(
)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.set_env("use_select", bool(select))
anonymous_tracking.track_cli_start(
Command.MONITOR, get_cli_properties(), ctx.command.name
)
try:
config.validate_monitor()
data_monitoring = DataMonitoringAlerts(
Expand All @@ -288,6 +285,9 @@ def monitor(
disable_samples=disable_samples,
filter=select,
)
anonymous_tracking.track_cli_start(
Command.MONITOR, get_cli_properties(), ctx.command.name
)
success = data_monitoring.run_alerts(
days_back, full_refresh_dbt_package, dbt_vars=vars
)
Expand Down Expand Up @@ -364,9 +364,6 @@ def report(
)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.set_env("use_select", bool(select))
anonymous_tracking.track_cli_start(
Command.REPORT, get_cli_properties(), ctx.command.name
)
try:
data_monitoring = DataMonitoringReport(
config=config,
Expand All @@ -375,6 +372,9 @@ def report(
disable_samples=disable_samples,
filter=select,
)
anonymous_tracking.track_cli_start(
Command.REPORT, get_cli_properties(), ctx.command.name
)
generated_report_successfully, _ = data_monitoring.generate_report(
days_back=days_back,
test_runs_amount=executions_limit,
Expand Down Expand Up @@ -555,9 +555,6 @@ def send_report(
)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.set_env("use_select", bool(select))
anonymous_tracking.track_cli_start(
Command.SEND_REPORT, get_cli_properties(), ctx.command.name
)
try:
config.validate_send_report()
# bucket-file-path determines the path of the report in the bucket.
Expand All @@ -574,6 +571,9 @@ def send_report(
disable_samples=disable_samples,
filter=select,
)
anonymous_tracking.track_cli_start(
Command.SEND_REPORT, get_cli_properties(), ctx.command.name
)
sent_report_successfully = data_monitoring.send_report(
days_back=days_back,
test_runs_amount=executions_limit,
Expand Down
31 changes: 31 additions & 0 deletions elementary/monitor/data_monitoring/data_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
from elementary.clients.slack.client import SlackClient
from elementary.config.config import Config
from elementary.monitor import dbt_project_utils
from elementary.monitor.data_monitoring.schema import WarehouseInfo
from elementary.monitor.data_monitoring.selector_filter import SelectorFilter
from elementary.tracking.anonymous_tracking import AnonymousTracking
from elementary.tracking.tracking_interface import Tracking
from elementary.utils import package
from elementary.utils.hash import hash
from elementary.utils.log import get_logger

logger = get_logger(__name__)
Expand All @@ -36,7 +39,14 @@ def __init__(
latest_invocation = self.get_latest_invocation()
self.project_name = latest_invocation.get("project_name")
dbt_pkg_version = latest_invocation.get("elementary_version")
warehouse_info = self._get_warehouse_info(
hash_id=isinstance(tracking, AnonymousTracking)
)
if warehouse_info:
self.warehouse_type = warehouse_info.type
if tracking:
if warehouse_info:
tracking.register_warehouse_group(warehouse_info)
tracking.set_env("target_name", latest_invocation.get("target_name"))
tracking.set_env("dbt_orchestrator", latest_invocation.get("orchestrator"))
tracking.set_env("dbt_version", latest_invocation.get("dbt_version"))
Expand Down Expand Up @@ -160,3 +170,24 @@ def _check_dbt_package_compatibility(dbt_pkg_ver_str: str) -> None:
logger.info(
f"edr ({py_pkg_ver}) and Elementary's dbt package ({dbt_pkg_ver}) are compatible."
)

def _get_warehouse_info(self, hash_id: bool = False) -> Optional[WarehouseInfo]:
dbt_runner = DbtRunner(
dbt_project_utils.PATH,
self.config.profiles_dir,
self.config.profile_target,
dbt_env_vars=self.config.dbt_env_vars,
)
try:
warehouse_type, warehouse_unique_id = json.loads(
dbt_runner.run_operation("get_adapter_type_and_unique_id", quiet=True)[
0
]
)
return WarehouseInfo(
id=warehouse_unique_id if not hash_id else hash(warehouse_unique_id),
type=warehouse_type,
)
except Exception:
logger.debug("Could not get warehouse info.", exc_info=True)
return None
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def get_report_data(
project_name=project_name or self.project_name,
filter=self.filter.get_filter(),
env=self.config.env,
warehouse_type=self.warehouse_type,
)
self._add_report_tracking(report_data, error)
if error:
Expand Down Expand Up @@ -147,8 +148,7 @@ def _add_report_tracking(
report_data.tracking = dict(
posthog_api_key=self.tracking.POSTHOG_PROJECT_API_KEY,
report_generator_anonymous_user_id=self.tracking.anonymous_user_id,
anonymous_warehouse_id=self.tracking.anonymous_warehouse
and self.tracking.anonymous_warehouse.id,
anonymous_warehouse_id=self.tracking.anonymous_warehouse_id,
)

def send_report(
Expand Down
5 changes: 5 additions & 0 deletions elementary/monitor/data_monitoring/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ class DataMonitoringReportTestResultsSchema(BaseModel):
class DataMonitoringReportTestRunsSchema(BaseModel):
runs: Dict[Optional[str], List[TestRunSchema]] = dict()
totals: Dict[Optional[str], TotalsSchema] = dict()


class WarehouseInfo(BaseModel):
id: str
type: str
44 changes: 1 addition & 43 deletions elementary/tracking/anonymous_tracking.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,26 @@
import json
import logging
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional

from pydantic import BaseModel

import elementary.exceptions.exceptions
import elementary.tracking.runner
from elementary.clients.dbt.dbt_runner import DbtRunner
from elementary.config.config import Config
from elementary.monitor import dbt_project_utils
from elementary.tracking.tracking_interface import Tracking
from elementary.utils.log import get_logger

logging.getLogger("posthog").disabled = True
logger = get_logger(__name__)


class AnonymousWarehouse(BaseModel):
id: str
type: str


class AnonymousTracking(Tracking):
_ANONYMOUS_USER_ID_FILE = ".user_id"
_INTERNAL_EXCEPTIONS_LIMIT = 5

def __init__(self, config: Config) -> None:
super().__init__(config)
self.anonymous_user_id = None
self.anonymous_warehouse = None
self.anonymous_warehouse_id = None
self._do_not_track = config.anonymous_tracking_enabled is False
self._run_id = str(uuid.uuid4())

Expand All @@ -47,7 +37,6 @@ def _init(self):
try:
self._props["env"] = elementary.tracking.runner.get_props()
self.anonymous_user_id = self._get_anonymous_user_id()
self.anonymous_warehouse = self._get_anonymous_warehouse()
except Exception:
logger.debug("Unable to initialize anonymous tracking.", exc_info=True)

Expand Down Expand Up @@ -84,10 +73,6 @@ def _send_anonymous_event(
**self._props,
**properties,
},
groups={
"warehouse": self.anonymous_warehouse
and self.anonymous_warehouse.id
},
)
except Exception:
logger.debug("Unable to send tracking event.", exc_info=True)
Expand All @@ -104,33 +89,6 @@ def _get_exception_properties(exc: Exception):
props.update(exc.anonymous_tracking_context)
return props

def _get_anonymous_warehouse(self) -> Optional[AnonymousWarehouse]:
try:
dbt_runner = DbtRunner(
dbt_project_utils.PATH,
self._config.profiles_dir,
self._config.profile_target,
dbt_env_vars=self._config.dbt_env_vars,
)
if not dbt_project_utils.is_dbt_package_up_to_date():
logger.info("Downloading edr internal dbt package")
dbt_runner.deps(quiet=True)

adapter_type, adapter_unique_id = json.loads(
dbt_runner.run_operation("get_adapter_type_and_unique_id", quiet=True)[
0
]
)
anonymous_warehouse_id = self._hash(adapter_unique_id)
self._set_events_group(
"warehouse",
anonymous_warehouse_id,
{"id": anonymous_warehouse_id, "type": adapter_type},
)
return AnonymousWarehouse(id=anonymous_warehouse_id, type=adapter_type)
except Exception:
return None


class AnonymousCommandLineTracking(AnonymousTracking):
def track_cli_start(
Expand Down
43 changes: 26 additions & 17 deletions elementary/tracking/tracking_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import requests

from elementary.config.config import Config
from elementary.monitor.data_monitoring.schema import WarehouseInfo
from elementary.utils.hash import hash


Expand All @@ -15,6 +16,7 @@ class BaseTracking(ABC):
def __init__(self, config: Config):
self._config = config
self._props: Dict[str, Any] = {}
self.groups = {}

@staticmethod
def _hash(content: str):
Expand All @@ -27,46 +29,52 @@ def set_env(self, key: str, value):
self._props[key] = value

@abstractmethod
def _set_events_group(
group_type: str, group_identifier: str, group_props: Optional[dict] = None
def _register_group(
self, group_type: str, group_identifier: str, group_props: Optional[dict] = None
) -> None:
raise NotImplementedError

@abstractmethod
def _send_event(
self,
distinct_id: str,
event_name: str,
properties: Optional[dict] = None,
groups: Optional[dict] = None,
) -> None:
raise NotImplementedError

def register_warehouse_group(self, warehouse_info: WarehouseInfo) -> None:
self._register_group(
"warehouse",
warehouse_info.id,
warehouse_info.dict(),
)


class Tracking(BaseTracking):
def __init__(self, config: Config):
super().__init__(config)
posthog.project_api_key = self.POSTHOG_PROJECT_API_KEY

@staticmethod
def _set_events_group(
group_type: str, group_identifier: str, group_props: Optional[dict] = None
) -> None:
posthog.group_identify(group_type, group_identifier, group_props)

@staticmethod
def _send_event(
self,
distinct_id: str,
event_name: str,
properties: Optional[dict] = None,
groups: Optional[dict] = None,
) -> None:
posthog.capture(
distinct_id=distinct_id,
event=event_name,
properties=properties,
groups=groups,
groups=self.groups,
)

def _register_group(
self, group_type: str, group_identifier: str, group_props: Optional[dict] = None
) -> None:
posthog.group_identify(group_type, group_identifier, group_props)
self.groups[group_type] = group_identifier


class TrackingAPI(BaseTracking):
def __init__(self, config: Config):
Expand Down Expand Up @@ -108,26 +116,27 @@ def _capture(
api_key=self.POSTHOG_PROJECT_API_KEY,
event=event_name,
distinct_id=distinct_id,
properties={**properties, "$groups": groups},
properties={**(properties or {}), "$groups": groups},
),
)
return response

def _set_events_group(
def _register_group(
self, group_type: str, group_identifier: str, group_props: Optional[dict] = None
) -> requests.Response:
return self._group_identify(group_type, group_identifier, group_props)
resp = self._group_identify(group_type, group_identifier, group_props)
self.groups[group_type] = group_identifier
return resp

def _send_event(
self,
distinct_id: str,
event_name: str,
properties: Optional[dict] = None,
groups: Optional[dict] = None,
) -> requests.Response:
return self._capture(
distinct_id=distinct_id,
event_name=event_name,
properties=properties,
groups=groups,
groups=self.groups,
)

0 comments on commit abbfdf5

Please sign in to comment.