Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add adapter telemetry to snowplow event. #10859

Merged
merged 13 commits into from
Oct 28, 2024
14 changes: 12 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import time
from copy import deepcopy
from dataclasses import asdict
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type

Expand Down Expand Up @@ -100,7 +101,14 @@ def get_execution_status(sql: str, adapter: BaseAdapter) -> Tuple[RunStatus, str
return status, message


def track_model_run(index, num_nodes, run_model_result):
def _get_adapter_info(adapter, run_model_result) -> Dict[str, Any]:
"""Each adapter returns a dataclass with a flexible dictionary for
adapter-specific fields. Only the non-'model_adapter_details' fields
are guaranteed cross adapter."""
return asdict(adapter.get_adapter_run_info(run_model_result.node.config)) if adapter else {}


def track_model_run(index, num_nodes, run_model_result, adapter=None):
if tracking.active_user is None:
raise DbtInternalError("cannot track model run with no active user")
invocation_id = get_invocation_id()
Expand All @@ -116,6 +124,7 @@ def track_model_run(index, num_nodes, run_model_result):
contract_enforced = False
versioned = False
incremental_strategy = None

tracking.track_model_run(
{
"invocation_id": invocation_id,
Expand All @@ -135,6 +144,7 @@ def track_model_run(index, num_nodes, run_model_result):
"contract_enforced": contract_enforced,
"access": access,
"versioned": versioned,
"adapter_info": _get_adapter_info(adapter, run_model_result),
}
)

Expand Down Expand Up @@ -283,7 +293,7 @@ def before_execute(self) -> None:
self.print_start_line()

def after_execute(self, result) -> None:
track_model_run(self.node_index, self.num_nodes, result)
track_model_run(self.node_index, self.num_nodes, result, adapter=self.adapter)
self.print_result_line(result)

def _build_run_model_result(self, model, context, elapsed_time: float = 0.0):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
RESOURCE_COUNTS = "iglu:com.dbt/resource_counts/jsonschema/1-0-1"
RPC_REQUEST_SPEC = "iglu:com.dbt/rpc_request/jsonschema/1-0-1"
RUNNABLE_TIMING = "iglu:com.dbt/runnable/jsonschema/1-0-0"
RUN_MODEL_SPEC = "iglu:com.dbt/run_model/jsonschema/1-0-4"
RUN_MODEL_SPEC = "iglu:com.dbt/run_model/jsonschema/1-1-0"
PLUGIN_GET_NODES = "iglu:com.dbt/plugin_get_nodes/jsonschema/1-0-0"

SNOWPLOW_TRACKER_VERSION = Version(snowplow_version)
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/dbt-labs/dbt-adapters.git@main
git+https://github.com/dbt-labs/dbt-adapters.git@ADAP-301/add-adapter-telemetry
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git@main
git+https://github.com/dbt-labs/dbt-postgres.git@main
Expand Down
19 changes: 18 additions & 1 deletion tests/unit/task/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from argparse import Namespace
from dataclasses import dataclass
from datetime import datetime, timedelta
from importlib import import_module
from typing import Optional
from unittest.mock import MagicMock, patch

Expand All @@ -18,7 +19,7 @@
from dbt.contracts.graph.nodes import ModelNode
from dbt.events.types import LogModelResult
from dbt.flags import get_flags, set_from_args
from dbt.task.run import ModelRunner, RunTask
from dbt.task.run import ModelRunner, RunTask, _get_adapter_info
from dbt.tests.util import safe_set_invocation_context
from dbt_common.events.base_types import EventLevel
from dbt_common.events.event_manager_client import add_callback_to_manager
Expand Down Expand Up @@ -68,6 +69,22 @@ def test_run_task_preserve_edges():
mock_node_selector.get_graph_queue.assert_called_with(mock_spec, True)


def test_tracking_fails_safely_for_missing_adapter():
    """With no adapter, adapter info is an empty dict rather than an error."""
    adapter_info = _get_adapter_info(None, {})
    assert adapter_info == {}


def test_adapter_info_tracking():
    """Adapter info for PostgresAdapter carries name, versions and details."""
    # Stand-in run result: only ``node.config`` is consumed by the helper.
    run_result = MagicMock()
    run_result.node = MagicMock()
    run_result.node.config = {}

    adapter_name = PostgresAdapter.__name__.split("Adapter")[0].lower()
    adapter_version = import_module("dbt.adapters.postgres.__version__").version
    base_adapter_version = import_module("dbt.adapters.__about__").version

    expected = {
        "model_adapter_details": {},
        "adapter_name": adapter_name,
        "adapter_version": adapter_version,
        "base_adapter_version": base_adapter_version,
    }
    assert _get_adapter_info(PostgresAdapter, run_result) == expected


class TestModelRunner:
@pytest.fixture
def log_model_result_catcher(self) -> EventCatcher:
Expand Down
Loading