From eb00fd5d58a2d2d249f259a2ab390d7219854bc2 Mon Sep 17 00:00:00 2001 From: xurui-c <159840875+xurui-c@users.noreply.github.com> Date: Wed, 12 Jun 2024 13:13:46 -0700 Subject: [PATCH] code (#6024) Co-authored-by: Rachel Chen --- .../bytes_scanned_rejecting_policy.py | 32 +++++++++++++ .../test_bytes_scanned_rejecting_policy.py | 47 +++++++++++++++++++ tests/web/test_db_query.py | 2 +- 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py b/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py index 4e5a204ea7..fe19772cf3 100644 --- a/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py +++ b/snuba/query/allocation_policies/bytes_scanned_rejecting_policy.py @@ -41,6 +41,8 @@ PETABYTE = 10**12 DEFAULT_BYTES_SCANNED_LIMIT = int(1.28 * PETABYTE) DEFAULT_TIMEOUT_PENALIZATION = DEFAULT_BYTES_SCANNED_LIMIT // 40 +DEFAULT_BYTES_THROTTLE_DIVIDER = 1 +DEFAULT_THREADS_THROTTLE_DIVIDER = 1 class BytesScannedRejectingPolicy(AllocationPolicy): @@ -91,6 +93,18 @@ def _additional_config_definitions(self) -> list[AllocationPolicyConfig]: int, DEFAULT_TIMEOUT_PENALIZATION, ), + AllocationPolicyConfig( + "bytes_throttle_divider", + "Divide the scan limit by this number gives the throttling threshold", + int, + DEFAULT_BYTES_THROTTLE_DIVIDER, + ), + AllocationPolicyConfig( + "threads_throttle_divider", + "max threads divided by this number is the number of threads we use to execute queries for a throttled (project_id|organization_id, referrer)", + int, + DEFAULT_BYTES_THROTTLE_DIVIDER, + ), ] def _are_tenant_ids_valid( @@ -206,6 +220,24 @@ def _get_quota_allowance( }, ) return QuotaAllowance(False, self.max_threads, explanation) + + throttle_threshold = max( + 1, scan_limit // self.get_config_value("bytes_throttle_divider") + ) + if granted_quota.granted < throttle_threshold: + self.metrics.increment( + "bytes_scanned_queries_throttled", + tags={"referrer": str(tenant_ids.get("referrer", "no_referrer"))}, + ) + return QuotaAllowance( + True, + max( + 1, + self.max_threads + // self.get_config_value("threads_throttle_divider"), + ), + {"reason": "within_limit but throttled"}, + ) return QuotaAllowance(True, self.max_threads, {"reason": "within_limit"}) def _get_bytes_scanned_in_query( diff --git a/tests/query/allocation_policies/test_bytes_scanned_rejecting_policy.py b/tests/query/allocation_policies/test_bytes_scanned_rejecting_policy.py index b27e84a6b4..b31c680d26 100644 --- a/tests/query/allocation_policies/test_bytes_scanned_rejecting_policy.py +++ b/tests/query/allocation_policies/test_bytes_scanned_rejecting_policy.py @@ -107,6 +107,53 @@ def test_consume_quota( assert allowance.can_run +@pytest.mark.redis_db +def test_throttles( + policy: BytesScannedRejectingPolicy, +) -> None: + _configure_policy(policy) + policy.set_config_value("bytes_throttle_divider", 2) + policy.set_config_value("threads_throttle_divider", 2) + tenant_ids: dict[str, int | str] = { + "organization_id": 123, + "project_id": 12345, + "referrer": "some_referrer", + } + allowance = policy.get_quota_allowance(tenant_ids, QUERY_ID) + assert allowance.max_threads == MAX_THREAD_NUMBER + policy.update_quota_balance( + tenant_ids, + QUERY_ID, + QueryResultOrError( + query_result=QueryResult( + result={"profile": {"progress_bytes": 1}}, + extra={"stats": {}, "sql": "", "experiments": {}}, + ), + error=None, + ), + ) + + allowance = policy.get_quota_allowance(tenant_ids, QUERY_ID) + assert allowance.max_threads == MAX_THREAD_NUMBER + policy.update_quota_balance( + tenant_ids, + QUERY_ID, + QueryResultOrError( + query_result=QueryResult( + result={ + "profile": {"progress_bytes": PROJECT_REFERRER_SCAN_LIMIT // 2} + }, + extra={"stats": {}, "sql": "", "experiments": {}}, + ), + error=None, + ), + ) + + allowance = policy.get_quota_allowance(tenant_ids, QUERY_ID) + assert allowance.max_threads == MAX_THREAD_NUMBER // 2 + assert allowance.explanation["reason"] == "within_limit but throttled" + + @pytest.mark.redis_db def test_cross_org_query(policy: BytesScannedRejectingPolicy) -> None: _configure_policy(policy) diff --git a/tests/web/test_db_query.py b/tests/web/test_db_query.py index d7ac157be3..0d26464bbb 100644 --- a/tests/web/test_db_query.py +++ b/tests/web/test_db_query.py @@ -286,7 +286,7 @@ def test_db_query_success() -> None: "can_run": True, "max_threads": 10, "explanation": { - "reason": "within_limit", + "reason": "within_limit but throttled", "storage_key": "StorageKey.ERRORS_RO", }, },