diff --git a/Makefile b/Makefile
index 6aba29ca9b6..3cd880fc14f 100644
--- a/Makefile
+++ b/Makefile
@@ -42,7 +42,7 @@ api-tests:
 	SNUBA_SETTINGS=test pytest -vv tests/*_api.py
 
 backend-typing:
-	mypy snuba tests scripts --strict --config-file mypy.ini --exclude 'tests/datasets|tests/query|tests/test_split.py'
+	mypy snuba tests scripts --strict --config-file mypy.ini --exclude 'tests/datasets|tests/query'
 
 install-python-dependencies:
 	pip uninstall -qqy uwsgi  # pip doesn't do well with swapping drop-ins
diff --git a/docs/source/architecture/queryprocessing.rst b/docs/source/architecture/queryprocessing.rst
index 42f3bd44ffe..8caca1c69fd 100644
--- a/docs/source/architecture/queryprocessing.rst
+++ b/docs/source/architecture/queryprocessing.rst
@@ -122,24 +122,6 @@ finds equality conditions on tags and replace them with the equivalent
 condition on a tags hashmap (where we have a bloom filter index) making the
 filtering operation faster.
 
-Query Splitter
---------------
-
-Some queries can be executed in an optimized way by splitting them into multiple
-individual Clickhouse queries and by assembling the results of each one of them.
-
-Two examples are time splitting and column splitting. Both hare `in this file `_.
-
-Time splitting splits a query (that does not contain aggregations and is properly
-sorted) into multiple ones over a variable time range that increases in size
-progressively and executes them in sequence stopping as soon we have enough
-results.
-
-Column splitting splits filtering and column fetching. It executes the filtering
-part of the query on a minimal number of columns so Clickhouse loads fewer columns,
-then, through a second query, fetches the missing columns only for the rows
-filtered by the first query.
-
 Query Formatter
 ---------------
diff --git a/snuba/datasets/configuration/discover/storages/discover.yaml b/snuba/datasets/configuration/discover/storages/discover.yaml
index 62428647965..ee42a5c869d 100644
--- a/snuba/datasets/configuration/discover/storages/discover.yaml
+++ b/snuba/datasets/configuration/discover/storages/discover.yaml
@@ -163,12 +163,3 @@ allocation_policies:
       default_config_overrides:
         is_enforced: 0
         is_active: 0
-query_splitters:
-  - splitter: ColumnSplitQueryStrategy
-    args:
-      id_column: event_id
-      project_column: project_id
-      timestamp_column: timestamp
-  - splitter: TimeSplitQueryStrategy
-    args:
-      timestamp_col: timestamp
diff --git a/snuba/datasets/configuration/events/storages/errors.yaml b/snuba/datasets/configuration/events/storages/errors.yaml
index f827e1f9580..9838ece2b9a 100644
--- a/snuba/datasets/configuration/events/storages/errors.yaml
+++ b/snuba/datasets/configuration/events/storages/errors.yaml
@@ -323,15 +323,6 @@ query_processors:
         - environment
         - project_id
   - processor: TableRateLimit
-query_splitters:
-  - splitter: ColumnSplitQueryStrategy
-    args:
-      id_column: event_id
-      project_column: project_id
-      timestamp_column: timestamp
-  - splitter: TimeSplitQueryStrategy
-    args:
-      timestamp_col: timestamp
 mandatory_condition_checkers:
   - condition: ProjectIdEnforcer
 replacer_processor:
diff --git a/snuba/datasets/configuration/events/storages/errors_ro.yaml b/snuba/datasets/configuration/events/storages/errors_ro.yaml
index 7d4ee6b0842..93d538d6fff 100644
--- a/snuba/datasets/configuration/events/storages/errors_ro.yaml
+++ b/snuba/datasets/configuration/events/storages/errors_ro.yaml
@@ -322,12 +322,3 @@ query_processors:
         - environment
         - project_id
   - processor: TableRateLimit
-query_splitters:
-  - splitter: ColumnSplitQueryStrategy
-    args:
-      id_column: event_id
-      project_column: project_id
-      timestamp_column: timestamp
-  - splitter: TimeSplitQueryStrategy
-    args:
-      timestamp_col: timestamp
diff --git a/snuba/datasets/configuration/json_schema.py b/snuba/datasets/configuration/json_schema.py
index db6d3df2d0c..f9b911bf75e 100644
--- a/snuba/datasets/configuration/json_schema.py
+++ b/snuba/datasets/configuration/json_schema.py
@@ -87,6 +87,7 @@ def string_with_description(description: str) -> dict[str, str]:
     "description": "The stream loader for a writing to ClickHouse. This provides what is needed to start a Kafka consumer and fill in the ClickHouse table.",
 }
 
+
 ######
 # Column specific json schemas
 def make_column_schema(
@@ -332,11 +333,6 @@ def registered_class_array_schema(
     "QueryProcessor",
     "Name of ClickhouseQueryProcessor class config key. Responsible for the transformation applied to a query.",
 )
-STORAGE_QUERY_SPLITTERS_SCHEMA = registered_class_array_schema(
-    "splitter",
-    "QuerySplitStrategy",
-    "Name of QuerySplitStrategy class config key. Responsible for splitting a query into two at runtime and combining the results.",
-)
 STORAGE_MANDATORY_CONDITION_CHECKERS_SCHEMA = registered_class_array_schema(
     "condition",
     "ConditionChecker",
@@ -542,7 +538,6 @@ def registered_class_array_schema(
         "readiness_state": READINESS_STATE_SCHEMA,
         "schema": SCHEMA_SCHEMA,
         "query_processors": STORAGE_QUERY_PROCESSORS_SCHEMA,
-        "query_splitters": STORAGE_QUERY_SPLITTERS_SCHEMA,
         "mandatory_condition_checkers": STORAGE_MANDATORY_CONDITION_CHECKERS_SCHEMA,
         "allocation_policies": STORAGE_ALLOCATION_POLICIES_SCHEMA,
     },
@@ -569,7 +564,6 @@ def registered_class_array_schema(
         "schema": SCHEMA_SCHEMA,
         "stream_loader": STREAM_LOADER_SCHEMA,
         "query_processors": STORAGE_QUERY_PROCESSORS_SCHEMA,
-        "query_splitters": STORAGE_QUERY_SPLITTERS_SCHEMA,
         "mandatory_condition_checkers": STORAGE_MANDATORY_CONDITION_CHECKERS_SCHEMA,
         "allocation_policies": STORAGE_ALLOCATION_POLICIES_SCHEMA,
         "replacer_processor": STORAGE_REPLACER_PROCESSOR_SCHEMA,
@@ -607,7 +601,6 @@ def registered_class_array_schema(
         "postgres_table": TYPE_STRING,
         "row_processor": CDC_STORAGE_ROW_PROCESSOR_SCHEMA,
         "query_processors": STORAGE_QUERY_PROCESSORS_SCHEMA,
-        "query_splitters": STORAGE_QUERY_SPLITTERS_SCHEMA,
         "mandatory_condition_checkers": STORAGE_MANDATORY_CONDITION_CHECKERS_SCHEMA,
         "allocation_policies": STORAGE_ALLOCATION_POLICIES_SCHEMA,
         "replacer_processor": STORAGE_REPLACER_PROCESSOR_SCHEMA,
diff --git a/snuba/datasets/configuration/spans/storages/metrics_summaries.yaml b/snuba/datasets/configuration/spans/storages/metrics_summaries.yaml
index d46ff952dd4..0007db47124 100644
--- a/snuba/datasets/configuration/spans/storages/metrics_summaries.yaml
+++ b/snuba/datasets/configuration/spans/storages/metrics_summaries.yaml
@@ -97,11 +97,6 @@ query_processors:
   - processor: TableRateLimit
   - processor: TupleUnaliaser
 
-query_splitters:
-  - splitter: TimeSplitQueryStrategy
-    args:
-      timestamp_col: end_timestamp
-
 mandatory_condition_checkers:
   - condition: ProjectIdEnforcer
diff --git a/snuba/datasets/configuration/spans/storages/spans.yaml b/snuba/datasets/configuration/spans/storages/spans.yaml
index 66329add95e..6e4cc85301b 100644
--- a/snuba/datasets/configuration/spans/storages/spans.yaml
+++ b/snuba/datasets/configuration/spans/storages/spans.yaml
@@ -169,11 +169,6 @@ query_processors:
   - processor: TableRateLimit
   - processor: TupleUnaliaser
 
-query_splitters:
-  - splitter: TimeSplitQueryStrategy
-    args:
-      timestamp_col: end_timestamp
-
 mandatory_condition_checkers:
   - condition: ProjectIdEnforcer
diff --git a/snuba/datasets/configuration/storage_builder.py b/snuba/datasets/configuration/storage_builder.py
index b14358519a9..e07c936e3d5 100644
--- a/snuba/datasets/configuration/storage_builder.py
+++ b/snuba/datasets/configuration/storage_builder.py
@@ -11,7 +11,6 @@
 from snuba.datasets.configuration.utils import (
     get_mandatory_condition_checkers,
     get_query_processors,
-    get_query_splitters,
     parse_columns,
 )
 from snuba.datasets.message_filters import StreamMessageFilter
@@ -43,7 +42,6 @@
 STREAM_LOADER = "stream_loader"
 PRE_FILTER = "pre_filter"
 QUERY_PROCESSORS = "query_processors"
-QUERY_SPLITTERS = "query_splitters"
 MANDATORY_CONDITION_CHECKERS = "mandatory_condition_checkers"
 WRITER_OPTIONS = "writer_options"
 SUBCRIPTION_SCHEDULER_MODE = "subscription_scheduler_mode"
@@ -79,9 +77,6 @@ def __build_readable_storage_kwargs(config: dict[str, Any]) -> dict[str, Any]:
         QUERY_PROCESSORS: get_query_processors(
             config[QUERY_PROCESSORS] if QUERY_PROCESSORS in config else []
         ),
-        QUERY_SPLITTERS: get_query_splitters(
-            config[QUERY_SPLITTERS] if QUERY_SPLITTERS in config else []
-        ),
         MANDATORY_CONDITION_CHECKERS: get_mandatory_condition_checkers(
             config[MANDATORY_CONDITION_CHECKERS]
             if MANDATORY_CONDITION_CHECKERS in config
diff --git a/snuba/datasets/configuration/transactions/storages/transactions.yaml b/snuba/datasets/configuration/transactions/storages/transactions.yaml
index db64f1d9517..ba9c591be52 100644
--- a/snuba/datasets/configuration/transactions/storages/transactions.yaml
+++ b/snuba/datasets/configuration/transactions/storages/transactions.yaml
@@ -248,11 +248,6 @@ query_processors:
   - processor: TableRateLimit
   - processor: TupleUnaliaser
 
-query_splitters:
-  - splitter: TimeSplitQueryStrategy
-    args:
-      timestamp_col: finish_ts
-
 mandatory_condition_checkers:
   - condition: ProjectIdEnforcer
diff --git a/snuba/datasets/configuration/utils.py b/snuba/datasets/configuration/utils.py
index f3ff94f4627..725390293cb 100644
--- a/snuba/datasets/configuration/utils.py
+++ b/snuba/datasets/configuration/utils.py
@@ -13,7 +13,6 @@
     String,
     UInt,
 )
-from snuba.datasets.plans.splitters import QuerySplitStrategy
 from snuba.query.processors.condition_checkers import ConditionChecker
 from snuba.query.processors.physical import ClickhouseQueryProcessor
 from snuba.utils.schemas import (
@@ -31,11 +30,6 @@ class QueryProcessorDefinition(TypedDict):
     args: dict[str, Any]
 
 
-class QuerySplitterDefinition(TypedDict):
-    splitter: str
-    args: dict[str, Any]
-
-
 class MandatoryConditionCheckerDefinition(TypedDict):
     condition: str
     args: dict[str, Any]
@@ -52,17 +46,6 @@ def get_query_processors(
     ]
 
 
-def get_query_splitters(
-    query_splitter_objects: list[QuerySplitterDefinition],
-) -> list[QuerySplitStrategy]:
-    return [
-        QuerySplitStrategy.get_from_name(qs["splitter"]).from_kwargs(
-            **qs.get("args", {})
-        )
-        for qs in query_splitter_objects
-    ]
-
-
 def get_mandatory_condition_checkers(
     mandatory_condition_checkers_objects: list[MandatoryConditionCheckerDefinition],
 ) -> list[ConditionChecker]:
diff --git a/snuba/datasets/plans/splitters/__init__.py b/snuba/datasets/plans/splitters/__init__.py
deleted file mode 100644
index ba2efe38839..00000000000
--- a/snuba/datasets/plans/splitters/__init__.py
+++ /dev/null
@@ -1,64 +0,0 @@
-from __future__ import annotations
-
-import os
-from abc import ABC, abstractmethod
-from typing import Callable, Optional, Type, cast
-
-from snuba.clickhouse.query import Query
-from snuba.query.query_settings import QuerySettings
-from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory
-from snuba.web import QueryResult
-
-SplitQueryRunner = Callable[[Query, QuerySettings], QueryResult]
-
-
-class QuerySplitStrategy(ABC, metaclass=RegisteredClass):
-    """
-    Implements a query split algorithm. It works in a similar way as a
-    QueryExecutionStrategy, it takes a query, request.query_settings and a query runner
-    and decides if it should split the query into more efficient parts.
-    If it can split the query, it uses the SplitQueryRunner to execute every chunk,
-    otherwise it returns None immediately.
-
-    The main difference between this class and the QueryPlanExecutionStrategy is that
-    it relies on a smarter QueryRunner (SplitQueryRunner) than the one provided to the
-    execution strategy. The runner this class receives is supposed to take care of
-    running the DB query processors before executing the query on the database so that
-    such responsibility is confined to the plan execution strategy.
-
-    A QuerySplitStrategy must not fall back on the runner method to execute the
-    entire query in case it cannot perform any useful split.
-    Doing so would prevent following splitters defined by the storage to attempt the split.
-    """
-
-    @abstractmethod
-    def execute(
-        self,
-        query: Query,
-        query_settings: QuerySettings,
-        runner: SplitQueryRunner,
-    ) -> Optional[QueryResult]:
-        """
-        Executes and/or splits the query provided, like the equivalent method in
-        QueryPlanExecutionStrategy.
-        Since not every split algorithm can work on every query, this method should
-        return None when the query is not supported by this strategy.
-        """
-        raise NotImplementedError
-
-    @classmethod
-    def get_from_name(cls, name: str) -> Type["QuerySplitStrategy"]:
-        return cast(Type["QuerySplitStrategy"], cls.class_from_name(name))
-
-    @classmethod
-    def from_kwargs(cls, **kwargs: str) -> QuerySplitStrategy:
-        return cls(**kwargs)
-
-    @classmethod
-    def config_key(cls) -> str:
-        return cls.__name__
-
-
-import_submodules_in_directory(
-    os.path.dirname(os.path.realpath(__file__)), "snuba.datasets.plans.splitters"
-)
diff --git a/snuba/datasets/plans/splitters/strategies.py b/snuba/datasets/plans/splitters/strategies.py
deleted file mode 100644
index 5a8f52e36f9..00000000000
--- a/snuba/datasets/plans/splitters/strategies.py
+++ /dev/null
@@ -1,364 +0,0 @@
-import copy
-import logging
-import math
-from dataclasses import replace
-from datetime import timedelta
-from typing import Optional
-
-from snuba import environment, settings, state, util
-from snuba.clickhouse.query import Query
-from snuba.clickhouse.query_dsl.accessors import get_time_range
-from snuba.datasets.plans.splitters import QuerySplitStrategy, SplitQueryRunner
-from snuba.query import OrderByDirection, SelectedExpression
-from snuba.query.conditions import (
-    OPERATOR_TO_FUNCTION,
-    ConditionFunctions,
-    combine_and_conditions,
-    get_first_level_and_conditions,
-    in_condition,
-)
-from snuba.query.dsl import literals_tuple
-from snuba.query.expressions import Column as ColumnExpr
-from snuba.query.expressions import CurriedFunctionCall as CurriedFunctionCallExpr
-from snuba.query.expressions import Expression
-from snuba.query.expressions import FunctionCall as FunctionCallExpr
-from snuba.query.expressions import Literal as LiteralExpr
-from snuba.query.matchers import AnyExpression, Column, FunctionCall, Or, Param, String
-from snuba.query.query_settings import QuerySettings
-from snuba.utils.metrics.wrapper import MetricsWrapper
-from snuba.web import QueryResult
-
-logger = logging.getLogger("snuba.query.split")
-metrics = MetricsWrapper(environment.metrics, "query.splitter")
-
-# Every time we find zero results for a given step, expand the search window by
-# this factor. Based on the assumption that the initial window is 2 hours, the
-# worst case (there are 0 results in the database) would have us making 4
-# queries before hitting the 90d limit (2+20+200+2000 hours == 92 days).
-STEP_GROWTH = 10
-
-
-def _replace_ast_condition(
-    query: Query, field: str, operator: str, new_operand: Expression
-) -> None:
-    """
-    Replaces a condition in the top level AND boolean condition
-    in the query WHERE clause.
-    """
-
-    def replace_condition(expression: Expression) -> Expression:
-        match = FunctionCall(
-            String(OPERATOR_TO_FUNCTION[operator]),
-            (Param("column", Column(None, String(field))), AnyExpression()),
-        ).match(expression)
-
-        return (
-            expression
-            if match is None
-            else replace(
-                expression, parameters=(match.expression("column"), new_operand)
-            )
-        )
-
-    condition = query.get_condition()
-    if condition is not None:
-        query.set_ast_condition(
-            combine_and_conditions(
-                [
-                    replace_condition(c)
-                    for c in get_first_level_and_conditions(condition)
-                ]
-            )
-        )
-
-
-class TimeSplitQueryStrategy(QuerySplitStrategy):
-    """
-    A strategy that breaks the time window into smaller ones and executes
-    them in sequence.
-    """
-
-    def __init__(self, timestamp_col: str) -> None:
-        self.__timestamp_col = timestamp_col
-
-    def execute(
-        self,
-        query: Query,
-        query_settings: QuerySettings,
-        runner: SplitQueryRunner,
-    ) -> Optional[QueryResult]:
-        """
-        If a query is:
-            - ORDER BY timestamp DESC
-            - has no grouping
-            - has an offset/limit
-            - has a large time range
-        We know we have to reverse-sort the entire set of rows to return the small
-        chunk at the end of the time range, so optimistically split the time range
-        into smaller increments, and start with the last one, so that we can
-        potentially avoid querying the entire range.
-        """
-        limit = query.get_limit()
-        if limit is None or query.get_groupby():
-            return None
-
-        if query.get_offset() >= 1000:
-            return None
-
-        orderby = query.get_orderby()
-        if (
-            not orderby
-            or orderby[0].direction != OrderByDirection.DESC
-            or not isinstance(orderby[0].expression, ColumnExpr)
-            or not orderby[0].expression.column_name == self.__timestamp_col
-        ):
-            return None
-
-        from_date_ast, to_date_ast = get_time_range(query, self.__timestamp_col)
-
-        if from_date_ast is None or to_date_ast is None:
-            return None
-
-        date_align, split_step = state.get_configs(
-            [("date_align_seconds", 1), ("split_step", 3600)]  # default 1 hour
-        )
-        assert isinstance(split_step, int)
-        remaining_offset = query.get_offset()
-
-        overall_result: Optional[QueryResult] = None
-        split_end = to_date_ast
-        split_start = max(split_end - timedelta(seconds=split_step), from_date_ast)
-        total_results = 0
-        while split_start < split_end and total_results < limit:
-            # We need to make a copy to use during the query execution because we replace
-            # the start-end conditions on the query at each iteration of this loop.
-            split_query = copy.deepcopy(query)
-
-            _replace_ast_condition(
-                split_query, self.__timestamp_col, ">=", LiteralExpr(None, split_start)
-            )
-            _replace_ast_condition(
-                split_query, self.__timestamp_col, "<", LiteralExpr(None, split_end)
-            )
-
-            # Because its paged, we have to ask for (limit+offset) results
-            # and set offset=0 so we can then trim them ourselves.
-            split_query.set_offset(0)
-            split_query.set_limit(limit - total_results + remaining_offset)
-
-            # At every iteration we only append the "data" key from the results returned by
-            # the runner. The "extra" key is only populated at the first iteration of the
-            # loop and never changed.
-            result = runner(split_query, query_settings)
-
-            if overall_result is None:
-                overall_result = result
-            else:
-                overall_result.result["data"].extend(result.result["data"])
-
-            if remaining_offset > 0 and len(overall_result.result["data"]) > 0:
-                to_trim = min(remaining_offset, len(overall_result.result["data"]))
-                overall_result.result["data"] = overall_result.result["data"][to_trim:]
-                remaining_offset -= to_trim
-
-            total_results = len(overall_result.result["data"])
-
-            if total_results < limit:
-                if len(result.result["data"]) == 0:
-                    # If we got nothing from the last query, expand the range by a static factor
-                    split_step = split_step * STEP_GROWTH
-                else:
-                    # If we got some results but not all of them, estimate how big the time
-                    # range should be for the next query based on how many results we got for
-                    # our last query and its time range, and how many we have left to fetch.
-                    remaining = limit - total_results
-                    split_step = split_step * math.ceil(
-                        remaining / float(len(result.result["data"]))
-                    )
-
-                # Set the start and end of the next query based on the new range.
-                split_end = split_start
-                try:
-                    split_start = max(
-                        split_end - timedelta(seconds=split_step), from_date_ast
-                    )
-                except OverflowError:
-                    split_start = from_date_ast
-
-        return overall_result
-
-
-class ColumnSplitQueryStrategy(QuerySplitStrategy):
-    """
-    A strategy that performs column based splitting: if the client requests enough columns,
-    a first query on the minimum set of columns is ran to load as little Clickhouse data
-    as possible. A second query based on the results of the first is then executed to
-    build the full result set.
-    """
-
-    def __init__(
-        self,
-        id_column: str,
-        project_column: str,
-        timestamp_column: str,
-    ) -> None:
-        self.__id_column = id_column
-        self.__project_column = project_column
-        self.__timestamp_column = timestamp_column
-
-    def execute(
-        self,
-        query: Query,
-        query_settings: QuerySettings,
-        runner: SplitQueryRunner,
-    ) -> Optional[QueryResult]:
-        """
-        Split query in 2 steps if a large number of columns is being selected.
-        - First query only selects event_id, project_id and timestamp.
-        - Second query selects all fields for only those events.
-        - Shrink the date range.
-        """
-        limit = query.get_limit()
-        if (
-            limit is None
-            or limit == 0
-            or query.get_groupby()
-            or not query.get_selected_columns()
-        ):
-            return None
-
-        if limit > settings.COLUMN_SPLIT_MAX_LIMIT:
-            metrics.increment("column_splitter.query_above_limit")
-            return None
-
-        # Do not split if there is already a = or IN condition on an ID column
-        id_column_matcher = FunctionCall(
-            Or([String(ConditionFunctions.EQ), String(ConditionFunctions.IN)]),
-            (
-                Column(None, String(self.__id_column)),
-                AnyExpression(),
-            ),
-        )
-
-        for expr in query.get_condition() or []:
-            match = id_column_matcher.match(expr)
-
-            if match:
-                return None
-
-        # We need to count the number of table/column name pairs
-        # not the number of distinct Column objects in the query
-        # so to avoid counting aliased columns multiple times.
-        selected_columns = {
-            (col.table_name, col.column_name)
-            for col in query.get_columns_referenced_in_select()
-        }
-
-        if len(selected_columns) < settings.COLUMN_SPLIT_MIN_COLS:
-            metrics.increment("column_splitter.main_query_min_threshold")
-            return None
-
-        minimal_query = copy.deepcopy(query)
-
-        # TODO: provide the table alias name to this splitter if we ever use it
-        # in joins.
-        minimal_query.set_ast_selected_columns(
-            [
-                SelectedExpression(
-                    self.__id_column,
-                    ColumnExpr(self.__id_column, None, self.__id_column),
-                ),
-                SelectedExpression(
-                    self.__project_column,
-                    ColumnExpr(self.__project_column, None, self.__project_column),
-                ),
-                SelectedExpression(
-                    self.__timestamp_column,
-                    ColumnExpr(self.__timestamp_column, None, self.__timestamp_column),
-                ),
-            ]
-        )
-
-        for exp in minimal_query.get_all_expressions():
-            if exp.alias in (
-                self.__id_column,
-                self.__project_column,
-                self.__timestamp_column,
-            ) and not (isinstance(exp, ColumnExpr) and exp.column_name == exp.alias):
-                logger.warning(
-                    "Potential alias shadowing due to column splitter",
-                    extra={"expression": exp},
-                    exc_info=True,
-                )
-
-        # Ensures the AST minimal query is actually runnable on its own.
-        if not minimal_query.validate_aliases():
-            return None
-
-        # There is a Clickhouse bug where if functions in the ORDER BY clause are not in the SELECT,
-        # they fail on distributed tables. For that specific case, skip the query splitter.
-        for orderby in minimal_query.get_orderby():
-            if isinstance(
-                orderby.expression, (FunctionCallExpr, CurriedFunctionCallExpr)
-            ):
-                metrics.increment("column_splitter.orderby_has_a_function")
-                return None
-
-        result = runner(minimal_query, query_settings)
-        del minimal_query
-
-        if not result.result["data"]:
-            metrics.increment("column_splitter.no_data_from_minimal_query")
-            return None
-
-        # Making a copy just in case runner returned None (which would drive the execution
-        # strategy to ignore the result of this splitter and try the next one).
-        query = copy.deepcopy(query)
-
-        event_ids = list(
-            set([event[self.__id_column] for event in result.result["data"]])
-        )
-        if len(event_ids) > settings.COLUMN_SPLIT_MAX_RESULTS:
-            # We may be runing a query that is beyond clickhouse maximum query size,
-            # so we cowardly abandon.
-            metrics.increment("column_splitter.intermediate_results_beyond_limit")
-            return None
-
-        query.add_condition_to_ast(
-            in_condition(
-                ColumnExpr(None, None, self.__id_column),
-                [LiteralExpr(None, e_id) for e_id in event_ids],
-            )
-        )
-        query.set_offset(0)
-        query.set_limit(len(result.result["data"]))
-
-        project_ids = list(
-            set([event[self.__project_column] for event in result.result["data"]])
-        )
-        _replace_ast_condition(
-            query,
-            self.__project_column,
-            "IN",
-            literals_tuple(None, [LiteralExpr(None, p_id) for p_id in project_ids]),
-        )
-
-        timestamps = [event[self.__timestamp_column] for event in result.result["data"]]
-        _replace_ast_condition(
-            query,
-            self.__timestamp_column,
-            ">=",
-            LiteralExpr(None, util.parse_datetime(min(timestamps))),
-        )
-        # We add 1 second since this gets translated to ('timestamp', '<', to_date)
-        # and events are stored with a granularity of 1 second.
-        _replace_ast_condition(
-            query,
-            self.__timestamp_column,
-            "<",
-            LiteralExpr(
-                None,
-                (util.parse_datetime(max(timestamps)) + timedelta(seconds=1)),
-            ),
-        )
-        return runner(query, query_settings)
diff --git a/snuba/datasets/plans/storage_plan_builder.py b/snuba/datasets/plans/storage_plan_builder.py
index f8c256fff52..45517fd0876 100644
--- a/snuba/datasets/plans/storage_plan_builder.py
+++ b/snuba/datasets/plans/storage_plan_builder.py
@@ -5,7 +5,6 @@
 import sentry_sdk
 
 from snuba import settings as snuba_settings
-from snuba import state
 from snuba.clickhouse.query import Query
 from snuba.clusters.cluster import ClickhouseCluster
 from snuba.datasets.entities.storage_selectors import QueryStorageSelector
@@ -17,7 +16,6 @@
     QueryPlanExecutionStrategy,
     QueryRunner,
 )
-from snuba.datasets.plans.splitters import QuerySplitStrategy
 from snuba.datasets.plans.translator.query import QueryTranslator
 from snuba.datasets.schemas import RelationalSource
 from snuba.datasets.schemas.tables import TableSource
@@ -54,11 +52,9 @@ def __init__(
         self,
         cluster: ClickhouseCluster,
         db_query_processors: Sequence[ClickhouseQueryProcessor],
-        splitters: Optional[Sequence[QuerySplitStrategy]] = None,
     ) -> None:
         self.__cluster = cluster
         self.__query_processors = db_query_processors
-        self.__splitters = splitters or []
 
     @with_span()
     def execute(
@@ -89,18 +85,6 @@ def process_and_run_query(
                 cluster_name=self.__cluster.get_clickhouse_cluster_name() or "",
             )
 
-        use_split = state.get_config("use_split", 1)
-        if use_split:
-            for splitter in self.__splitters:
-                with sentry_sdk.start_span(
-                    description=type(splitter).__name__, op="splitter"
-                ):
-                    result = splitter.execute(
-                        query, query_settings, process_and_run_query
-                    )
-                    if result is not None:
-                        return result
-
         return process_and_run_query(query, query_settings)
 
 
@@ -241,7 +225,6 @@ def build_and_rank_plans(
                 execution_strategy=SimpleQueryPlanExecutionStrategy(
                     cluster=cluster,
                     db_query_processors=db_query_processors,
-                    splitters=storage.get_query_splitters(),
                 ),
             )
         ]
diff --git a/snuba/datasets/storage.py b/snuba/datasets/storage.py
index 77e8c907d56..11debb69457 100644
--- a/snuba/datasets/storage.py
+++ b/snuba/datasets/storage.py
@@ -11,7 +11,6 @@
     get_cluster,
 )
 from snuba.clusters.storage_sets import StorageSetKey
-from snuba.datasets.plans.splitters import QuerySplitStrategy
 from snuba.datasets.readiness_state import ReadinessState
 from snuba.datasets.schemas import Schema
 from snuba.datasets.schemas.tables import WritableTableSchema, WriteFormat
@@ -80,15 +79,6 @@ def get_query_processors(self) -> Sequence[ClickhouseQueryProcessor]:
         """
         raise NotImplementedError
 
-    def get_query_splitters(self) -> Sequence[QuerySplitStrategy]:
-        """
-        If this storage supports splitting queries as optimizations, they are provided here.
-        These are optimizations, the query plan builder may decide to override the storage
-        and to skip the splitters. So correctness of the query must not depend on these
-        strategies to be applied.
-        """
-        return []
-
     def get_mandatory_condition_checkers(self) -> Sequence[ConditionChecker]:
         """
         Returns a list of expression patterns that need to always be
@@ -130,13 +120,11 @@ def __init__(
         schema: Schema,
         readiness_state: ReadinessState,
         query_processors: Optional[Sequence[ClickhouseQueryProcessor]] = None,
-        query_splitters: Optional[Sequence[QuerySplitStrategy]] = None,
         mandatory_condition_checkers: Optional[Sequence[ConditionChecker]] = None,
         allocation_policies: Optional[list[AllocationPolicy]] = None,
     ) -> None:
         self.__storage_key = storage_key
         self.__query_processors = query_processors or []
-        self.__query_splitters = query_splitters or []
         self.__mandatory_condition_checkers = mandatory_condition_checkers or []
         self.__allocation_policies = allocation_policies or []
         super().__init__(storage_set_key, schema, readiness_state)
@@ -147,9 +135,6 @@ def get_storage_key(self) -> StorageKey:
     def get_query_processors(self) -> Sequence[ClickhouseQueryProcessor]:
         return self.__query_processors
 
-    def get_query_splitters(self) -> Sequence[QuerySplitStrategy]:
-        return self.__query_splitters
-
     def get_mandatory_condition_checkers(self) -> Sequence[ConditionChecker]:
         return self.__mandatory_condition_checkers
 
@@ -166,7 +151,6 @@ def __init__(
         schema: Schema,
         query_processors: Sequence[ClickhouseQueryProcessor],
         stream_loader: KafkaStreamLoader,
-        query_splitters: Optional[Sequence[QuerySplitStrategy]] = None,
         mandatory_condition_checkers: Optional[Sequence[ConditionChecker]] = None,
         allocation_policies: Optional[list[AllocationPolicy]] = None,
         replacer_processor: Optional[ReplacerProcessor[Any]] = None,
@@ -181,7 +165,6 @@ def __init__(
             schema,
             readiness_state,
             query_processors,
-            query_splitters,
             mandatory_condition_checkers,
             allocation_policies,
         )
diff --git a/tests/test_api.py b/tests/test_api.py
index babfd0d9182..b14ef244724 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -2105,40 +2105,37 @@ def test_max_limit(self) -> None:
     @patch("snuba.settings.RECORD_QUERIES", True)
     @patch("snuba.state.record_query")
     def test_record_queries(self, record_query_mock: MagicMock) -> None:
-        for use_split, expected_query_count in [(0, 1), (1, 2)]:
-            state.set_config("use_split", use_split)
-            record_query_mock.reset_mock()
-            result = json.loads(
-                self.post(
-                    json.dumps(
-                        {
-                            "project": 1,
-                            "tenant_ids": {"referrer": "test", "organization_id": 1234},
-                            "selected_columns": [
-                                "event_id",
-                                "title",
-                                "transaction",
-                                "tags[a]",
-                                "tags[b]",
-                                "message",
-                                "project_id",
-                            ],
-                            "limit": 5,
-                            "from_date": self.base_time.isoformat(),
-                            "to_date": (
-                                self.base_time + timedelta(minutes=self.minutes)
-                            ).isoformat(),
-                        }
-                    ),
-                ).data
-            )
+        record_query_mock.reset_mock()
+        result = json.loads(
+            self.post(
+                json.dumps(
+                    {
+                        "project": 1,
+                        "tenant_ids": {"referrer": "test", "organization_id": 1234},
+                        "selected_columns": [
+                            "event_id",
+                            "title",
+                            "transaction",
+                            "tags[a]",
+                            "tags[b]",
+                            "message",
+                            "project_id",
+                        ],
+                        "limit": 5,
+                        "from_date": self.base_time.isoformat(),
+                        "to_date": (
+                            self.base_time + timedelta(minutes=self.minutes)
+                        ).isoformat(),
+                    }
+                ),
+            ).data
+        )
 
-            assert len(result["data"]) == 5
-            assert record_query_mock.call_count == 1
-            metadata = record_query_mock.call_args[0][0]
-            assert metadata["dataset"] == "events"
-            assert metadata["request"]["referrer"] == "test"
-            assert len(metadata["query_list"]) == expected_query_count
+        assert len(result["data"]) == 5
+        assert record_query_mock.call_count == 1
+        metadata = record_query_mock.call_args[0][0]
+        assert metadata["dataset"] == "events"
+        assert metadata["request"]["referrer"] == "test"
 
     @patch("snuba.web.query._run_query_pipeline")
     def test_error_handler(self, pipeline_mock: MagicMock) -> None:
diff --git a/tests/test_snql_api.py b/tests/test_snql_api.py
index 48c54360cc3..8ce4b7c8df9 100644
--- a/tests/test_snql_api.py
+++ b/tests/test_snql_api.py
@@ -308,32 +308,29 @@ def test_project_rate_limiting(self) -> None:
     @patch("snuba.settings.RECORD_QUERIES", True)
     @patch("snuba.state.record_query")
     def test_record_queries(self, record_query_mock: Any) -> None:
-        for use_split, expected_query_count in [(0, 1), (1, 2)]:
-            state.set_config("use_split", use_split)
-            record_query_mock.reset_mock()
-            result = json.loads(
-                self.post(
-                    "/events/snql",
-                    data=json.dumps(
-                        {
-                            "query": f"""MATCH (events)
-                            SELECT event_id, title, transaction, tags[a], tags[b], message, project_id
-                            WHERE timestamp >= toDateTime('{self.base_time.isoformat()}')
-                            AND timestamp < toDateTime('{self.next_time.isoformat()}')
-                            AND project_id IN tuple({self.project_id})
-                            LIMIT 5""",
-                            "tenant_ids": {"referrer": "test", "organization_id": 123},
-                        }
-                    ),
-                ).data
-            )
+        record_query_mock.reset_mock()
+        result = json.loads(
+            self.post(
+                "/events/snql",
+                data=json.dumps(
+                    {
+                        "query": f"""MATCH (events)
+                        SELECT event_id, title, transaction, tags[a], tags[b], message, project_id
+                        WHERE timestamp >= toDateTime('{self.base_time.isoformat()}')
+                        AND timestamp < toDateTime('{self.next_time.isoformat()}')
+                        AND project_id IN tuple({self.project_id})
+                        LIMIT 5""",
+                        "tenant_ids": {"referrer": "test", "organization_id": 123},
+                    }
+                ),
+            ).data
+        )
 
-            assert len(result["data"]) == 1
-            assert record_query_mock.call_count == 1
-            metadata = record_query_mock.call_args[0][0]
-            assert metadata["dataset"] == "events"
-            assert metadata["request"]["referrer"] == "test"
-            assert len(metadata["query_list"]) == expected_query_count
+        assert len(result["data"]) == 1
+        assert record_query_mock.call_count == 1
+        metadata = record_query_mock.call_args[0][0]
+        assert metadata["dataset"] == "events"
+        assert metadata["request"]["referrer"] == "test"
 
     @patch("snuba.settings.RECORD_QUERIES", True)
     @patch("snuba.state.record_query")
diff --git a/tests/test_split.py b/tests/test_split.py
deleted file mode 100644
index 36fe145fb11..00000000000
--- a/tests/test_split.py
+++ /dev/null
@@ -1,466 +0,0 @@
-from datetime import datetime
-from typing import Any, MutableMapping, Sequence
-
-import pytest
-from snuba_sdk.legacy import json_to_snql
-
-from snuba import state
-from snuba.clickhouse.columns import ColumnSet, String
-from snuba.clickhouse.query import Query as ClickhouseQuery
-from snuba.clickhouse.query_dsl.accessors import get_time_range
-from snuba.clusters.cluster import ClickhouseCluster
-from snuba.datasets.entities.entity_key import EntityKey
-from snuba.datasets.entities.factory import get_entity
-from snuba.datasets.factory import get_dataset
-from snuba.datasets.plans.splitters.strategies import (
-    ColumnSplitQueryStrategy,
-    TimeSplitQueryStrategy,
-)
-from snuba.datasets.plans.storage_plan_builder import SimpleQueryPlanExecutionStrategy
-from snuba.datasets.plans.translator.query import identity_translate
-from snuba.query import SelectedExpression
-from snuba.query.data_source.simple import Table
-from snuba.query.expressions import Column
-from snuba.query.query_settings import HTTPQuerySettings, QuerySettings
-from snuba.query.snql.parser import parse_snql_query
-from snuba.reader import Reader
-from snuba.web import QueryResult
-
-
-@pytest.fixture(autouse=True)
-def setup_teardown(redis_db: None) -> None:
-    state.set_config("use_split", 1)
-
-
-split_specs = [
-    (
-        "events",
-        "events",
-        "event_id",
-        "project_id",
-        "timestamp",
-    ),
-    (
-        "transactions",
-        "transactions",
-        "event_id",
-        "project_id",
-        "finish_ts",
-    ),
-]
-
-
-@pytest.mark.parametrize(
-    "dataset_name, entity_name, id_column, project_column, timestamp_column",
-    split_specs,
-)
-@pytest.mark.clickhouse_db
-@pytest.mark.redis_db
-def test_no_split(
-    dataset_name: str,
-    entity_name: str,
-    id_column: str,
-    project_column: str,
-    timestamp_column: str,
-) -> None:
-    entity_key = EntityKey(entity_name)
-    entity = get_entity(entity_key)
-    query = ClickhouseQuery(
-        entity.get_all_storages()[0].get_schema().get_data_source(),
-    )
-
-    def do_query(
-        clickhouse_query: ClickhouseQuery,
-        query_settings: QuerySettings,
-        reader: Reader,
-        cluster_name: str,
-    ) -> QueryResult:
-        assert query == query
-        return QueryResult({}, {})
-
-    strategy = SimpleQueryPlanExecutionStrategy(
-        ClickhouseCluster("127.0.0.1", 1024, "default", "", "default", 80, set(), True),
-        [],
-        [
-            ColumnSplitQueryStrategy(
-                id_column=id_column,
-                project_column=project_column,
-                timestamp_column=timestamp_column,
-            ),
-            TimeSplitQueryStrategy(timestamp_col=timestamp_column),
-        ],
-    )
-
-    strategy.execute(query, HTTPQuerySettings(), do_query)
-
-
-@pytest.mark.redis_db
-def test_set_limit_on_split_query():
-    storage = get_entity(EntityKey.EVENTS).get_all_storages()[0]
-    query = ClickhouseQuery(
-        Table("events", storage.get_schema().get_columns()),
-        selected_columns=[
-            SelectedExpression(col.name, Column(None, None, col.name))
-            for col in storage.get_schema().get_columns()
-        ],
-        limit=420,
-    )
-
-    query_run_count = 0
-
-    def do_query(
-        clickhouse_query: ClickhouseQuery, query_settings: QuerySettings
-    ) -> QueryResult:
-        nonlocal query_run_count
-        query_run_count += 1
-        if query_run_count == 1:
-            return QueryResult(
-                result={
-                    "data": [
-                        {
-                            "event_id": "a",
-                            "project_id": "1",
-                            "timestamp": " 2019-10-01 22:33:42",
-                        },
-                        {
-                            "event_id": "a",
-                            "project_id": "1",
-                            "timestamp": " 2019-10-01 22:44:42",
-                        },
-                    ]
-                },
-                extra={},
-            )
-        else:
-            assert clickhouse_query.get_limit() == 2
-            return QueryResult({}, {})
-
-    ColumnSplitQueryStrategy(
-        id_column="event_id",
-        project_column="project_id",
-        timestamp_column="timestamp",
-    ).execute(query, HTTPQuerySettings(), do_query)
-    assert query_run_count == 2
-
-
-test_data_col = [
-    (
-        "events",
-        "events",
-        "event_id",
-        "project_id",
-        "timestamp",
-        [{"event_id": "a", "project_id": "1", "timestamp": " 2019-10-01 22:33:42"}],
-        [
-            {
-                "event_id": "a",
-                "project_id": "1",
-                "level": "error",
-                "timestamp": " 2019-10-01 22:33:42",
-            }
-        ],
-    ),
-    (
-        "transactions",
-        "transactions",
-        "event_id",
-        "project_id",
-        "finish_ts",
-        [{"event_id": "a", "project_id": "1", "finish_ts": "2019-10-01 22:33:42"}],
-        [
-            {
-                "event_id": "a",
-                "project_id": "1",
-                "level": "error",
-                "finish_ts": "2019-10-01 22:33:42",
-            }
-        ],
-    ),
-]
-
-
-@pytest.mark.parametrize(
-    "dataset_name, entity_name, id_column, project_column, timestamp_column, first_query_data, second_query_data",
-    test_data_col,
-)
-@pytest.mark.clickhouse_db
-@pytest.mark.redis_db
-def test_col_split(
-    dataset_name: str,
-    entity_name: str,
-    id_column: str,
-    project_column: str,
-    timestamp_column: str,
-    first_query_data: Sequence[MutableMapping[str, Any]],
-    second_query_data: Sequence[MutableMapping[str, Any]],
-) -> None:
-    def do_query(
-        clickhouse_query: ClickhouseQuery,
-        query_settings: QuerySettings,
-        reader: Reader,
-        cluster_name: str,
-    ) -> QueryResult:
-        selected_col_names = [
-            c.expression.column_name
-            for c in query.get_selected_columns() or []
-            if isinstance(c.expression, Column)
-        ]
-        if selected_col_names == list(first_query_data[0].keys()):
-            return QueryResult({"data": first_query_data}, {})
-        elif selected_col_names == list(second_query_data[0].keys()):
-            return QueryResult({"data": second_query_data}, {})
-        else:
-            raise ValueError(f"Unexpected selected columns: {selected_col_names}")
-
-    entity_key = EntityKey(entity_name)
-    entity = get_entity(entity_key)
-    query = ClickhouseQuery(
-        entity.get_all_storages()[0].get_schema().get_data_source(),
-        selected_columns=[
-            SelectedExpression(name=col_name, expression=Column(None, None, col_name))
-            for col_name in second_query_data[0].keys()
-        ],
-    )
-
-    strategy = SimpleQueryPlanExecutionStrategy(
-        ClickhouseCluster("127.0.0.1", 1024, "default", "", "default", 80, set(), True),
-        [],
-        [
-            ColumnSplitQueryStrategy(id_column, project_column, timestamp_column),
-            TimeSplitQueryStrategy(timestamp_col=timestamp_column),
-        ],
-    )
-
-    strategy.execute(query, HTTPQuerySettings(), do_query)
-
-
-column_set = ColumnSet(
-    [
-        ("event_id", String()),
-        ("project_id", String()),
-        ("timestamp", String()),
-        ("level", String()),
-        ("release", String()),
-        ("platform", String()),
-        ("transaction", String()),
-    ]
-)
-
-column_split_tests = [
-    (
-        "event_id",
-        "project_id",
-        "timestamp",
-        {
-            "selected_columns": [
-                "event_id",
-                "level",
-                "release",
-                "platform",
-                "transaction",
-                "timestamp",
-                "project_id",
-            ],
-            "conditions": [
-                ("timestamp", ">=", "2019-09-19T10:00:00"),
-                ("timestamp", "<", "2019-09-19T12:00:00"),
-                ("project_id", "IN", [1, 2, 3]),
-            ],
-            "groupby": ["timestamp"],
-            "limit": 10,
-        },
-        False,
-    ),  # Query with group by. No split
-    (
-        "event_id",
-        "project_id",
-        "timestamp",
-        {
-            "selected_columns": [
-                "event_id",
-                "level",
-                "release",
-                "platform",
-                "transaction",
-                "timestamp",
-                "project_id",
-            ],
-            "conditions": [
-                ("timestamp", ">=", "2019-09-19T10:00:00"),
-                ("timestamp", "<", "2019-09-19T12:00:00"),
-                ("project_id", "IN", [1, 2, 3]),
-            ],
-            "limit": 10,
-        },
-        True,
-    ),  # Valid query to split
-    (
-        "event_id",
-        "project_id",
-        "timestamp",
-        {
-            "selected_columns": ["event_id", "level", "release", "platform"],
-            "conditions": [
-                ("timestamp", ">=", "2019-09-19T10:00:00"),
-                ("timestamp", "<", "2019-09-19T12:00:00"),
-                ("project_id", "IN", [1, 2, 3]),
-            ],
-            "limit": 10,
-        },
-        False,
-    ),  # Valid query but not enough columns in the select
-    (
-        "event_id",
-        "project_id",
-        "timestamp",
-        {
-            "selected_columns": [
-                "event_id",
-                "level",
-                "release",
-                "platform",
-                "transaction",
-                "timestamp",
-                "project_id",
-            ],
-            "conditions": [
-                ("timestamp", ">=", "2019-09-19T10:00:00"),
-                ("timestamp", "<", "2019-09-19T12:00:00"),
-                ("project_id", "IN", [1, 2, 3]),
-                ("event_id", "=", "a" * 32),
-            ],
-            "limit": 10,
-        },
-        False,
-    ),  # Query with = on event_id, not split
-    (
-        "event_id",
-        "project_id",
-        "timestamp",
-        {
-            "selected_columns": [
-                "event_id",
-                "level",
-                "release",
-                "platform",
-                "transaction",
-                "timestamp",
-                "project_id",
-            ],
-            "conditions": [
-                ("timestamp", ">=", "2019-09-19T10:00:00"),
-                ("timestamp", "<", "2019-09-19T12:00:00"),
-                ("project_id", "IN", [1, 2, 3]),
-                ("event_id", "IN", ["a" * 32, "b" * 32]),
-            ],
-            "limit": 10,
-        },
-        False,
-    ),  # Query with IN on event_id - do not split
-    (
-        "event_id",
-        "project_id",
-        "timestamp",
-        {
-            "selected_columns": [
-                "event_id",
-                "level",
-                "release",
-                "platform",
-                "transaction",
-                "timestamp",
-                "project_id",
-            ],
-            "conditions": [
-                ("timestamp", ">=", "2019-09-19T10:00:00"),
-                ("timestamp", "<", "2019-09-19T12:00:00"),
-                ("project_id", "IN", [1, 2, 3]),
-                ("event_id", ">", "a" * 32),
-            ],
-            "limit": 10,
-        },
-        True,
-    ),  # Query with other condition on event_id - proceed with split
-]
-
-
-@pytest.mark.parametrize(
-    "id_column, project_column, timestamp_column, query, expected_result",
-    column_split_tests,
-)
-@pytest.mark.redis_db
-def test_col_split_conditions(
-    id_column: str, project_column: str, timestamp_column: str, query, expected_result
-) -> None:
-    dataset = get_dataset("events")
-    request = json_to_snql(query, "events")
-    request.validate()
-    query, _ = parse_snql_query(str(request.query), dataset)
-    splitter = ColumnSplitQueryStrategy(id_column, project_column, timestamp_column)
-
-    def do_query(
-        query: ClickhouseQuery, query_settings: QuerySettings = None
-    ) -> QueryResult:
-        return QueryResult(
-            {
-                "data": [
-                    {
-                        id_column: "asd123",
-                        project_column: 123,
-                        timestamp_column: "2019-10-01 22:33:42",
-                    }
-                ]
-            },
-            {},
-        )
-
-    assert (
-        splitter.execute(query, HTTPQuerySettings(), do_query) is not None
-    ) == expected_result
-
-
-@pytest.mark.redis_db
-def test_time_split_ast() -> None:
-    """
-    Test that the time split transforms the query properly both on the old representation
-    and on the AST representation.
-    """
-    found_timestamps = []
-
-    def do_query(
-        query: ClickhouseQuery,
-        query_settings: QuerySettings,
-    ) -> QueryResult:
-        from_date_ast, to_date_ast = get_time_range(query, "timestamp")
-        assert from_date_ast is not None and isinstance(from_date_ast, datetime)
-        assert to_date_ast is not None and isinstance(to_date_ast, datetime)
-
-        found_timestamps.append((from_date_ast.isoformat(), to_date_ast.isoformat()))
-
-        return QueryResult({"data": []}, {})
-
-    body = """
-        MATCH (events)
-        SELECT event_id, level, offset, partition, transaction, timestamp, project_id
-        WHERE timestamp >= toDateTime('2019-09-18T10:00:00')
-        AND timestamp < toDateTime('2019-09-19T12:00:00')
-        AND project_id IN tuple(1)
-        ORDER BY timestamp DESC
-        LIMIT 10
-        """
-
-    query, _ = parse_snql_query(body, get_dataset("events"))
-    entity = get_entity(query.get_from_clause().key)
-    settings = HTTPQuerySettings()
-    for p in entity.get_query_processors():
-        p.process_query(query, settings)
-
-    clickhouse_query = identity_translate(query)
-    splitter = TimeSplitQueryStrategy("timestamp")
-    splitter.execute(clickhouse_query, settings, do_query)
-
-    assert found_timestamps == [
-        ("2019-09-19T11:00:00", "2019-09-19T12:00:00"),
-        ("2019-09-19T01:00:00", "2019-09-19T11:00:00"),
-        ("2019-09-18T10:00:00", "2019-09-19T01:00:00"),
-    ]