Merge branch 'master' into jferg/uptime-monitors
JoshFerge committed Dec 18, 2024
2 parents 8721607 + c556160 commit 41d6d96
Showing 25 changed files with 577 additions and 54 deletions.
28 changes: 28 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,33 @@
# Changelog

## 24.12.0

### Various fixes & improvements

- feat(inc-984): store project ids list in dictionary in scrub job (#6675) by @volokluev
- ref(lw-deletes): enforce ratelimiter (#6644) by @MeredithAnya
- fix(admin): Allow KILL MUTATION commands in sudo mode (#6672) by @evanh
- fix(inc984): align start/end timestamp to partition boundaries (#6670) by @volokluev
- chore(deps): bump relay from 0.9.2 to 0.9.4 (#6660) by @jjbayer
- feat(inc984): make mutation condition simpler (#6669) by @volokluev
- chore: Bump Arroyo to 2.19.5 (#6666) by @ayirr7
- ref: bump sentry-arroyo to 2.19.4 (#6663) by @getsentry-bot
- fix(eap-alerts): Fix subscriptions referrer for eap alerts (#6662) by @shruthilayaj
- chore(api): Do not log healthcheck error if downfile exists (#6635) by @untitaker
- feat(eap): add additional validation for group by (#6659) by @davidtsuk
- feat(eap): add default value to virtual column (#6657) by @davidtsuk
- ref: bump sentry-arroyo to 2.19.3 (#6656) by @getsentry-bot
- Implement filter offset for attribute keys API (#6618) by @xurui-c
- feat: make sentry RPC instrumentation more specific to the endpoint (#6654) by @kylemumma
- fix(consumers): Respect 60 day retention days period (#6631) by @volokluev
- feat: add missing example in admin rpc tool (#6647) by @kylemumma
- hotfix(inc-984): Add manual job to scrub IPs from spans (#6649) by @volokluev
- feat: support 15 minute granularity on eap time series RPC (#6645) by @kylemumma
- fix(eap): Fix divide by 0 bug (#6653) by @davidtsuk
- fix: run sentry tests when RPC changes (#6652) by @colin-sentry
- meta: Bump new development version (60ff5441)
- chore(eap-spans): Take advantage of parallel reads (#6579) by @phacops

## 24.11.2

### Various fixes & improvements
2 changes: 2 additions & 0 deletions devservices/config.yml
@@ -46,6 +46,8 @@ services:
host.docker.internal: host-gateway
networks:
- devservices
labels:
- orchestrator=devservices
restart: unless-stopped

snuba:
6 changes: 2 additions & 4 deletions docs/source/clickhouse/supported_versions.rst
@@ -1,12 +1,10 @@
=============================
ClickHouse supported versions
=============================
The following versions of Clickhouse have been tested and are known to work
The following version(s) of Clickhouse have been tested and are known to work
with Snuba:

- 20.3
- 20.7
- 21.8
- 23.8.11.29 (Altinity Stable Build)

Any version of ClickHouse used outside of this list could potentially work,
but is not guaranteed to work. Some functionality might be broken. Use a
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -7,7 +7,7 @@
copyright = "2021, Sentry Team and Contributors"
author = "Sentry Team and Contributors"

release = "24.12.0.dev0"
release = "25.1.0.dev0"


# -- General configuration ---------------------------------------------------
2 changes: 1 addition & 1 deletion requirements.txt
@@ -6,7 +6,7 @@ click==8.1.7
clickhouse-driver==0.2.6
confluent-kafka==2.3.0
datadog==0.21.0
devservices==1.0.5
devservices==1.0.6
flake8==7.0.0
Flask==2.2.5
google-cloud-storage==2.18.0
2 changes: 1 addition & 1 deletion rust_snuba/rust-toolchain.toml
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.74.1"
channel = "1.83.0"
9 changes: 0 additions & 9 deletions rust_snuba/src/consumer.rs
@@ -246,20 +246,11 @@ pub fn consumer_impl(
let processor = if mutations_mode {
let mut_factory = MutConsumerStrategyFactory {
storage_config: first_storage,
env_config,
logical_topic_name,
max_batch_size,
max_batch_time,
processing_concurrency: ConcurrencyConfig::new(concurrency),
clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
async_inserts,
python_max_queue_depth,
use_rust_processor,
health_check_file: health_check_file.map(ToOwned::to_owned),
enforce_schema,
physical_consumer_group: consumer_group.to_owned(),
physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name),
accountant_topic_config: consumer_config.accountant_topic,
batch_write_timeout,
};

2 changes: 1 addition & 1 deletion rust_snuba/src/metrics/global_tags.rs
@@ -36,7 +36,7 @@ where
}
}

impl<'a, M> Middleware for AddGlobalTags<'a, M>
impl<M> Middleware for AddGlobalTags<'_, M>
where
M: Middleware,
{
11 changes: 1 addition & 10 deletions rust_snuba/src/mutations/factory.rs
@@ -13,7 +13,7 @@ use sentry_arroyo::processing::strategies::run_task_in_threads::{
};
use sentry_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrategyFactory};
use sentry_arroyo::types::Message;
use sentry_arroyo::types::{Partition, Topic};
use sentry_arroyo::types::Partition;

use crate::config;
use crate::metrics::global_tags::set_global_tag;
@@ -25,20 +25,11 @@ use crate::mutations::synchronize::Synchronizer;

pub struct MutConsumerStrategyFactory {
pub storage_config: config::StorageConfig,
pub env_config: config::EnvConfig,
pub logical_topic_name: String,
pub max_batch_size: usize,
pub max_batch_time: Duration,
pub processing_concurrency: ConcurrencyConfig,
pub clickhouse_concurrency: ConcurrencyConfig,
pub async_inserts: bool,
pub python_max_queue_depth: Option<usize>,
pub use_rust_processor: bool,
pub health_check_file: Option<String>,
pub enforce_schema: bool,
pub physical_consumer_group: String,
pub physical_topic_name: Topic,
pub accountant_topic_config: config::TopicConfig,
pub batch_write_timeout: Option<Duration>,
}

4 changes: 2 additions & 2 deletions rust_snuba/src/processors/eap_spans.rs
@@ -40,7 +40,7 @@ pub fn process_message(
let payload_bytes = payload.payload().context("Expected payload")?;
let msg: FromSpanMessage = serde_json::from_slice(payload_bytes)?;
let origin_timestamp = DateTime::from_timestamp(msg.received as i64, 0);
let mut span: EAPSpan = msg.try_into()?;
let mut span: EAPSpan = msg.into();

span.retention_days = Some(enforce_retention(span.retention_days, &config.env_config));

@@ -354,7 +354,7 @@ mod tests {
#[test]
fn test_serialization() {
let msg: FromSpanMessage = serde_json::from_slice(SPAN_KAFKA_MESSAGE.as_bytes()).unwrap();
let span: EAPSpan = msg.try_into().unwrap();
let span: EAPSpan = msg.into();
insta::with_settings!({sort_maps => true}, {
insta::assert_json_snapshot!(span)
});
2 changes: 1 addition & 1 deletion setup.py
@@ -2,7 +2,7 @@

from setuptools import find_packages, setup

VERSION = "24.12.0.dev0"
VERSION = "25.1.0.dev0"


def get_requirements() -> Sequence[str]:
4 changes: 2 additions & 2 deletions snuba/admin/clickhouse/system_queries.py
@@ -93,7 +93,7 @@ def _run_sql_query_on_host(
(SYSTEM)
\s
(?!SHUTDOWN\b)(?!KILL\b)
[\w\s]+
[\w\s'\-_]+
;? # Optional semicolon
$
""",
@@ -104,7 +104,7 @@
r"""^
(OPTIMIZE\sTABLE)
\s
[\w\s]+
[\w\s_\-']+
;? # Optional semicolon
$
""",
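The widened character classes above let SYSTEM and OPTIMIZE TABLE commands reference quoted, hyphenated, or underscored identifiers while still rejecting SHUTDOWN and KILL. A minimal sketch of the effect, assuming the pattern is compiled with re.VERBOSE as the inline comments suggest (the real validator in system_queries.py has additional branches):

```python
import re

# Widened character class: [\w\s'\-_]+ accepts quotes and hyphens in addition
# to word characters and whitespace, while SHUTDOWN/KILL remain blocked.
SYSTEM_QUERY_RE = re.compile(
    r"""^
    (SYSTEM)
    \s
    (?!SHUTDOWN\b)(?!KILL\b)
    [\w\s'\-_]+
    ;?  # Optional semicolon
    $
    """,
    re.VERBOSE,
)

print(bool(SYSTEM_QUERY_RE.match("SYSTEM RELOAD DICTIONARY 'my-dict'")))      # True: quote and hyphen now allowed
print(bool(SYSTEM_QUERY_RE.match("SYSTEM KILL QUERY WHERE query_id = 'x'")))  # False: KILL is still excluded
```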
14 changes: 12 additions & 2 deletions snuba/lw_deletions/strategy.py
@@ -18,9 +18,11 @@
from snuba.datasets.storage import WritableTableStorage
from snuba.lw_deletions.batching import BatchStepCustom, ValuesBatch
from snuba.lw_deletions.formatters import Formatter
from snuba.query.allocation_policies import AllocationPolicyViolations
from snuba.query.query_settings import HTTPQuerySettings
from snuba.state import get_int_config
from snuba.utils.metrics import MetricsBackend
from snuba.web import QueryException
from snuba.web.bulk_delete_query import construct_or_conditions, construct_query
from snuba.web.delete_query import (
ConditionsType,
@@ -46,6 +48,7 @@ def __init__(
) -> None:
self.__next_step = next_step
self.__storage = storage
self.__storage_name = storage.get_storage_key().value
self.__cluster_name = self.__storage.get_cluster().get_clickhouse_cluster_name()
self.__tables = storage.get_deletion_settings().tables
self.__formatter: Formatter = formatter
@@ -62,18 +65,25 @@ def submit(self, message: Message[ValuesBatch[KafkaPayload]]) -> None:

try:
self._execute_delete(conditions)
except TooManyOngoingMutationsError:
except TooManyOngoingMutationsError as err:
# backpressure is applied while we wait for the
# currently ongoing mutations to finish
self.__metrics.increment("too_many_ongoing_mutations")
logger.warning(str(err), exc_info=True)
raise MessageRejected
except QueryException as err:
cause = err.__cause__
if isinstance(cause, AllocationPolicyViolations):
self.__metrics.increment("allocation_policy_violation")
raise MessageRejected

self.__next_step.submit(message)

def _get_attribute_info(self) -> AttributionInfo:
return AttributionInfo(
app_id=AppID("lw-deletes"),
tenant_ids={},
# concurrent allocation policies requires project or org id
tenant_ids={"project_id": 1},
referrer="lw-deletes",
team=None,
feature=None,
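Both new error paths turn overload into backpressure by raising MessageRejected rather than failing the batch. A simplified sketch of that pattern, assuming the arroyo package (published as sentry-arroyo) and a toy overload check in place of the real mutation-count and allocation-policy logic:

```python
from typing import Callable, Optional

from arroyo.processing.strategies import MessageRejected, ProcessingStrategy
from arroyo.types import Message


class BackpressureOnOverload(ProcessingStrategy[bytes]):
    """Toy strategy: reject messages while some overload condition holds."""

    def __init__(
        self,
        next_step: ProcessingStrategy[bytes],
        is_overloaded: Callable[[], bool],
    ) -> None:
        self.__next_step = next_step
        self.__is_overloaded = is_overloaded  # e.g. "too many ongoing mutations"

    def submit(self, message: Message[bytes]) -> None:
        if self.__is_overloaded():
            # The stream processor holds the message and re-submits it later,
            # which is what applies backpressure upstream.
            raise MessageRejected
        self.__next_step.submit(message)

    def poll(self) -> None:
        self.__next_step.poll()

    def close(self) -> None:
        self.__next_step.close()

    def terminate(self) -> None:
        self.__next_step.terminate()

    def join(self, timeout: Optional[float] = None) -> None:
        self.__next_step.join(timeout)
```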
4 changes: 2 additions & 2 deletions snuba/manual_jobs/runner.py
@@ -134,9 +134,9 @@ def run_job(job_spec: JobSpec) -> JobStatus:
if not job_spec.is_async:
current_job_status = _set_job_status(job_spec.job_id, JobStatus.FINISHED)
job_logger.info("[runner] job execution finished")
except BaseException:
except BaseException as e:
current_job_status = _set_job_status(job_spec.job_id, JobStatus.FAILED)
job_logger.error("[runner] job execution failed")
job_logger.error(f"[runner] job execution failed {e}")
job_logger.info(f"[runner] exception {traceback.format_exc()}")
finally:
_release_job_lock(job_spec.job_id)
49 changes: 49 additions & 0 deletions snuba/manual_jobs/scrub_ips_from_eap_spans.py
@@ -0,0 +1,49 @@
from datetime import datetime
from typing import Any, Mapping, Optional

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.manual_jobs import Job, JobLogger, JobSpec


class ScrubIpFromEAPSpans(Job):
def __init__(self, job_spec: JobSpec) -> None:
self.__validate_job_params(job_spec.params)
super().__init__(job_spec)

def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None:
assert params
assert isinstance(params["organization_ids"], list)
assert all([isinstance(p, int) for p in params["organization_ids"]])
self._organization_ids = params["organization_ids"]
self._start_datetime = datetime.fromisoformat(params["start_datetime"])
self._end_datetime = datetime.fromisoformat(params["end_datetime"])

def _get_query(self, cluster_name: str | None) -> str:
organization_ids = ",".join([str(p) for p in self._organization_ids])
start_datetime = self._start_datetime.isoformat()
end_datetime = self._end_datetime.isoformat()
on_cluster = f"ON CLUSTER '{cluster_name}'" if cluster_name else ""
return f"""ALTER TABLE eap_spans_2_local
{on_cluster}
UPDATE `attr_str_1` = mapApply((k, v) -> (k, if(k = 'user.ip', 'scrubbed', v)), `attr_str_1`)
WHERE organization_id IN [{organization_ids}]
AND _sort_timestamp >= toDateTime('{start_datetime}')
AND _sort_timestamp < toDateTime('{end_datetime}')"""

def execute(self, logger: JobLogger) -> None:
cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)
storage_node = cluster.get_local_nodes()[0]
connection = cluster.get_node_connection(
ClickhouseClientSettings.CLEANUP, storage_node
)
if not cluster.is_single_node():
cluster_name = cluster.get_clickhouse_cluster_name()
else:
cluster_name = None
query = self._get_query(cluster_name)
logger.info(f"Executing query: {query}")
result = connection.execute(query=query, settings={"mutations_sync": 0})

logger.info("complete")
logger.info(repr(result))
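For reference, a sketch of the mutation this job emits; the organization ids, date range, and cluster name below are hypothetical, and the snippet only reproduces the string building of _get_query in isolation:

```python
from datetime import datetime

# Hypothetical job parameters mirroring the expected params mapping.
organization_ids = [1, 42]
start = datetime(2024, 12, 1).isoformat()
end = datetime(2024, 12, 15).isoformat()
cluster_name = "spans_cluster"  # None on single-node deployments

on_cluster = f"ON CLUSTER '{cluster_name}'" if cluster_name else ""
ids = ",".join(str(o) for o in organization_ids)
print(
    f"""ALTER TABLE eap_spans_2_local
{on_cluster}
UPDATE `attr_str_1` = mapApply((k, v) -> (k, if(k = 'user.ip', 'scrubbed', v)), `attr_str_1`)
WHERE organization_id IN [{ids}]
AND _sort_timestamp >= toDateTime('{start}')
AND _sort_timestamp < toDateTime('{end}')"""
)
```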
15 changes: 14 additions & 1 deletion snuba/web/bulk_delete_query.py
@@ -20,6 +20,7 @@
from snuba.query.exceptions import InvalidQueryException, NoRowsToDeleteException
from snuba.query.expressions import Expression
from snuba.reader import Result
from snuba.state import get_str_config
from snuba.utils.metrics.util import with_span
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.utils.schemas import ColumnValidator, InvalidColumnType
@@ -208,9 +209,14 @@ def delete_from_tables(
if highest_rows_to_delete == 0:
return result

storage_name = storage.get_storage_key().value
project_id = attribution_info.tenant_ids.get("project_id")
if project_id and should_use_killswitch(storage_name, str(project_id)):
return result

delete_query: DeleteQueryMessage = {
"rows_to_delete": highest_rows_to_delete,
"storage_name": storage.get_storage_key().value,
"storage_name": storage_name,
"conditions": conditions,
"tenant_ids": attribution_info.tenant_ids,
}
@@ -224,3 +230,10 @@ def construct_or_conditions(conditions: Sequence[ConditionsType]) -> Expression:
into OR conditions for a bulk delete
"""
return combine_or_conditions([_construct_condition(cond) for cond in conditions])


def should_use_killswitch(storage_name: str, project_id: str) -> bool:
killswitch_config = get_str_config(
f"lw_deletes_killswitch_{storage_name}", default=""
)
return project_id in killswitch_config if killswitch_config else False
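The new killswitch reads a runtime config value named lw_deletes_killswitch_<storage_name> via get_str_config and skips the delete when the project id appears in it. A minimal illustration with the config lookup stubbed out as a plain argument:

```python
def should_use_killswitch(killswitch_config: str, project_id: str) -> bool:
    # The config value is expected to be a comma-separated list of project ids,
    # e.g. "101,202,303"; an empty value means the killswitch is off.
    return project_id in killswitch_config if killswitch_config else False


print(should_use_killswitch("101,202,303", "202"))  # True -> delete is skipped
print(should_use_killswitch("", "202"))             # False -> delete proceeds
```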
1 change: 0 additions & 1 deletion snuba/web/db_query.py
@@ -302,7 +302,6 @@ def execute_query_with_readthrough_caching(
clickhouse_query_settings["query_id"] = query_id
if span:
span.set_data("query_id", query_id)

return execute_query(
clickhouse_query,
query_settings,
28 changes: 20 additions & 8 deletions snuba/web/rpc/common/aggregation.py
@@ -40,13 +40,16 @@ class ExtrapolationMeta:
avg_sampling_rate: float

@staticmethod
def from_row(row_data: Dict[str, Any], column_label: str) -> "ExtrapolationMeta":
def from_row(
row_data: Dict[str, Any], column_label: str
) -> "ExtrapolationMeta | None":
"""
Computes the reliability and average sample rate for a column based on the extrapolation columns.
If the sample count is 0, we return None as we don't have any data to extrapolate.
"""
confidence_interval = None
average_sample_rate = 0
sample_count = None
sample_count = 0
is_percentile = False
percentile = 0.0
granularity = 0.0
@@ -87,7 +90,10 @@ def from_row(row_data: Dict[str, Any], column_label: str) -> "ExtrapolationMeta"
sample_count = col_value

reliability = Reliability.RELIABILITY_UNSPECIFIED
if confidence_interval is not None and sample_count is not None:
if confidence_interval is not None:
if sample_count == 0:
return None

estimate = row_data[column_label]
# relative confidence represents the ratio of the confidence interval to the estimate (by default it is the upper bound)
if is_percentile:
@@ -233,9 +239,12 @@ def get_average_sample_rate_column(aggregation: AttributeAggregation) -> Express
referenced_column=aggregation.label,
metadata={},
).to_alias()
return f.avgIf(
f.divide(literal(1), sampling_weight_column),
get_field_existence_expression(aggregation),
return f.divide(
f.sumIf(sign_column, get_field_existence_expression(aggregation)),
f.sumIf(
f.multiply(sign_column, sampling_weight_column),
get_field_existence_expression(aggregation),
),
alias=alias,
)

@@ -249,8 +258,11 @@ def _get_count_column_alias(aggregation: AttributeAggregation) -> str:


def get_count_column(aggregation: AttributeAggregation) -> Expression:
field = attribute_key_to_expression(aggregation.key)
return f.count(field, alias=_get_count_column_alias(aggregation))
return f.sumIf(
sign_column,
get_field_existence_expression(aggregation),
alias=_get_count_column_alias(aggregation),
)


def _get_possible_percentiles(
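A rough numeric sketch of why the new estimator is preferable, assuming sampling_weight is the inverse of each span's sample rate and sign is the collapsing-merge sign column (the numbers are made up):

```python
# Two stored rows: one kept at 100% (weight 1), one standing in for 10
# original spans (weight 10), so 2 rows were kept out of 11 originals.
signs = [1, 1]
weights = [1, 10]

# Old estimate: avgIf(1 / sampling_weight) averages per-row rates and
# over-weights the fully sampled row.
old_avg_rate = sum(1 / w for w in weights) / len(weights)  # 0.55

# New estimate: sumIf(sign) / sumIf(sign * sampling_weight)
# = stored rows / estimated original rows.
new_avg_rate = sum(signs) / sum(s * w for s, w in zip(signs, weights))  # 2 / 11 ≈ 0.18

print(old_avg_rate, new_avg_rate)
```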