diff --git a/CHANGELOG.md b/CHANGELOG.md
index ff24d645bce..640faaeac9f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,33 @@
 # Changelog
 
+## 24.12.0
+
+### Various fixes & improvements
+
+- feat(inc-984): store project ids list in dictionary in scrub job (#6675) by @volokluev
+- ref(lw-deletes): enforce ratelimiter (#6644) by @MeredithAnya
+- fix(admin): Allow KILL MUTATION commands in sudo mode (#6672) by @evanh
+- fix(inc984): align start/end timestamp to partition boundaries (#6670) by @volokluev
+- chore(deps): bump relay from 0.9.2 to 0.9.4 (#6660) by @jjbayer
+- feat(inc984): make mutation condition simpler (#6669) by @volokluev
+- chore: Bump Arroyo to 2.19.5 (#6666) by @ayirr7
+- ref: bump sentry-arroyo to 2.19.4 (#6663) by @getsentry-bot
+- fix(eap-alerts): Fix subscriptions referrer for eap alerts (#6662) by @shruthilayaj
+- chore(api): Do not log healthcheck error if downfile exists (#6635) by @untitaker
+- feat(eap): add additional validation for group by (#6659) by @davidtsuk
+- feat(eap): add default value to virtual column (#6657) by @davidtsuk
+- ref: bump sentry-arroyo to 2.19.3 (#6656) by @getsentry-bot
+- Implement filter offset for attribute keys API (#6618) by @xurui-c
+- feat: make sentry RPC instrumentation more specific to the endpoint (#6654) by @kylemumma
+- fix(consumers): Respect 60 day retention days period (#6631) by @volokluev
+- feat: add missing example in admin rpc tool (#6647) by @kylemumma
+- hotfix(inc-984): Add manual job to scrub IPs from spans (#6649) by @volokluev
+- feat: support 15 minute granularity on eap time series RPC (#6645) by @kylemumma
+- fix(eap): Fix divide by 0 bug (#6653) by @davidtsuk
+- fix: run sentry tests when RPC changes (#6652) by @colin-sentry
+- meta: Bump new development version (60ff5441)
+- chore(eap-spans): Take advantage of parallel reads (#6579) by @phacops
+
 ## 24.11.2
 
 ### Various fixes & improvements
 
diff --git a/devservices/config.yml b/devservices/config.yml
index add64b98a6f..e22ad1a0a62 100644
--- a/devservices/config.yml
+++ b/devservices/config.yml
@@ -46,6 +46,8 @@ services:
       host.docker.internal: host-gateway
     networks:
       - devservices
+    labels:
+      - orchestrator=devservices
     restart: unless-stopped
 
   snuba:
diff --git a/docs/source/clickhouse/supported_versions.rst b/docs/source/clickhouse/supported_versions.rst
index 29b55090ec5..00ce3b3d37a 100644
--- a/docs/source/clickhouse/supported_versions.rst
+++ b/docs/source/clickhouse/supported_versions.rst
@@ -1,12 +1,10 @@
 =============================
 ClickHouse supported versions
 =============================
 
-The following versions of Clickhouse have been tested and are known to work
+The following version(s) of Clickhouse have been tested and are known to work
 with Snuba:
 
-- 20.3
-- 20.7
-- 21.8
+- 23.8.11.29 (Altinity Stable Build)
 
 Any version of Clikhouse used outside of this list could potentially work,
 but is not guaranteed to work. Some functionality might be broken.
 Use a
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 4c8173384af..52780b7553d 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -7,7 +7,7 @@
 copyright = "2021, Sentry Team and Contributors"
 author = "Sentry Team and Contributors"
 
-release = "24.12.0.dev0"
+release = "25.1.0.dev0"
 
 # -- General configuration ---------------------------------------------------
 
diff --git a/requirements.txt b/requirements.txt
index 2687f74fe77..d802adce545 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,7 +6,7 @@ click==8.1.7
 clickhouse-driver==0.2.6
 confluent-kafka==2.3.0
 datadog==0.21.0
-devservices==1.0.5
+devservices==1.0.6
 flake8==7.0.0
 Flask==2.2.5
 google-cloud-storage==2.18.0
diff --git a/rust_snuba/rust-toolchain.toml b/rust_snuba/rust-toolchain.toml
index 27ae62c5bcb..0193dee3606 100644
--- a/rust_snuba/rust-toolchain.toml
+++ b/rust_snuba/rust-toolchain.toml
@@ -1,2 +1,2 @@
 [toolchain]
-channel = "1.74.1"
+channel = "1.83.0"
diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs
index 262ac0b3773..9924e09d408 100644
--- a/rust_snuba/src/consumer.rs
+++ b/rust_snuba/src/consumer.rs
@@ -246,20 +246,11 @@ pub fn consumer_impl(
     let processor = if mutations_mode {
         let mut_factory = MutConsumerStrategyFactory {
             storage_config: first_storage,
-            env_config,
-            logical_topic_name,
             max_batch_size,
             max_batch_time,
             processing_concurrency: ConcurrencyConfig::new(concurrency),
             clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
-            async_inserts,
-            python_max_queue_depth,
-            use_rust_processor,
             health_check_file: health_check_file.map(ToOwned::to_owned),
-            enforce_schema,
-            physical_consumer_group: consumer_group.to_owned(),
-            physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
-            accountant_topic_config: consumer_config.accountant_topic,
             batch_write_timeout,
         };
 
diff --git a/rust_snuba/src/metrics/global_tags.rs b/rust_snuba/src/metrics/global_tags.rs
index 5dc5f99e996..8d5177ae568 100644
--- a/rust_snuba/src/metrics/global_tags.rs
+++ b/rust_snuba/src/metrics/global_tags.rs
@@ -36,7 +36,7 @@ where
     }
 }
 
-impl<'a, M> Middleware for AddGlobalTags<'a, M>
+impl<M> Middleware for AddGlobalTags<'_, M>
 where
     M: Middleware,
 {
diff --git a/rust_snuba/src/mutations/factory.rs b/rust_snuba/src/mutations/factory.rs
index 12f3644018a..04d7b8ae874 100644
--- a/rust_snuba/src/mutations/factory.rs
+++ b/rust_snuba/src/mutations/factory.rs
@@ -13,7 +13,7 @@ use sentry_arroyo::processing::strategies::run_task_in_threads::{
 };
 use sentry_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrategyFactory};
 use sentry_arroyo::types::Message;
-use sentry_arroyo::types::{Partition, Topic};
+use sentry_arroyo::types::Partition;
 
 use crate::config;
 use crate::metrics::global_tags::set_global_tag;
@@ -25,20 +25,11 @@ use crate::mutations::synchronize::Synchronizer;
 
 pub struct MutConsumerStrategyFactory {
     pub storage_config: config::StorageConfig,
-    pub env_config: config::EnvConfig,
-    pub logical_topic_name: String,
     pub max_batch_size: usize,
    pub max_batch_time: Duration,
     pub processing_concurrency: ConcurrencyConfig,
     pub clickhouse_concurrency: ConcurrencyConfig,
-    pub async_inserts: bool,
-    pub python_max_queue_depth: Option<usize>,
-    pub use_rust_processor: bool,
     pub health_check_file: Option<String>,
-    pub enforce_schema: bool,
-    pub physical_consumer_group: String,
-    pub physical_topic_name: Topic,
-    pub accountant_topic_config: config::TopicConfig,
     pub batch_write_timeout: Option<Duration>,
 }
 
diff --git a/rust_snuba/src/processors/eap_spans.rs
b/rust_snuba/src/processors/eap_spans.rs
index 998bee9e6ca..5017430a289 100644
--- a/rust_snuba/src/processors/eap_spans.rs
+++ b/rust_snuba/src/processors/eap_spans.rs
@@ -40,7 +40,7 @@ pub fn process_message(
     let payload_bytes = payload.payload().context("Expected payload")?;
     let msg: FromSpanMessage = serde_json::from_slice(payload_bytes)?;
     let origin_timestamp = DateTime::from_timestamp(msg.received as i64, 0);
-    let mut span: EAPSpan = msg.try_into()?;
+    let mut span: EAPSpan = msg.into();
 
     span.retention_days = Some(enforce_retention(span.retention_days, &config.env_config));
 
@@ -354,7 +354,7 @@ mod tests {
     #[test]
     fn test_serialization() {
         let msg: FromSpanMessage = serde_json::from_slice(SPAN_KAFKA_MESSAGE.as_bytes()).unwrap();
-        let span: EAPSpan = msg.try_into().unwrap();
+        let span: EAPSpan = msg.into();
 
         insta::with_settings!({sort_maps => true}, {
             insta::assert_json_snapshot!(span)
         });
diff --git a/setup.py b/setup.py
index 4da517feb80..3bfc5e1289a 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 
 from setuptools import find_packages, setup
 
-VERSION = "24.12.0.dev0"
+VERSION = "25.1.0.dev0"
 
 
 def get_requirements() -> Sequence[str]:
diff --git a/snuba/admin/clickhouse/system_queries.py b/snuba/admin/clickhouse/system_queries.py
index b46c5d40afa..72f27461106 100644
--- a/snuba/admin/clickhouse/system_queries.py
+++ b/snuba/admin/clickhouse/system_queries.py
@@ -93,7 +93,7 @@ def _run_sql_query_on_host(
     (SYSTEM)
     \s
     (?!SHUTDOWN\b)(?!KILL\b)
-    [\w\s]+
+    [\w\s'\-_]+
     ;?  # Optional semicolon
     $
     """,
@@ -104,7 +104,7 @@ def _run_sql_query_on_host(
     r"""^
     (OPTIMIZE\sTABLE)
     \s
-    [\w\s]+
+    [\w\s_\-']+
     ;?  # Optional semicolon
     $
     """,
diff --git a/snuba/lw_deletions/strategy.py b/snuba/lw_deletions/strategy.py
index c8d445a7de6..e1a1170447d 100644
--- a/snuba/lw_deletions/strategy.py
+++ b/snuba/lw_deletions/strategy.py
@@ -18,9 +18,11 @@
 from snuba.datasets.storage import WritableTableStorage
 from snuba.lw_deletions.batching import BatchStepCustom, ValuesBatch
 from snuba.lw_deletions.formatters import Formatter
+from snuba.query.allocation_policies import AllocationPolicyViolations
 from snuba.query.query_settings import HTTPQuerySettings
 from snuba.state import get_int_config
 from snuba.utils.metrics import MetricsBackend
+from snuba.web import QueryException
 from snuba.web.bulk_delete_query import construct_or_conditions, construct_query
 from snuba.web.delete_query import (
     ConditionsType,
@@ -46,6 +48,7 @@ def __init__(
     ) -> None:
         self.__next_step = next_step
         self.__storage = storage
+        self.__storage_name = storage.get_storage_key().value
         self.__cluster_name = self.__storage.get_cluster().get_clickhouse_cluster_name()
         self.__tables = storage.get_deletion_settings().tables
         self.__formatter: Formatter = formatter
@@ -62,18 +65,25 @@ def submit(self, message: Message[ValuesBatch[KafkaPayload]]) -> None:
 
         try:
             self._execute_delete(conditions)
-        except TooManyOngoingMutationsError:
+        except TooManyOngoingMutationsError as err:
             # backpressure is applied while we wait for the
             # currently ongoing mutations to finish
             self.__metrics.increment("too_many_ongoing_mutations")
+            logger.warning(str(err), exc_info=True)
             raise MessageRejected
+        except QueryException as err:
+            cause = err.__cause__
+            if isinstance(cause, AllocationPolicyViolations):
+                self.__metrics.increment("allocation_policy_violation")
+                raise MessageRejected
 
         self.__next_step.submit(message)
 
     def _get_attribute_info(self) -> AttributionInfo:
         return AttributionInfo(
             app_id=AppID("lw-deletes"),
-            tenant_ids={},
+            # concurrent allocation policies require a project or org id
+            tenant_ids={"project_id": 1},
             referrer="lw-deletes",
             team=None,
             feature=None,
diff --git a/snuba/manual_jobs/runner.py b/snuba/manual_jobs/runner.py
index e7a78d41d08..6ddd3258f79 100644
--- a/snuba/manual_jobs/runner.py
+++ b/snuba/manual_jobs/runner.py
@@ -134,9 +134,9 @@ def run_job(job_spec: JobSpec) -> JobStatus:
         if not job_spec.is_async:
             current_job_status = _set_job_status(job_spec.job_id, JobStatus.FINISHED)
             job_logger.info("[runner] job execution finished")
-    except BaseException:
+    except BaseException as e:
         current_job_status = _set_job_status(job_spec.job_id, JobStatus.FAILED)
-        job_logger.error("[runner] job execution failed")
+        job_logger.error(f"[runner] job execution failed {e}")
         job_logger.info(f"[runner] exception {traceback.format_exc()}")
     finally:
         _release_job_lock(job_spec.job_id)
diff --git a/snuba/manual_jobs/scrub_ips_from_eap_spans.py b/snuba/manual_jobs/scrub_ips_from_eap_spans.py
new file mode 100644
index 00000000000..9fd415f97d6
--- /dev/null
+++ b/snuba/manual_jobs/scrub_ips_from_eap_spans.py
@@ -0,0 +1,49 @@
+from datetime import datetime
+from typing import Any, Mapping, Optional
+
+from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
+from snuba.clusters.storage_sets import StorageSetKey
+from snuba.manual_jobs import Job, JobLogger, JobSpec
+
+
+class ScrubIpFromEAPSpans(Job):
+    def __init__(self, job_spec: JobSpec) -> None:
+        self.__validate_job_params(job_spec.params)
+        super().__init__(job_spec)
+
+    def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None:
+        assert params
+        assert isinstance(params["organization_ids"], list)
+        assert all([isinstance(p, int) for p in params["organization_ids"]])
+        self._organization_ids = params["organization_ids"]
+        self._start_datetime = datetime.fromisoformat(params["start_datetime"])
+        self._end_datetime = datetime.fromisoformat(params["end_datetime"])
+
+    def _get_query(self, cluster_name: str | None) -> str:
+        organization_ids = ",".join([str(p) for p in self._organization_ids])
+        start_datetime = self._start_datetime.isoformat()
+        end_datetime = self._end_datetime.isoformat()
+        on_cluster = f"ON CLUSTER '{cluster_name}'" if cluster_name else ""
+        return f"""ALTER TABLE eap_spans_2_local
+{on_cluster}
+UPDATE `attr_str_1` = mapApply((k, v) -> (k, if(k = 'user.ip', 'scrubbed', v)), `attr_str_1`)
+WHERE organization_id IN [{organization_ids}]
+AND _sort_timestamp >= toDateTime('{start_datetime}')
+AND _sort_timestamp < toDateTime('{end_datetime}')"""
+
+    def execute(self, logger: JobLogger) -> None:
+        cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)
+        storage_node = cluster.get_local_nodes()[0]
+        connection = cluster.get_node_connection(
+            ClickhouseClientSettings.CLEANUP, storage_node
+        )
+        if not cluster.is_single_node():
+            cluster_name = cluster.get_clickhouse_cluster_name()
+        else:
+            cluster_name = None
+        query = self._get_query(cluster_name)
+        logger.info(f"Executing query: {query}")
+        result = connection.execute(query=query, settings={"mutations_sync": 0})
+
+        logger.info("complete")
+        logger.info(repr(result))
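
The following is a usage sketch and not part of the patch: it shows how the new ScrubIpFromEAPSpans job would be driven through the manual-jobs runner, mirroring the invocation exercised by tests/manual_jobs/test_scrub_ips_from_eap_spans.py later in this diff. The job id is hypothetical.

    # Hedged usage sketch; parameters mirror test_basic further down in this diff.
    from snuba.manual_jobs import JobSpec
    from snuba.manual_jobs.runner import run_job

    run_job(
        JobSpec(
            "scrub-ips-example",  # hypothetical job id
            "ScrubIpFromEAPSpans",
            False,  # is_async=False: run synchronously and block until finished
            {
                "organization_ids": [1, 3, 5, 6],
                "start_datetime": "2024-12-01 00:00:00",
                "end_datetime": "2024-12-10 00:00:00",
            },
        )
    )
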
diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py
index 66ef0da6704..0c78a6eca34 100644
--- a/snuba/web/bulk_delete_query.py
+++ b/snuba/web/bulk_delete_query.py
@@ -20,6 +20,7 @@
 from snuba.query.exceptions import InvalidQueryException, NoRowsToDeleteException
 from snuba.query.expressions import Expression
 from snuba.reader import Result
+from snuba.state import get_str_config
 from snuba.utils.metrics.util import with_span
 from snuba.utils.metrics.wrapper import MetricsWrapper
 from snuba.utils.schemas import ColumnValidator, InvalidColumnType
@@ -208,9 +209,14 @@ def delete_from_tables(
     if highest_rows_to_delete == 0:
         return result
 
+    storage_name = storage.get_storage_key().value
+    project_id = attribution_info.tenant_ids.get("project_id")
+    if project_id and should_use_killswitch(storage_name, str(project_id)):
+        return result
+
     delete_query: DeleteQueryMessage = {
         "rows_to_delete": highest_rows_to_delete,
-        "storage_name": storage.get_storage_key().value,
+        "storage_name": storage_name,
         "conditions": conditions,
         "tenant_ids": attribution_info.tenant_ids,
     }
@@ -224,3 +230,10 @@ def construct_or_conditions(conditions: Sequence[ConditionsType]) -> Expression:
     into OR conditions for a bulk delete
     """
     return combine_or_conditions([_construct_condition(cond) for cond in conditions])
+
+
+def should_use_killswitch(storage_name: str, project_id: str) -> bool:
+    killswitch_config = get_str_config(
+        f"lw_deletes_killswitch_{storage_name}", default=""
+    )
+    return project_id in killswitch_config if killswitch_config else False
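
A minimal sketch of how the new killswitch is expected to be exercised, assuming snuba.state.set_config as used by the test added at the end of this diff. Note that should_use_killswitch, as written, does a plain substring check against the stored string rather than parsing it as a list of project ids.

    # Hedged sketch; the config name follows the lw_deletes_killswitch_{storage_name} pattern above.
    from snuba.state import set_config
    from snuba.web.bulk_delete_query import should_use_killswitch

    # Skip lightweight deletes for project 1 on the search_issues storage.
    set_config("lw_deletes_killswitch_search_issues", "[1]")

    assert should_use_killswitch("search_issues", "1") is True
    assert should_use_killswitch("search_issues", "2") is False
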
""" confidence_interval = None average_sample_rate = 0 - sample_count = None + sample_count = 0 is_percentile = False percentile = 0.0 granularity = 0.0 @@ -87,7 +90,10 @@ def from_row(row_data: Dict[str, Any], column_label: str) -> "ExtrapolationMeta" sample_count = col_value reliability = Reliability.RELIABILITY_UNSPECIFIED - if confidence_interval is not None and sample_count is not None: + if confidence_interval is not None: + if sample_count == 0: + return None + estimate = row_data[column_label] # relative confidence represents the ratio of the confidence interval to the estimate (by default it is the upper bound) if is_percentile: @@ -233,9 +239,12 @@ def get_average_sample_rate_column(aggregation: AttributeAggregation) -> Express referenced_column=aggregation.label, metadata={}, ).to_alias() - return f.avgIf( - f.divide(literal(1), sampling_weight_column), - get_field_existence_expression(aggregation), + return f.divide( + f.sumIf(sign_column, get_field_existence_expression(aggregation)), + f.sumIf( + f.multiply(sign_column, sampling_weight_column), + get_field_existence_expression(aggregation), + ), alias=alias, ) @@ -249,8 +258,11 @@ def _get_count_column_alias(aggregation: AttributeAggregation) -> str: def get_count_column(aggregation: AttributeAggregation) -> Expression: - field = attribute_key_to_expression(aggregation.key) - return f.count(field, alias=_get_count_column_alias(aggregation)) + return f.sumIf( + sign_column, + get_field_existence_expression(aggregation), + alias=_get_count_column_alias(aggregation), + ) def _get_possible_percentiles( diff --git a/snuba/web/rpc/v1/endpoint_time_series.py b/snuba/web/rpc/v1/endpoint_time_series.py index 1b87017146c..d263852fe28 100644 --- a/snuba/web/rpc/v1/endpoint_time_series.py +++ b/snuba/web/rpc/v1/endpoint_time_series.py @@ -173,14 +173,17 @@ def _convert_result_timeseries( extrapolation_meta = ExtrapolationMeta.from_row( row_data, timeseries.label ) - timeseries.data_points.append( - DataPoint( - data=row_data[timeseries.label], - data_present=True, - avg_sampling_rate=extrapolation_meta.avg_sampling_rate, - reliability=extrapolation_meta.reliability, + if extrapolation_meta is not None: + timeseries.data_points.append( + DataPoint( + data=row_data[timeseries.label], + data_present=True, + avg_sampling_rate=extrapolation_meta.avg_sampling_rate, + reliability=extrapolation_meta.reliability, + ) ) - ) + else: + timeseries.data_points.append(DataPoint(data=0, data_present=False)) return result_timeseries.values() diff --git a/snuba/web/rpc/v1/endpoint_trace_item_table.py b/snuba/web/rpc/v1/endpoint_trace_item_table.py index bcc3578257c..7bece3f811c 100644 --- a/snuba/web/rpc/v1/endpoint_trace_item_table.py +++ b/snuba/web/rpc/v1/endpoint_trace_item_table.py @@ -212,7 +212,8 @@ def _convert_results( res[column_name].attribute_name = column_name extrapolation_meta = ExtrapolationMeta.from_row(row, column_name) if ( - extrapolation_meta.reliability + extrapolation_meta is not None + and extrapolation_meta.reliability != Reliability.RELIABILITY_UNSPECIFIED ): res[column_name].reliabilities.append( diff --git a/tests/admin/test_system_queries.py b/tests/admin/test_system_queries.py index b346c44708c..231d6029057 100644 --- a/tests/admin/test_system_queries.py +++ b/tests/admin/test_system_queries.py @@ -110,6 +110,7 @@ def test_invalid_system_query(sql_query: str) -> None: ("SYSSSSSSSTEM DO SOMETHING", False), ("SYSTEM STOP MERGES", True), ("SYSTEM STOP TTL MERGES", True), + ("SYSTEM STOP TTL MERGES ON CLUSTER 'snuba-spans'", 
True), ("KILL MUTATION WHERE mutation_id='0000000000'", True), ("system STOP MerGes", True), ("system SHUTDOWN", False), diff --git a/tests/manual_jobs/test_scrub_ips_from_eap_spans.py b/tests/manual_jobs/test_scrub_ips_from_eap_spans.py new file mode 100644 index 00000000000..a3914b89614 --- /dev/null +++ b/tests/manual_jobs/test_scrub_ips_from_eap_spans.py @@ -0,0 +1,298 @@ +import random +import uuid +from datetime import datetime, timedelta +from typing import Any, Mapping + +import pytest +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( + Column, + TraceItemColumnValues, + TraceItemTableRequest, + TraceItemTableResponse, +) +from sentry_protos.snuba.v1.request_common_pb2 import ( + PageToken, + RequestMeta, + ResponseMeta, +) +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter + +from snuba.datasets.storages.factory import get_storage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.manual_jobs import JobSpec +from snuba.manual_jobs.job_status import JobStatus +from snuba.manual_jobs.runner import get_job_status, run_job +from snuba.manual_jobs.scrub_ips_from_eap_spans import ScrubIpFromEAPSpans +from snuba.web.rpc.v1.endpoint_trace_item_table import EndpointTraceItemTable +from tests.helpers import write_raw_unprocessed_events + +_RELEASE_TAG = "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b" +_USER_IP = "192.168.0.45" + + +@pytest.mark.redis_db +@pytest.mark.clickhouse_db +def test_basic() -> None: + job_id = "abc" + run_job( + JobSpec( + job_id, + "ScrubIpFromEAPSpans", + False, + { + "organization_ids": [1, 3, 5, 6], + "start_datetime": "2024-12-01 00:00:00", + "end_datetime": "2024-12-10 00:00:00", + }, + ) + ) + + assert get_job_status(job_id) == JobStatus.FINISHED + + +@pytest.mark.parametrize( + ("jobspec"), + [ + JobSpec( + "abc", + "ScrubIpFromEAPSpans", + False, + { + "organization_ids": [1, "b"], + "start_datetime": "2024-12-01 00:00:00", + "end_datetime": "2024-12-10 00:00:00", + }, + ), + JobSpec( + "abc", + "ScrubIpFromEAPSpans", + False, + { + "organization_ids": [1, 2], + "start_datetime": "2024-12-01 00:00:0", + "end_datetime": "2024-12-10 00:00:00", + }, + ), + JobSpec( + "abc", + "ScrubIpFromEAPSpans", + False, + { + "organization_ids": [1, 2], + "start_datetime": "2024-12-01 00:00:00", + "end_datetime": "2024-12-10 00:00:0", + }, + ), + ], +) +@pytest.mark.redis_db +def test_fail_validation(jobspec: JobSpec) -> None: + with pytest.raises(Exception): + run_job(jobspec) + + +@pytest.mark.redis_db +def test_generate_query() -> None: + job = ScrubIpFromEAPSpans( + JobSpec( + "bassa", + "ScrubIpFromEAPSpans", + False, + { + "organization_ids": [1, 3, 5, 6], + "start_datetime": "2024-12-01 00:00:00", + "end_datetime": "2024-12-10 00:00:00", + }, + ) + ) + assert ( + job._get_query(None) + == """ALTER TABLE eap_spans_2_local + +UPDATE `attr_str_1` = mapApply((k, v) -> (k, if(k = 'user.ip', 'scrubbed', v)), `attr_str_1`) +WHERE organization_id IN [1,3,5,6] +AND _sort_timestamp >= toDateTime('2024-12-01T00:00:00') +AND _sort_timestamp < toDateTime('2024-12-10T00:00:00')""" + ) + + +def _gen_message( + dt: datetime, + organization_id: int, + measurements: dict[str, dict[str, float]] | None = None, + tags: dict[str, str] | None = None, +) -> Mapping[str, Any]: + measurements = measurements or {} + tags = tags or {} + return { + "description": 
"/api/0/relays/projectconfigs/", + "duration_ms": 152, + "event_id": "d826225de75d42d6b2f01b957d51f18f", + "exclusive_time_ms": 0.228, + "is_segment": True, + "data": { + "sentry.environment": "development", + "sentry.release": _RELEASE_TAG, + "thread.name": "uWSGIWorker1Core0", + "thread.id": "8522009600", + "sentry.segment.name": "/api/0/relays/projectconfigs/", + "sentry.sdk.name": "sentry.python.django", + "sentry.sdk.version": "2.7.0", + "my.float.field": 101.2, + "my.int.field": 2000, + "my.neg.field": -100, + "my.neg.float.field": -101.2, + "my.true.bool.field": True, + "my.false.bool.field": False, + }, + "measurements": { + "num_of_spans": {"value": 50.0}, + "eap.measurement": {"value": random.choice([1, 100, 1000])}, + **measurements, + }, + "organization_id": organization_id, + "origin": "auto.http.django", + "project_id": 1, + "received": 1721319572.877828, + "retention_days": 90, + "segment_id": "8873a98879faf06d", + "sentry_tags": { + "category": "http", + "environment": "development", + "op": "http.server", + "platform": "python", + "release": _RELEASE_TAG, + "sdk.name": "sentry.python.django", + "sdk.version": "2.7.0", + "status": "ok", + "status_code": "200", + "thread.id": "8522009600", + "thread.name": "uWSGIWorker1Core0", + "trace.status": "ok", + "transaction": "/api/0/relays/projectconfigs/", + "transaction.method": "POST", + "transaction.op": "http.server", + "user": "ip:127.0.0.1", + }, + "span_id": "123456781234567D", + "tags": { + "http.status_code": "200", + "relay_endpoint_version": "3", + "relay_id": "88888888-4444-4444-8444-cccccccccccc", + "relay_no_cache": "False", + "relay_protocol_version": "3", + "relay_use_post_or_schedule": "True", + "relay_use_post_or_schedule_rejected": "version", + "user.ip": _USER_IP, + "spans_over_limit": "False", + "server_name": "blah", + "color": random.choice(["red", "green", "blue"]), + "location": random.choice(["mobile", "frontend", "backend"]), + **tags, + }, + "trace_id": uuid.uuid4().hex, + "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)), + "start_timestamp_precise": dt.timestamp(), + "end_timestamp_precise": dt.timestamp() + 1, + } + + +def _generate_request( + ts: Any, hour_ago: int, organization_id: int, project_ids: list[int] +) -> TraceItemTableRequest: + # project_ids is added as an argument to avoid this query getting cached + return TraceItemTableRequest( + meta=RequestMeta( + project_ids=project_ids, + organization_id=organization_id, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="color") + ) + ), + columns=[ + Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user.ip")) + ], + order_by=[ + TraceItemTableRequest.OrderBy( + column=Column( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user.ip") + ) + ) + ], + ) + + +def _generate_expected_response(ip: str) -> TraceItemTableResponse: + return TraceItemTableResponse( + column_values=[ + TraceItemColumnValues( + attribute_name="user.ip", + results=[AttributeValue(val_str=ip) for _ in range(20)], + ) + ], + page_token=PageToken(offset=20), + meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + ) + + +@pytest.mark.clickhouse_db +@pytest.mark.redis_db +def test_span_is_scrubbed() -> None: + BASE_TIME = datetime.utcnow().replace( + minute=0, second=0, 
+    organization_ids = [0, 1]
+    spans_storage = get_storage(StorageKey("eap_spans"))
+    messages = [
+        _gen_message(BASE_TIME - timedelta(minutes=i), organization_id)
+        for organization_id in organization_ids
+        for i in range(20)
+    ]
+    write_raw_unprocessed_events(spans_storage, messages)  # type: ignore
+
+    # we inserted spans for organizations 0 and 1, and we make sure they look as expected
+    ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
+    hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
+    for organization_id in organization_ids:
+
+        response = EndpointTraceItemTable().execute(
+            _generate_request(ts, hour_ago, organization_id, [1, 2, 3])
+        )
+        assert response == _generate_expected_response(_USER_IP)
+
+    # next we scrub organization 0
+    start_datetime = datetime.utcfromtimestamp(Timestamp(seconds=hour_ago).seconds)
+    end_datetime = datetime.utcfromtimestamp(ts.seconds)
+
+    run_job(
+        JobSpec(
+            "plswork",
+            "ScrubIpFromEAPSpans",
+            False,
+            {
+                "organization_ids": [organization_ids[0]],
+                "start_datetime": start_datetime.strftime("%Y-%m-%d %H:%M:%S"),
+                "end_datetime": end_datetime.strftime("%Y-%m-%d %H:%M:%S"),
+            },
+        )
+    )
+
+    response = EndpointTraceItemTable().execute(
+        _generate_request(ts, hour_ago, organization_ids[0], [3, 2, 1])
+    )
+    assert response == _generate_expected_response("scrubbed")
+
+    # then we make sure organization 1 is NOT SCRUBBED
+    response = EndpointTraceItemTable().execute(
+        _generate_request(ts, hour_ago, organization_ids[1], [3, 2, 1])
+    )
+    assert response == _generate_expected_response(_USER_IP)
diff --git a/tests/web/rpc/test_aggregation.py b/tests/web/rpc/test_aggregation.py
index e582dd0edc7..e6ba7e3c420 100644
--- a/tests/web/rpc/test_aggregation.py
+++ b/tests/web/rpc/test_aggregation.py
@@ -147,6 +147,7 @@ def test_get_extrapolation_meta(
     reliability: Reliability.ValueType,
 ) -> None:
     extrapolation_meta = ExtrapolationMeta.from_row(row_data, column_name)
+    assert extrapolation_meta is not None
     assert extrapolation_meta.avg_sampling_rate == average_sample_rate
     assert extrapolation_meta.reliability == reliability
 
diff --git a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_extrapolation.py b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_extrapolation.py
index d87ac8a8e4d..a12ff4c2265 100644
--- a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_extrapolation.py
+++ b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series_extrapolation.py
@@ -301,6 +301,58 @@ def test_confidence_interval_zero_estimate(self) -> None:
             )
         ]
 
+    def test_confidence_interval_no_samples(self) -> None:
+        # store a test metric with a value of 0, every second for an hour
+        granularity_secs = 120
+        query_duration = 3600
+        store_timeseries(
+            BASE_TIME,
+            1,
+            3600,
+            metrics=[DummyMetric("test_metric", get_value=lambda x: 0)],
+            measurements=[
+                DummyMeasurement("client_sample_rate", get_value=lambda s: 1)
+            ],
+        )
+
+        message = TimeSeriesRequest(
+            meta=RequestMeta(
+                project_ids=[1, 2, 3],
+                organization_id=1,
+                cogs_category="something",
+                referrer="something",
+                start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
+                end_timestamp=Timestamp(
+                    seconds=int(BASE_TIME.timestamp() + query_duration)
+                ),
+            ),
+            aggregations=[
+                AttributeAggregation(
+                    aggregate=Function.FUNCTION_P50,
+                    key=AttributeKey(
+                        type=AttributeKey.TYPE_FLOAT, name="test_metric2"
+                    ),  # non-existent metric
+                    label="p50(test_metric2)",
+                    extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED,
+                ),
+            ],
+            granularity_secs=granularity_secs,
+        )
+        response = EndpointTimeSeries().execute(message)
+        expected_buckets = [
+            Timestamp(seconds=int(BASE_TIME.timestamp()) + secs)
+            for secs in range(0, query_duration, granularity_secs)
+        ]
+        assert sorted(response.result_timeseries, key=lambda x: x.label) == [
+            TimeSeries(
+                label="p50(test_metric2)",
+                buckets=expected_buckets,
+                data_points=[
+                    DataPoint(data_present=False) for _ in range(len(expected_buckets))
+                ],
+            )
+        ]
+
     def test_count_unreliable(self) -> None:
         # store a a test metric with a value of 1, every second for an hour
         granularity_secs = 120
@@ -538,3 +590,64 @@ def test_percentile_unreliable(self) -> None:
                 ],
             ),
         ]
+
+    def test_average_sampling_rate(self) -> None:
+        granularity_secs = 120
+        query_duration = 3600
+        store_timeseries(
+            BASE_TIME,
+            60,
+            3600,
+            metrics=[DummyMetric("test_metric", get_value=lambda x: 1)],
+            measurements=[
+                DummyMeasurement(
+                    # for each time bucket we store an event with 1% sampling rate and 100% sampling rate
+                    "client_sample_rate",
+                    get_value=lambda s: 0.01 if (s / 60) % 2 == 0 else 1,
+                )
+            ],
+        )
+
+        message = TimeSeriesRequest(
+            meta=RequestMeta(
+                project_ids=[1, 2, 3],
+                organization_id=1,
+                cogs_category="something",
+                referrer="something",
+                start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
+                end_timestamp=Timestamp(
+                    seconds=int(BASE_TIME.timestamp() + query_duration)
+                ),
+            ),
+            aggregations=[
+                AttributeAggregation(
+                    aggregate=Function.FUNCTION_COUNT,
+                    key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"),
+                    label="count(test_metric)",
+                    extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED,
+                ),
+            ],
+            granularity_secs=granularity_secs,
+        )
+        response = EndpointTimeSeries().execute(message)
+        expected_buckets = [
+            Timestamp(seconds=int(BASE_TIME.timestamp()) + secs)
+            for secs in range(0, query_duration, granularity_secs)
+        ]
+        assert sorted(response.result_timeseries, key=lambda x: x.label) == [
+            TimeSeries(
+                label="count(test_metric)",
+                buckets=expected_buckets,
+                data_points=[
+                    DataPoint(
+                        data=1 / 0.01
+                        + 1,  # 2 events (1 with 1% sampling rate and 1 with 100% sampling rate)
+                        data_present=True,
+                        reliability=Reliability.RELIABILITY_LOW,
+                        avg_sampling_rate=2
+                        / 101,  # weighted average = (1 + 1)/(1/0.01 + 1) = 2/101
+                    )
+                    for _ in range(len(expected_buckets))
+                ],
+            ),
+        ]
diff --git a/tests/web/test_bulk_delete_query.py b/tests/web/test_bulk_delete_query.py
index 450418a4bae..5bc9f68b740 100644
--- a/tests/web/test_bulk_delete_query.py
+++ b/tests/web/test_bulk_delete_query.py
@@ -108,6 +108,19 @@ def test_deletes_not_enabled_runtime_config() -> None:
         delete_from_storage(storage, conditions, attr_info)
 
 
+@pytest.mark.redis_db
+@patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10)
+@patch("snuba.web.bulk_delete_query.produce_delete_query")
+def test_deletes_killswitch(mock_produce_query: Mock, mock_enforce_rows: Mock) -> None:
+    storage = get_writable_storage(StorageKey("search_issues"))
+    conditions = {"project_id": [1], "group_id": [1, 2, 3, 4]}
+    attr_info = get_attribution_info()
+
+    set_config("lw_deletes_killswitch_search_issues", "[1]")
+    delete_from_storage(storage, conditions, attr_info)
+    mock_produce_query.assert_not_called()
+
+
 @pytest.mark.redis_db
 def test_delete_invalid_column_type() -> None:
     storage = get_writable_storage(StorageKey("search_issues"))