Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(capman): emit the bytes scanned metric from db_query #6075

Merged
merged 6 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions snuba/query/allocation_policies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,10 @@ def _update_quota_balance(
) -> None:
pass

@property
def storage_key(self) -> StorageKey:
    """Return the StorageKey this allocation policy is bound to.

    Read-only accessor over the private ``_storage_key`` attribute so
    external callers can tag metrics/logs per storage without reaching
    into internals.
    """
    return self._storage_key


class PassthroughPolicy(AllocationPolicy):
def _additional_config_definitions(self) -> list[AllocationPolicyConfig]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,6 @@ def _update_quota_balance(
return
if bytes_scanned == 0:
return
# we emitted both kinds of bytes scanned in _get_bytes_scanned_in_query however
# this metric shows what is actually being used to enforce the policy
self.metrics.increment(
"bytes_scanned",
bytes_scanned,
tags={"referrer": str(tenant_ids.get("referrer", "no_referrer"))},
)
if "organization_id" in tenant_ids:
org_limit_bytes_scanned = self.__get_org_limit_bytes_scanned(
tenant_ids.get("organization_id")
Expand Down
31 changes: 30 additions & 1 deletion snuba/web/db_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from snuba.clickhouse.query import Query
from snuba.clickhouse.query_dsl.accessors import get_time_range_estimate
from snuba.clickhouse.query_profiler import generate_profile
from snuba.datasets.storages.storage_key import StorageKey
from snuba.query import ProcessableQuery
from snuba.query.allocation_policies import (
MAX_THRESHOLD,
Expand Down Expand Up @@ -681,6 +682,27 @@ def visit_join_clause(self, node: JoinClause[Table]) -> None:
node.right_node.accept(self)


def _record_bytes_scanned(
    result_or_error: QueryResultOrError,
    attribution_info: AttributionInfo,
    dataset_name: str,
    storage_key: StorageKey,
) -> None:
    """Emit the ``allocation_policy.bytes_scanned`` metric for a finished query.

    Reads ``progress_bytes`` from the ClickHouse result profile and increments
    the metric tagged with referrer, dataset and storage key. Errored queries
    (no ``query_result``) are skipped since they carry no profile.

    This function is called from ``db_query``'s ``finally`` block, so it must
    never raise — an exception here would mask the query's real outcome.
    """
    custom_metrics = MetricsWrapper(environment.metrics, "allocation_policy")

    if result_or_error.query_result is None:
        # Nothing to record for a failed query.
        return

    # NOTE: ``profile`` may be absent *or* present with an explicit None value
    # in the result payload; ``get("profile", {})`` alone would return None in
    # the latter case and crash on the chained ``.get``. Coalesce defensively.
    profile = result_or_error.query_result.result.get("profile") or {}
    progress_bytes_scanned = int(profile.get("progress_bytes", 0) or 0)
    custom_metrics.increment(
        "bytes_scanned",
        progress_bytes_scanned,
        tags={
            "referrer": attribution_info.referrer,
            "dataset": dataset_name,
            "storage_key": storage_key.value,
        },
    )


def db_query(
clickhouse_query: Union[Query, CompositeQuery[Table]],
query_settings: QuerySettings,
Expand Down Expand Up @@ -792,11 +814,18 @@ def db_query(
# if it didn't do that, something is very wrong so we just panic out here
raise e
finally:
result_or_error = QueryResultOrError(query_result=result, error=error)
_record_bytes_scanned(
result_or_error,
attribution_info,
dataset_name,
allocation_policies[0].storage_key,
)
for allocation_policy in allocation_policies:
allocation_policy.update_quota_balance(
tenant_ids=attribution_info.tenant_ids,
query_id=query_id,
result_or_error=QueryResultOrError(query_result=result, error=error),
result_or_error=result_or_error,
)
if stats.get("cache_hit"):
metrics.increment("cache_hit", tags={"dataset": dataset_name})
Expand Down
39 changes: 39 additions & 0 deletions tests/web/test_db_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
AllocationPolicy,
AllocationPolicyConfig,
AllocationPolicyViolations,
PassthroughPolicy,
QueryResultOrError,
QuotaAllowance,
)
Expand All @@ -29,6 +30,7 @@
from snuba.query.query_settings import HTTPQuerySettings
from snuba.querylog.query_metadata import ClickhouseQueryMetadata
from snuba.state.quota import ResourceQuota
from snuba.utils.metrics.backends.testing import get_recorded_metric_calls
from snuba.utils.metrics.timer import Timer
from snuba.web import QueryException
from snuba.web.db_query import (
Expand Down Expand Up @@ -243,6 +245,43 @@ def _build_test_query(
)


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
def test_db_record_bytes_scanned() -> None:
    """Running a query through db_query emits exactly one
    ``allocation_policy.bytes_scanned`` increment, tagged with the
    referrer, dataset name and storage key."""
    dataset_name = "events"
    storage_key = StorageKey("errors_ro")
    query, storage, attribution_info = _build_test_query(
        "count(distinct(project_id))",
        allocation_policies=[PassthroughPolicy(storage_key, [], {})],
    )

    # The metadata list and stats dict are mutated by db_query but not
    # inspected by this test, so throwaway values are passed inline.
    db_query(
        clickhouse_query=query,
        query_settings=HTTPQuerySettings(),
        attribution_info=attribution_info,
        dataset_name=dataset_name,
        query_metadata_list=[],
        formatted_query=format_query(query),
        reader=storage.get_cluster().get_reader(),
        timer=Timer("foo"),
        stats={},
        trace_id="trace_id",
        robust=False,
    )

    recorded = get_recorded_metric_calls(
        "increment", "allocation_policy.bytes_scanned"
    )
    assert recorded
    assert len(recorded) == 1
    expected_tags = {
        "referrer": attribution_info.referrer,
        "dataset": dataset_name,
        "storage_key": storage_key.value,
    }
    assert recorded[0].tags == expected_tags


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
def test_db_query_success() -> None:
Expand Down
Loading