Skip to content

Commit

Permalink
Merge branch 'master' into enocht/properly-handle-validation-errors
Browse files Browse the repository at this point in the history
  • Loading branch information
enochtangg authored Jun 11, 2024
2 parents f6e223e + 85d8728 commit 665ff47
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 10 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,8 @@ jobs:
matrix:
version:
[
"22.8.15.25.altinitystable",
"23.3.13.7.altinitystable",
"23.8.8.21.altinitystable",
"23.3.19.33.altinitystable",
"23.8.11.29.altinitystable",
]

steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ schema:
{ name: group_status, type: UInt, args: { size: 8 } },
{ name: group_substatus, type: UInt, args: { size: 8, schema_modifiers: [ nullable ] } },
{ name: group_priority, type: UInt, args: { size: 8, schema_modifiers: [ nullable ] } },
{ name: group_first_release_id, type: UUID, args: { schema_modifiers: [ nullable ] } },
{ name: group_first_seen, type: DateTime },
{ name: group_num_comments, type: UInt, args: { size: 64 } },

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ schema:
{ name: group_status, type: UInt, args: { size: 8 } },
{ name: group_substatus, type: UInt, args: { size: 8, schema_modifiers: [ nullable ] } },
{ name: group_priority, type: UInt, args: { size: 8, schema_modifiers: [ nullable ] } },
{ name: group_first_release_id, type: UUID, args: { schema_modifiers: [ nullable ] } },
{ name: group_first_seen, type: DateTime },
{ name: group_num_comments, type: UInt, args: { size: 64 } },

Expand Down Expand Up @@ -58,6 +59,9 @@ allocation_policies:
query_processors:
- processor: TableRateLimit
- processor: ConsistencyEnforcerProcessor
- processor: UUIDColumnProcessor
args:
columns: [group_first_release_id]

mandatory_condition_checkers:
- condition: ProjectIdEnforcer
Expand Down
1 change: 1 addition & 0 deletions snuba/datasets/processors/group_attributes_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def process_message(
"group_status": message["status"],
"group_substatus": message["substatus"],
"group_priority": message.get("priority", None),
"group_first_release_id": message.get("first_release_id", None),
"group_first_seen": datetime.strptime(
message["first_seen"], settings.PAYLOAD_DATETIME_FORMAT
),
Expand Down
45 changes: 40 additions & 5 deletions snuba/query/allocation_policies/per_referrer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
logger = logging.getLogger("snuba.query.allocation_policy_per_referrer")

_DEFAULT_MAX_THREADS = 10
_DEFAULT_CONCURRENT_REQUEST_PER_REFERRER = 100
_REFERRER_CONCURRENT_OVERRIDE = -1
_REFERRER_MAX_THREADS_OVERRIDE = -1
_REQUESTS_THROTTLE_DIVIDER = 1
_THREADS_THROTTLE_DIVIDER = 1


class ReferrerGuardRailPolicy(BaseConcurrentRateLimitAllocationPolicy):
Expand All @@ -28,7 +33,9 @@ class ReferrerGuardRailPolicy(BaseConcurrentRateLimitAllocationPolicy):
This concern is orthogonal to customer rate limits in its purpose. This rate limiter being tripped is a problem
caused by sentry developers, not customer abuse. It either means that a feature was released that queries this referrer
too much or that an appropriate rate limit was not set somewhere upstream. It affects customers randomly and basically
acts as a load shedder.
acts as a load shedder. As a referrer approaches the rate limiter's threshold for rejecting queries, that referrer's
queries will get throttled. The threshold for throttling and the (reduced) number of threads are configurable via
_REQUESTS_THROTTLE_DIVIDER and _THREADS_THROTTLE_DIVIDER
For example, a product team may push out a feature that sends 20 snuba queries every 5 seconds on the UI.
In that case, that feature should break but others should continue to be served.
Expand All @@ -47,21 +54,33 @@ def _additional_config_definitions(self) -> list[AllocationPolicyConfig]:
""",
value_type=int,
param_types={},
default=100,
default=_DEFAULT_CONCURRENT_REQUEST_PER_REFERRER,
),
AllocationPolicyConfig(
name="referrer_concurrent_override",
description="""override the concurrent limit for a referrer""",
value_type=int,
param_types={"referrer": str},
default=-1,
default=_REFERRER_CONCURRENT_OVERRIDE,
),
AllocationPolicyConfig(
name="referrer_max_threads_override",
description="""override the max_threads for a referrer, applies to every query made by that referrer""",
param_types={"referrer": str},
value_type=int,
default=-1,
default=_REFERRER_MAX_THREADS_OVERRIDE,
),
AllocationPolicyConfig(
name="requests_throttle_divider",
description="default_concurrent_request_per_referrer divided by this value will be the threshold at which we will decrease the number of threads (THROTTLED_THREADS) used to execute queries",
value_type=int,
default=_REQUESTS_THROTTLE_DIVIDER,
),
AllocationPolicyConfig(
name="threads_throttle_divider",
description="max threads divided by this number is the number of threads we use to execute queries for a throttled referrer",
value_type=int,
default=_THREADS_THROTTLE_DIVIDER,
),
]

Expand Down Expand Up @@ -100,6 +119,22 @@ def _get_quota_allowance(
query_id,
rate_limit_params,
)
assert (
rate_limit_params.concurrent_limit is not None
), "concurrent_limit must be set"
num_threads = self._get_max_threads(referrer)
requests_throttle_threshold = max(
1,
self.get_config_value("default_concurrent_request_per_referrer")
// self.get_config_value("requests_throttle_divider"),
)
if rate_limit_stats.concurrent > requests_throttle_threshold:
num_threads = max(
1, num_threads // self.get_config_value("threads_throttle_divider")
)
self.metrics.increment(
"concurrent_queries_throttled", tags={"referrer": referrer}
)
self.metrics.timing(
"concurrent_queries_referrer",
rate_limit_stats.concurrent,
Expand All @@ -112,7 +147,7 @@ def _get_quota_allowance(
}
return QuotaAllowance(
can_run=can_run,
max_threads=self._get_max_threads(referrer),
max_threads=num_threads,
explanation=decision_explanation,
)

Expand Down
4 changes: 2 additions & 2 deletions snuba/web/db_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,10 +797,10 @@ def db_query(
metrics.increment("cache_hit", tags={"dataset": dataset_name})
elif stats.get("is_duplicate"):
metrics.increment("cache_stampede", tags={"dataset": dataset_name})
elif stats.get("cache_hit_simple"):
metrics.increment("cache_hit_simple", tags={"dataset": dataset_name})
else:
metrics.increment("cache_miss", tags={"dataset": dataset_name})
if stats.get("cache_hit_simple"):
metrics.increment("cache_hit_simple", tags={"dataset": dataset_name})
if result:
return result
raise error or Exception(
Expand Down
7 changes: 7 additions & 0 deletions tests/datasets/test_group_attributes_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from datetime import datetime
from typing import Optional

Expand All @@ -14,6 +15,8 @@
from snuba.processor import ProcessedMessage
from snuba.writer import WriterTableRow

RELEASE_ID = uuid.uuid4()


@pytest.fixture
def group_created() -> GroupAttributesSnapshot:
Expand All @@ -23,6 +26,8 @@ def group_created() -> GroupAttributesSnapshot:
"group_id": 1,
"status": 0,
"substatus": 7,
"priority": 25,
"first_release_id": RELEASE_ID,
"first_seen": "2023-02-27T15:40:12.223000Z",
"num_comments": 0,
"assignee_user_id": None,
Expand Down Expand Up @@ -59,6 +64,8 @@ def test_group_created(self, group_created):
"group_id": 1,
"group_status": 0,
"group_substatus": 7,
"group_priority": 25,
"group_first_release_id": RELEASE_ID,
"group_first_seen": datetime.strptime(
group_created["first_seen"], settings.PAYLOAD_DATETIME_FORMAT
),
Expand Down
28 changes: 28 additions & 0 deletions tests/query/allocation_policies/test_per_referrer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,34 @@ def test_policy_pass_basic(self):
tenant_ids={"referrer": "statistical_detectors"}, query_id="4"
).can_run

@pytest.mark.redis_db
def test_throttle(self) -> None:
    """Queries from a referrer get throttled threads once concurrency crosses the threshold.

    With default_concurrent_request_per_referrer=4 and requests_throttle_divider=2,
    the throttle threshold is 2 concurrent requests; past it, the thread budget is
    halved (threads_throttle_divider=2) while queries are still allowed to run.
    """
    policy = ReferrerGuardRailPolicy.from_kwargs(
        storage_key="generic_metrics_distributions",
        required_tenant_types=["referrer"],
    )

    for config_name, config_value in (
        ("default_concurrent_request_per_referrer", 4),
        ("requests_throttle_divider", 2),
        ("threads_throttle_divider", 2),
    ):
        policy.set_config_value(config_name, config_value)

    tenant_ids = {"referrer": "statistical_detectors"}

    # The first two concurrent queries are at or below the threshold (4 // 2 = 2)
    # and run with the full thread budget.
    for query_id in ("1", "2"):
        allowance = policy.get_quota_allowance(
            tenant_ids=tenant_ids, query_id=query_id
        )
        assert allowance.max_threads == policy.max_threads

    # The third concurrent query exceeds the threshold: it still runs,
    # but on half the threads.
    throttled_allowance = policy.get_quota_allowance(
        tenant_ids=tenant_ids, query_id="3"
    )
    assert throttled_allowance.max_threads == policy.max_threads // 2
    assert throttled_allowance.can_run

@pytest.mark.redis_db
def test_override(self):
policy = ReferrerGuardRailPolicy.from_kwargs(
Expand Down
57 changes: 57 additions & 0 deletions tests/web/test_db_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,3 +765,60 @@ def test_clickhouse_settings_applied_to_query_id(
assert ("cache_hit_simple" in stats) == test_cache_hit_simple
assert clickhouse_query_settings["query_id"].startswith(expected_startswith)
assert _get_cache_partition(reader).get("test_query_id") is not None


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
def test_cache_metrics_with_simple_readthrough() -> None:
    """Running the same query twice emits cache_miss then cache_hit.

    The simple read-through cache marker (cache_hit_simple) is expected in the
    result stats and as a metric increment on both executions.
    """
    query, storage, attribution_info = _build_test_query("count(distinct(project_id))")
    state.set_config("disable_lua_randomize_query_id", 1)
    state.set_config("read_through_cache.disable_lua_scripts_sample_rate", 1)

    formatted_query = format_query(query)
    reader = storage.get_cluster().get_reader()

    def execute_query():
        # Identical invocation both times so the second run hits the cache.
        return db_query(
            clickhouse_query=query,
            query_settings=HTTPQuerySettings(),
            attribution_info=attribution_info,
            dataset_name="events",
            query_metadata_list=[],
            formatted_query=formatted_query,
            reader=reader,
            timer=Timer("foo"),
            stats={},
            trace_id="trace_id",
            robust=False,
        )

    with mock.patch("snuba.web.db_query.metrics", new=mock.Mock()) as metrics_mock:
        first_result = execute_query()
        assert "cache_hit_simple" in first_result.extra["stats"]
        # Cold cache: the first execution must record a miss.
        metrics_mock.assert_has_calls(
            [
                mock.call.increment("cache_miss", tags={"dataset": "events"}),
                mock.call.increment("cache_hit_simple", tags={"dataset": "events"}),
            ]
        )

        metrics_mock.reset_mock()
        second_result = execute_query()
        assert "cache_hit_simple" in second_result.extra["stats"]
        # Warm cache: the identical second execution must record a hit.
        metrics_mock.assert_has_calls(
            [
                mock.call.increment("cache_hit", tags={"dataset": "events"}),
                mock.call.increment("cache_hit_simple", tags={"dataset": "events"}),
            ]
        )

0 comments on commit 665ff47

Please sign in to comment.