Skip to content

Commit

Permalink
Merge branch 'master' of github.com:elementary-data/elementary into e…
Browse files Browse the repository at this point in the history
…le-765-implement-columns-api
  • Loading branch information
IDoneShaveIt committed Jun 25, 2023
2 parents 285bc73 + e736708 commit 2aa08d9
Show file tree
Hide file tree
Showing 9 changed files with 373,401 additions and 373,590 deletions.
7 changes: 5 additions & 2 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
NormalizedSourceSchema,
TotalsSchema,
)
from elementary.monitor.api.report.schema import ReportDataSchema
from elementary.monitor.api.report.schema import ReportDataEnvSchema, ReportDataSchema
from elementary.monitor.api.tests.schema import TestResultSchema, TestRunSchema
from elementary.monitor.api.tests.tests import TestsAPI
from elementary.monitor.data_monitoring.schema import SelectorFilterSchema
Expand All @@ -33,6 +33,7 @@ def get_report_data(
disable_samples: bool = False,
filter: SelectorFilterSchema = SelectorFilterSchema(),
env: Optional[str] = None,
warehouse_type: Optional[str] = None,
) -> Tuple[ReportDataSchema, Optional[Exception]]:
try:
tests_api = TestsAPI(
Expand Down Expand Up @@ -120,7 +121,9 @@ def get_report_data(
invocations=invocations,
resources_latest_invocation=resources_latest_invocation,
invocations_job_identification=invocations_job_identification,
env=dict(project_name=project_name, env=env),
env=ReportDataEnvSchema(
project_name=project_name, env=env, warehouse_type=warehouse_type
),
)
return report_data, None
except Exception as error:
Expand Down
8 changes: 7 additions & 1 deletion elementary/monitor/api/report/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
from pydantic import BaseModel


# Environment metadata section of the report payload; used as the type of
# ReportDataSchema.env. Every field is optional and defaults to None.
class ReportDataEnvSchema(BaseModel):
# Name of the project the report describes.
project_name: Optional[str] = None
# Environment label of the report.
env: Optional[str] = None
# Type of the warehouse the report was generated against.
warehouse_type: Optional[str] = None


class ReportDataSchema(BaseModel):
creation_time: Optional[str] = None
days_back: Optional[int] = None
Expand All @@ -21,5 +27,5 @@ class ReportDataSchema(BaseModel):
invocations: list = list()
resources_latest_invocation: dict = dict()
invocations_job_identification: dict = dict()
env: dict = dict()
env: ReportDataEnvSchema = ReportDataEnvSchema()
tracking: Optional[dict] = None
24 changes: 15 additions & 9 deletions elementary/monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,6 @@ def monitor(
)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.set_env("use_select", bool(select))
anonymous_tracking.track_cli_start(
Command.MONITOR, get_cli_properties(), ctx.command.name
)
try:
config.validate_monitor()
data_monitoring = DataMonitoringAlerts(
Expand All @@ -288,6 +285,11 @@ def monitor(
disable_samples=disable_samples,
filter=select,
)
# track_cli_start must be called after DataMonitoringAlerts is constructed, because the constructor enriches the tracking properties.
# This is tech debt that should be fixed in the future.
anonymous_tracking.track_cli_start(
Command.MONITOR, get_cli_properties(), ctx.command.name
)
success = data_monitoring.run_alerts(
days_back, full_refresh_dbt_package, dbt_vars=vars
)
Expand Down Expand Up @@ -364,9 +366,6 @@ def report(
)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.set_env("use_select", bool(select))
anonymous_tracking.track_cli_start(
Command.REPORT, get_cli_properties(), ctx.command.name
)
try:
data_monitoring = DataMonitoringReport(
config=config,
Expand All @@ -375,6 +374,11 @@ def report(
disable_samples=disable_samples,
filter=select,
)
# track_cli_start must be called after DataMonitoringReport is constructed, because the constructor enriches the tracking properties.
# This is tech debt that should be fixed in the future.
anonymous_tracking.track_cli_start(
Command.REPORT, get_cli_properties(), ctx.command.name
)
generated_report_successfully, _ = data_monitoring.generate_report(
days_back=days_back,
test_runs_amount=executions_limit,
Expand Down Expand Up @@ -555,9 +559,6 @@ def send_report(
)
anonymous_tracking = AnonymousCommandLineTracking(config)
anonymous_tracking.set_env("use_select", bool(select))
anonymous_tracking.track_cli_start(
Command.SEND_REPORT, get_cli_properties(), ctx.command.name
)
try:
config.validate_send_report()
# bucket-file-path determines the path of the report in the bucket.
Expand All @@ -574,6 +575,11 @@ def send_report(
disable_samples=disable_samples,
filter=select,
)
# track_cli_start must be called after data_monitoring is constructed, because its constructor enriches the tracking properties.
# This is tech debt that should be fixed in the future.
anonymous_tracking.track_cli_start(
Command.SEND_REPORT, get_cli_properties(), ctx.command.name
)
sent_report_successfully = data_monitoring.send_report(
days_back=days_back,
test_runs_amount=executions_limit,
Expand Down
33 changes: 33 additions & 0 deletions elementary/monitor/data_monitoring/data_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
from elementary.clients.slack.client import SlackClient
from elementary.config.config import Config
from elementary.monitor import dbt_project_utils
from elementary.monitor.data_monitoring.schema import WarehouseInfo
from elementary.monitor.data_monitoring.selector_filter import SelectorFilter
from elementary.tracking.anonymous_tracking import AnonymousTracking
from elementary.tracking.tracking_interface import Tracking
from elementary.utils import package
from elementary.utils.hash import hash
from elementary.utils.log import get_logger

logger = get_logger(__name__)
Expand All @@ -36,7 +39,16 @@ def __init__(
latest_invocation = self.get_latest_invocation()
self.project_name = latest_invocation.get("project_name")
dbt_pkg_version = latest_invocation.get("elementary_version")
self.warehouse_info = self._get_warehouse_info(
hash_id=isinstance(tracking, AnonymousTracking)
)
if tracking:
if self.warehouse_info:
tracking.register_group(
"warehouse",
self.warehouse_info.id,
self.warehouse_info.dict(),
)
tracking.set_env("target_name", latest_invocation.get("target_name"))
tracking.set_env("dbt_orchestrator", latest_invocation.get("orchestrator"))
tracking.set_env("dbt_version", latest_invocation.get("dbt_version"))
Expand Down Expand Up @@ -160,3 +172,24 @@ def _check_dbt_package_compatibility(dbt_pkg_ver_str: str) -> None:
logger.info(
f"edr ({py_pkg_ver}) and Elementary's dbt package ({dbt_pkg_ver}) are compatible."
)

def _get_warehouse_info(self, hash_id: bool = False) -> Optional[WarehouseInfo]:
    """Fetch the warehouse type and unique id by running a dbt operation.

    Runs the ``get_adapter_type_and_unique_id`` operation and parses the first
    element of its output as a JSON pair of ``(type, unique_id)``.

    :param hash_id: when True, the unique id is passed through the imported
        ``hash`` helper before being stored on the result.
    :return: a ``WarehouseInfo``, or ``None`` if running the operation, parsing
        its output, or building the model raises (the error is logged at
        debug level).
    """
    runner = DbtRunner(
        dbt_project_utils.PATH,
        self.config.profiles_dir,
        self.config.profile_target,
        dbt_env_vars=self.config.dbt_env_vars,
    )
    try:
        operation_output = runner.run_operation(
            "get_adapter_type_and_unique_id", quiet=True
        )
        adapter_type, unique_id = json.loads(operation_output[0])
        if hash_id:
            info_id = hash(unique_id)
        else:
            info_id = unique_id
        return WarehouseInfo(id=info_id, type=adapter_type)
    except Exception:
        # Best effort: callers treat a missing warehouse info as "unknown".
        logger.debug("Could not get warehouse info.", exc_info=True)
        return None
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def get_report_data(
project_name=project_name or self.project_name,
filter=self.filter.get_filter(),
env=self.config.env,
warehouse_type=self.warehouse_info.type if self.warehouse_info else None,
)
self._add_report_tracking(report_data, error)
if error:
Expand Down Expand Up @@ -147,8 +148,9 @@ def _add_report_tracking(
report_data.tracking = dict(
posthog_api_key=self.tracking.POSTHOG_PROJECT_API_KEY,
report_generator_anonymous_user_id=self.tracking.anonymous_user_id,
anonymous_warehouse_id=self.tracking.anonymous_warehouse
and self.tracking.anonymous_warehouse.id,
anonymous_warehouse_id=self.warehouse_info.id
if self.warehouse_info
else None,
)

def send_report(
Expand Down
5 changes: 5 additions & 0 deletions elementary/monitor/data_monitoring/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ class DataMonitoringReportTestResultsSchema(BaseModel):
class DataMonitoringReportTestRunsSchema(BaseModel):
runs: Dict[Optional[str], List[TestRunSchema]] = dict()
totals: Dict[Optional[str], TotalsSchema] = dict()


# Warehouse identity as produced by _get_warehouse_info from the
# get_adapter_type_and_unique_id dbt operation.
class WarehouseInfo(BaseModel):
# Warehouse unique id; hashed first when _get_warehouse_info is called with hash_id=True.
id: str
# Warehouse (adapter) type reported by the dbt operation.
type: str
43 changes: 0 additions & 43 deletions elementary/tracking/anonymous_tracking.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,25 @@
import json
import logging
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional

from pydantic import BaseModel

import elementary.exceptions.exceptions
import elementary.tracking.runner
from elementary.clients.dbt.dbt_runner import DbtRunner
from elementary.config.config import Config
from elementary.monitor import dbt_project_utils
from elementary.tracking.tracking_interface import Tracking
from elementary.utils.log import get_logger

logging.getLogger("posthog").disabled = True
logger = get_logger(__name__)


class AnonymousWarehouse(BaseModel):
id: str
type: str


class AnonymousTracking(Tracking):
_ANONYMOUS_USER_ID_FILE = ".user_id"
_INTERNAL_EXCEPTIONS_LIMIT = 5

def __init__(self, config: Config) -> None:
super().__init__(config)
self.anonymous_user_id = None
self.anonymous_warehouse = None
self._do_not_track = config.anonymous_tracking_enabled is False
self._run_id = str(uuid.uuid4())

Expand All @@ -47,7 +36,6 @@ def _init(self):
try:
self._props["env"] = elementary.tracking.runner.get_props()
self.anonymous_user_id = self._get_anonymous_user_id()
self.anonymous_warehouse = self._get_anonymous_warehouse()
except Exception:
logger.debug("Unable to initialize anonymous tracking.", exc_info=True)

Expand Down Expand Up @@ -84,10 +72,6 @@ def _send_anonymous_event(
**self._props,
**properties,
},
groups={
"warehouse": self.anonymous_warehouse
and self.anonymous_warehouse.id
},
)
except Exception:
logger.debug("Unable to send tracking event.", exc_info=True)
Expand All @@ -104,33 +88,6 @@ def _get_exception_properties(exc: Exception):
props.update(exc.anonymous_tracking_context)
return props

def _get_anonymous_warehouse(self) -> Optional[AnonymousWarehouse]:
try:
dbt_runner = DbtRunner(
dbt_project_utils.PATH,
self._config.profiles_dir,
self._config.profile_target,
dbt_env_vars=self._config.dbt_env_vars,
)
if not dbt_project_utils.is_dbt_package_up_to_date():
logger.info("Downloading edr internal dbt package")
dbt_runner.deps(quiet=True)

adapter_type, adapter_unique_id = json.loads(
dbt_runner.run_operation("get_adapter_type_and_unique_id", quiet=True)[
0
]
)
anonymous_warehouse_id = self._hash(adapter_unique_id)
self._set_events_group(
"warehouse",
anonymous_warehouse_id,
{"id": anonymous_warehouse_id, "type": adapter_type},
)
return AnonymousWarehouse(id=anonymous_warehouse_id, type=adapter_type)
except Exception:
return None


class AnonymousCommandLineTracking(AnonymousTracking):
def track_cli_start(
Expand Down
35 changes: 18 additions & 17 deletions elementary/tracking/tracking_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class BaseTracking(ABC):
def __init__(self, config: Config):
self._config = config
self._props: Dict[str, Any] = {}
self.groups = {}

@staticmethod
def _hash(content: str):
Expand All @@ -27,17 +28,17 @@ def set_env(self, key: str, value):
self._props[key] = value

@abstractmethod
def _set_events_group(
group_type: str, group_identifier: str, group_props: Optional[dict] = None
def register_group(
self, group_type: str, group_identifier: str, group_props: Optional[dict] = None
) -> None:
raise NotImplementedError

@abstractmethod
def _send_event(
self,
distinct_id: str,
event_name: str,
properties: Optional[dict] = None,
groups: Optional[dict] = None,
) -> None:
raise NotImplementedError

Expand All @@ -47,26 +48,25 @@ def __init__(self, config: Config):
super().__init__(config)
posthog.project_api_key = self.POSTHOG_PROJECT_API_KEY

@staticmethod
def _set_events_group(
group_type: str, group_identifier: str, group_props: Optional[dict] = None
) -> None:
posthog.group_identify(group_type, group_identifier, group_props)

@staticmethod
def _send_event(
self,
distinct_id: str,
event_name: str,
properties: Optional[dict] = None,
groups: Optional[dict] = None,
) -> None:
posthog.capture(
distinct_id=distinct_id,
event=event_name,
properties=properties,
groups=groups,
groups=self.groups,
)

def register_group(
self, group_type: str, group_identifier: str, group_props: Optional[dict] = None
) -> None:
posthog.group_identify(group_type, group_identifier, group_props)
self.groups[group_type] = group_identifier


class TrackingAPI(BaseTracking):
def __init__(self, config: Config):
Expand Down Expand Up @@ -108,26 +108,27 @@ def _capture(
api_key=self.POSTHOG_PROJECT_API_KEY,
event=event_name,
distinct_id=distinct_id,
properties={**properties, "$groups": groups},
properties={**(properties or {}), "$groups": groups},
),
)
return response

def _set_events_group(
def register_group(
self, group_type: str, group_identifier: str, group_props: Optional[dict] = None
) -> requests.Response:
return self._group_identify(group_type, group_identifier, group_props)
resp = self._group_identify(group_type, group_identifier, group_props)
self.groups[group_type] = group_identifier
return resp

def _send_event(
self,
distinct_id: str,
event_name: str,
properties: Optional[dict] = None,
groups: Optional[dict] = None,
) -> requests.Response:
return self._capture(
distinct_id=distinct_id,
event_name=event_name,
properties=properties,
groups=groups,
groups=self.groups,
)
Loading

0 comments on commit 2aa08d9

Please sign in to comment.