fix(capman): emit the bytes scanned metric from db_query (#6075)
* emit the bytes scanned metric from db_query

* 0 not None

* forgot 2 save

* style(lint): Auto commit lint changes

* fix test

* mypy

---------

Co-authored-by: Volo Kluev <[email protected]>
Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
3 people authored Jul 5, 2024
1 parent 2cd5cc1 commit a15ff10
Showing 4 changed files with 73 additions and 8 deletions.
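At a glance, the change stops emitting bytes_scanned from inside the allocation policy and instead emits it once per query from db_query, reading ClickHouse's progress_bytes counter out of the result profile and tagging it with referrer, dataset, and storage key. Below is a minimal, dependency-free sketch of that emission path; emit_bytes_scanned and its increment callback are illustrative stand-ins rather than snuba APIs, while the "profile"/"progress_bytes" keys and the tag names come from the diff that follows.

from typing import Any, Callable, Mapping, Optional


def emit_bytes_scanned(
    result: Optional[Mapping[str, Any]],
    referrer: str,
    dataset_name: str,
    storage_key: str,
    increment: Callable[[str, int, dict[str, str]], None],
) -> None:
    """Emit the scanned-bytes counter for a finished query (illustrative helper)."""
    if result is None:
        # Errored queries have no result, so nothing is emitted (mirrors the
        # `if result_or_error.query_result` guard in the real helper).
        return
    # Fall back to 0 when the profile or counter is missing.
    progress_bytes = int(result.get("profile", {}).get("progress_bytes", 0))
    increment(
        "bytes_scanned",
        progress_bytes,
        {"referrer": referrer, "dataset": dataset_name, "storage_key": storage_key},
    )

In the actual commit the callback role is played by MetricsWrapper(environment.metrics, "allocation_policy").increment inside the new _record_bytes_scanned helper in snuba/web/db_query.py.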
4 changes: 4 additions & 0 deletions snuba/query/allocation_policies/__init__.py
@@ -851,6 +851,10 @@ def _update_quota_balance(
     ) -> None:
         pass
 
+    @property
+    def storage_key(self) -> StorageKey:
+        return self._storage_key
+
 
 class PassthroughPolicy(AllocationPolicy):
     def _additional_config_definitions(self) -> list[AllocationPolicyConfig]:
@@ -285,13 +285,6 @@ def _update_quota_balance(
             return
         if bytes_scanned == 0:
             return
-        # we emitted both kinds of bytes scanned in _get_bytes_scanned_in_query however
-        # this metric shows what is actually being used to enforce the policy
-        self.metrics.increment(
-            "bytes_scanned",
-            bytes_scanned,
-            tags={"referrer": str(tenant_ids.get("referrer", "no_referrer"))},
-        )
         if "organization_id" in tenant_ids:
             org_limit_bytes_scanned = self.__get_org_limit_bytes_scanned(
                 tenant_ids.get("organization_id")
31 changes: 30 additions & 1 deletion snuba/web/db_query.py
@@ -25,6 +25,7 @@
 from snuba.clickhouse.query import Query
 from snuba.clickhouse.query_dsl.accessors import get_time_range_estimate
 from snuba.clickhouse.query_profiler import generate_profile
+from snuba.datasets.storages.storage_key import StorageKey
 from snuba.query import ProcessableQuery
 from snuba.query.allocation_policies import (
     MAX_THRESHOLD,
@@ -681,6 +682,27 @@ def visit_join_clause(self, node: JoinClause[Table]) -> None:
         node.right_node.accept(self)
 
 
+def _record_bytes_scanned(
+    result_or_error: QueryResultOrError,
+    attribution_info: AttributionInfo,
+    dataset_name: str,
+    storage_key: StorageKey,
+) -> None:
+    custom_metrics = MetricsWrapper(environment.metrics, "allocation_policy")
+
+    if result_or_error.query_result:
+        progress_bytes_scanned = cast(int, result_or_error.query_result.result.get("profile", {}).get("progress_bytes", 0))  # type: ignore
+        custom_metrics.increment(
+            "bytes_scanned",
+            progress_bytes_scanned,
+            tags={
+                "referrer": attribution_info.referrer,
+                "dataset": dataset_name,
+                "storage_key": storage_key.value,
+            },
+        )
+
+
 def db_query(
     clickhouse_query: Union[Query, CompositeQuery[Table]],
     query_settings: QuerySettings,
@@ -792,11 +814,18 @@ def db_query(
         # if it didn't do that, something is very wrong so we just panic out here
         raise e
     finally:
+        result_or_error = QueryResultOrError(query_result=result, error=error)
+        _record_bytes_scanned(
+            result_or_error,
+            attribution_info,
+            dataset_name,
+            allocation_policies[0].storage_key,
+        )
         for allocation_policy in allocation_policies:
             allocation_policy.update_quota_balance(
                 tenant_ids=attribution_info.tenant_ids,
                 query_id=query_id,
-                result_or_error=QueryResultOrError(query_result=result, error=error),
+                result_or_error=result_or_error,
             )
         if stats.get("cache_hit"):
             metrics.increment("cache_hit", tags={"dataset": dataset_name})
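One detail from the hunk above, likely the point of the "0 not None" bullet in the commit message: the byte count falls back to 0 rather than None when ClickHouse returns no profile, so the metric increment always receives an integer. A tiny standalone illustration of that lookup, using made-up result payloads (only the "profile"/"progress_bytes" keys mirror the diff):

# Hypothetical payloads for illustration; real results come from the ClickHouse reader.
result_with_profile = {"profile": {"progress_bytes": 123_456}}
result_without_profile: dict = {}

for result in (result_with_profile, result_without_profile):
    progress_bytes = int(result.get("profile", {}).get("progress_bytes", 0))
    print(progress_bytes)  # prints 123456, then 0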
39 changes: 39 additions & 0 deletions tests/web/test_db_query.py
@@ -21,6 +21,7 @@
     AllocationPolicy,
     AllocationPolicyConfig,
     AllocationPolicyViolations,
+    PassthroughPolicy,
     QueryResultOrError,
     QuotaAllowance,
 )
@@ -29,6 +30,7 @@
 from snuba.query.query_settings import HTTPQuerySettings
 from snuba.querylog.query_metadata import ClickhouseQueryMetadata
 from snuba.state.quota import ResourceQuota
+from snuba.utils.metrics.backends.testing import get_recorded_metric_calls
 from snuba.utils.metrics.timer import Timer
 from snuba.web import QueryException
 from snuba.web.db_query import (
@@ -243,6 +245,43 @@ def _build_test_query(
     )
 
 
+@pytest.mark.clickhouse_db
+@pytest.mark.redis_db
+def test_db_record_bytes_scanned() -> None:
+    dataset_name = "events"
+    storage_key = StorageKey("errors_ro")
+    query, storage, attribution_info = _build_test_query(
+        "count(distinct(project_id))",
+        allocation_policies=[PassthroughPolicy(storage_key, [], {})],
+    )
+
+    query_metadata_list: list[ClickhouseQueryMetadata] = []
+    stats: dict[str, Any] = {}
+
+    db_query(
+        clickhouse_query=query,
+        query_settings=HTTPQuerySettings(),
+        attribution_info=attribution_info,
+        dataset_name=dataset_name,
+        query_metadata_list=query_metadata_list,
+        formatted_query=format_query(query),
+        reader=storage.get_cluster().get_reader(),
+        timer=Timer("foo"),
+        stats=stats,
+        trace_id="trace_id",
+        robust=False,
+    )
+
+    metrics = get_recorded_metric_calls("increment", "allocation_policy.bytes_scanned")
+    assert metrics
+    assert len(metrics) == 1
+    assert metrics[0].tags == {
+        "referrer": attribution_info.referrer,
+        "dataset": dataset_name,
+        "storage_key": storage_key.value,
+    }
+
+
 @pytest.mark.clickhouse_db
 @pytest.mark.redis_db
 def test_db_query_success() -> None:
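A short usage note on the assertion style above: the testing metrics backend records every emission, so the same helper can be used to inspect whatever db_query sent during a test run. A sketch that assumes only the call shape exercised by the test (get_recorded_metric_calls(kind, name) returning recorded calls exposing .tags):

from snuba.utils.metrics.backends.testing import get_recorded_metric_calls

# After a query has run through db_query with the testing backend active,
# print the tags of every recorded bytes_scanned increment.
calls = get_recorded_metric_calls("increment", "allocation_policy.bytes_scanned")
for call in calls or []:
    print(call.tags)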
