diff --git a/.vscode/launch.json b/.vscode/launch.json index 7a69e0502c3..b296b614ba0 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,6 +1,13 @@ { "version": "0.2.0", "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal" + }, { "name": "Pytest Current File", "type": "debugpy", @@ -10,7 +17,6 @@ "${file}" ], "console": "integratedTerminal", - "justMyCode": true // wasn't sure if I should keep or delete } ] } diff --git a/snuba/cli/migrations.py b/snuba/cli/migrations.py index 128da9b384f..19e4147ba28 100644 --- a/snuba/cli/migrations.py +++ b/snuba/cli/migrations.py @@ -1,4 +1,5 @@ import os +import re from typing import Optional, Sequence import click @@ -392,5 +393,11 @@ def generate( The migration will be written into the local directory. The user is responsible for making the commit, PR, and merging. """ + expected_pattern = r"(.+/)?snuba/datasets/configuration/.*/storages/.*\.(yml|yaml)" + if not re.fullmatch(expected_pattern, storage_path): + raise click.ClickException( + f"Storage path {storage_path} does not match expected pattern {expected_pattern}" + ) + autogeneration.generate(storage_path) click.echo("This function is under construction.") diff --git a/snuba/datasets/configuration/replays/storages/replays.yaml b/snuba/datasets/configuration/replays/storages/replays.yaml index c66d5dd40f7..740c981ea75 100644 --- a/snuba/datasets/configuration/replays/storages/replays.yaml +++ b/snuba/datasets/configuration/replays/storages/replays.yaml @@ -190,19 +190,19 @@ allocation_policies: - project_id default_config_overrides: is_enforced: 0 - - name: BytesScannedWindowAllocationPolicy + - name: ReferrerGuardRailPolicy args: required_tenant_types: - - organization_id - referrer default_config_overrides: - is_enforced: 1 - throttled_thread_number: 1 - org_limit_bytes_scanned: 100000 - - name: ReferrerGuardRailPolicy + is_enforced: 0 + 
is_active: 0 + - name: BytesScannedRejectingPolicy args: required_tenant_types: + - organization_id + - project_id - referrer default_config_overrides: - is_enforced: 0 is_active: 0 + is_enforced: 0 diff --git a/snuba/datasets/configuration/spans/storages/spans.yaml b/snuba/datasets/configuration/spans/storages/spans.yaml index 6fbecb941fc..5a06b7ee647 100644 --- a/snuba/datasets/configuration/spans/storages/spans.yaml +++ b/snuba/datasets/configuration/spans/storages/spans.yaml @@ -115,22 +115,23 @@ allocation_policies: - project_id default_config_overrides: is_enforced: 0 - - name: BytesScannedWindowAllocationPolicy + - name: ReferrerGuardRailPolicy args: required_tenant_types: - - organization_id - referrer default_config_overrides: - is_enforced: 1 - throttled_thread_number: 1 - org_limit_bytes_scanned: 100000 - - name: ReferrerGuardRailPolicy + is_enforced: 0 + is_active: 0 + - name: BytesScannedRejectingPolicy args: required_tenant_types: + - organization_id + - project_id - referrer default_config_overrides: - is_enforced: 0 is_active: 0 + is_enforced: 0 + query_processors: - processor: UniqInSelectAndHavingProcessor - processor: UUIDColumnProcessor diff --git a/snuba/migrations/autogeneration/diff.py b/snuba/migrations/autogeneration/diff.py new file mode 100644 index 00000000000..3931ae60244 --- /dev/null +++ b/snuba/migrations/autogeneration/diff.py @@ -0,0 +1,139 @@ +from typing import cast + +import yaml + +from snuba.clusters.storage_sets import StorageSetKey +from snuba.datasets.configuration.utils import parse_columns +from snuba.migrations.columns import MigrationModifiers +from snuba.migrations.operations import AddColumn, DropColumn, OperationTarget +from snuba.utils.schemas import Column, ColumnType, SchemaModifiers + +""" +This file is for autogenerating the migration for adding a column to your storage. 
+""" + + +def generate_migration_ops( + oldstorage: str, newstorage: str +) -> tuple[list[AddColumn], list[DropColumn]]: + """ + Input: + oldstorage, the original storage yaml in str format + newstorage, the modified storage yaml in str format + + Returns a tuple (forwardops, backwardsops): these are the forward and backward migration + operations required to migrate the storage as described in the given yaml files. + + Only supports adding columns, throws error for anything else. + """ + valid, reason = _is_valid_add_column(oldstorage, newstorage) + if not valid: + raise ValueError(reason) + + oldcol_names = set( + col["name"] for col in yaml.safe_load(oldstorage)["schema"]["columns"] + ) + newstorage_dict = yaml.safe_load(newstorage) + newcols = newstorage_dict["schema"]["columns"] + + forwardops: list[AddColumn] = [] + for i, col in enumerate(newcols): + if col["name"] not in oldcol_names: + column = _schema_column_to_migration_column(parse_columns([col])[0]) + after = newcols[i - 1]["name"] + storage_set = StorageSetKey(newstorage_dict["storage"]["set_key"]) + forwardops += [ + AddColumn( + storage_set=storage_set, + table_name=newstorage_dict["schema"]["local_table_name"], + column=column, + after=after, + target=OperationTarget.LOCAL, + ), + AddColumn( + storage_set=storage_set, + table_name=newstorage_dict["schema"]["dist_table_name"], + column=column, + after=after, + target=OperationTarget.DISTRIBUTED, + ), + ] + return (forwardops, [op.get_reverse() for op in reversed(forwardops)]) + + +def _is_valid_add_column(oldstorage: str, newstorage: str) -> tuple[bool, str]: + """ + Input: + oldstorage, the old storage yaml in str format + newstorage, the modified storage yaml in str format + + Returns true if the change to the storage is a valid column addition, false otherwise, + along with a reason.
+ """ + oldstorage_dict = yaml.safe_load(oldstorage) + newstorage_dict = yaml.safe_load(newstorage) + if oldstorage_dict == newstorage_dict: + return True, "storages are the same" + + # nothing changed but the columns + t1 = oldstorage_dict["schema"].pop("columns") + t2 = newstorage_dict["schema"].pop("columns") + if not (oldstorage_dict == newstorage_dict): + return ( + False, + "Expected the only change to the storage to be the columns, but that is not true", + ) + oldstorage_dict["schema"]["columns"] = t1 + newstorage_dict["schema"]["columns"] = t2 + + # the only changes to columns are additions + oldstorage_cols = oldstorage_dict["schema"]["columns"] + newstorage_cols = newstorage_dict["schema"]["columns"] + + colnames_old = set(e["name"] for e in oldstorage_cols) + colnames_new = set(e["name"] for e in newstorage_cols) + if not colnames_old.issubset(colnames_new): + return (False, "Column removal is not supported") + + pold, pnew = 0, 0 + while pold < len(oldstorage_cols) and pnew < len(newstorage_cols): + curr_old = oldstorage_cols[pold] + curr_new = newstorage_cols[pnew] + + if curr_old == curr_new: + pold += 1 + pnew += 1 + elif curr_new["name"] in colnames_old: + return ( + False, + f"Modification to columns in unsupported, column '{curr_new['name']}' was modified or reordered", + ) + else: + if pold == 0: + return ( + False, + "Adding a column to the beginning is currently unsupported, please add it anywhere else.", + ) + else: + pnew += 1 + assert pold == len(oldstorage_cols) # should always hold + return True, "" + + +def _schema_column_to_migration_column( + column: Column[SchemaModifiers], +) -> Column[MigrationModifiers]: + """ + Given a column with SchemaModifiers, returns the equivalent column with MigrationModifiers. + Only nullable is supported, raises ValueError if the conversion can't be made.
+ """ + newtype = cast(ColumnType[MigrationModifiers], column.type.get_raw()) + mods = column.type.get_modifiers() + if not mods: + return Column(column.name, newtype) + + # convert schema modifiers to migration modifiers + if mods.readonly: + raise ValueError("readonly modifier is not supported") + newtype = newtype.set_modifiers(MigrationModifiers(nullable=mods.nullable)) + return Column(column.name, newtype) diff --git a/snuba/migrations/autogeneration/main.py b/snuba/migrations/autogeneration/main.py index 447fd040be6..1be2456951c 100644 --- a/snuba/migrations/autogeneration/main.py +++ b/snuba/migrations/autogeneration/main.py @@ -1,11 +1,27 @@ import os import subprocess +from snuba.migrations.autogeneration.diff import generate_migration_ops -def generate(storage_path: str) -> tuple[str, str]: - storage_path = os.path.realpath(os.path.abspath(os.path.expanduser(storage_path))) - # get the version of the file at HEAD +def generate(storage_path: str) -> None: + # load into memory the given storage and the version of it at HEAD + new_storage, old_storage = get_working_and_head(storage_path) + + # generate the migration operations + generate_migration_ops(old_storage, new_storage) + + +def get_working_and_head(path: str) -> tuple[str, str]: + """ + Given a path to a file, returns the contents of the file in the working directory + and the contents of it at HEAD in the git repo, as a tuple: (working, head) + + preconditions: + - path is a valid path to a file in a git repo + """ + path = os.path.realpath(os.path.abspath(os.path.expanduser(path))) + # get the version at HEAD try: repo_path = ( subprocess.run( @@ -14,15 +30,15 @@ def generate(storage_path: str) -> tuple[str, str]: "rev-parse", "--show-toplevel", ], - cwd=os.path.dirname(storage_path), + cwd=os.path.dirname(path), capture_output=True, check=True, ) .stdout.decode("utf-8") .strip() ) - repo_rel_path = os.path.relpath(storage_path, repo_path) - old_storage = subprocess.run( + repo_rel_path = 
os.path.relpath(path, repo_path) + head_file = subprocess.run( ["git", "show", f"HEAD:{repo_rel_path}"], cwd=repo_path, capture_output=True, @@ -31,8 +47,8 @@ def generate(storage_path: str) -> tuple[str, str]: except subprocess.CalledProcessError as e: raise ValueError(e.stderr.decode("utf-8")) from e - # get the user-provided (modified) storage - with open(storage_path, "r") as f: - new_storage = f.read() + # working + with open(path, "r") as f: + working_file = f.read() - return old_storage, new_storage + return (working_file, head_file) diff --git a/snuba/migrations/clickhouse.py b/snuba/migrations/clickhouse.py index 7fb134b0e3b..c04958faa55 100644 --- a/snuba/migrations/clickhouse.py +++ b/snuba/migrations/clickhouse.py @@ -1,4 +1,4 @@ CLICKHOUSE_SERVER_MIN_VERSION = "21.8.12.29" # Note: 21.8.12.29 and 21.8.13.1 are used in self-hosted builds # even though SaaS clusters are all on 22.8 or above -CLICKHOUSE_SERVER_MAX_VERSION = "23.3.19.33" +CLICKHOUSE_SERVER_MAX_VERSION = "23.8.11.29" diff --git a/snuba/migrations/group_loader.py b/snuba/migrations/group_loader.py index 2cbc0987846..fb7cd33c4bb 100644 --- a/snuba/migrations/group_loader.py +++ b/snuba/migrations/group_loader.py @@ -1,6 +1,8 @@ from __future__ import annotations +import os from abc import ABC, abstractmethod +from glob import glob from importlib import import_module from typing import Sequence @@ -30,14 +32,44 @@ class DirectoryLoader(GroupLoader, ABC): """ Loads migrations that are defined as files of a directory. The file name represents the migration ID. + + Migrations must be named: xxxx_migration_name.py where xxxx is 4 digit, + 0 padded migration number. 
As regex: [0-9][0-9][0-9][0-9]_.*\\.py + Within a dir, migration numbers are strictly increasing by 1 beginning at + 0001 """ def __init__(self, module_path: str) -> None: - self.__module = module_path + self.__module = module_path # dotted module path (a.b.c), not a filesystem path - @abstractmethod def get_migrations(self) -> Sequence[str]: - raise NotImplementedError + """ + Migrations must be in the folder specified by module_path. + See the class docstring for the migration naming scheme. + """ + # the folder that the migrations should be in + migration_folder = self.__module.replace(".", "/") + if not os.path.exists(migration_folder): + return [] + # grab the migrations, ignore all other files + migration_filenames = sorted( + map( + lambda x: os.path.basename(x)[:-3], + glob(os.path.join(migration_folder, "[0-9][0-9][0-9][0-9]_*.py")), + ) + ) + # validate no duplicate migration numbers + last = None + for fname in migration_filenames: + if last is not None and fname[:4] == last[:4]: + raise ValueError( + f"""Duplicate migration number for the following files: + {os.path.join(migration_folder,last)}.py + {os.path.join(migration_folder,fname)}.py""" + ) + last = fname + + return migration_filenames def load_migration(self, migration_id: str) -> Migration: try: @@ -51,73 +83,16 @@ class SystemLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.migrations.system_migrations") - def get_migrations(self) -> Sequence[str]: - return ["0001_migrations"] - class EventsLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.events") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_events_initial", - "0002_events_onpremise_compatibility", - "0003_errors", - "0004_errors_onpremise_compatibility", - "0005_events_tags_hash_map", - "0006_errors_tags_hash_map", - "0007_groupedmessages", - "0008_groupassignees", - "0009_errors_add_http_fields", - "0010_groupedmessages_onpremise_compatibility", - "0011_rebuild_errors",
"0012_errors_make_level_nullable", - "0013_errors_add_hierarchical_hashes", - "0014_backfill_errors", - "0015_truncate_events", - "0016_drop_legacy_events", - "0017_errors_add_indexes", - "0018_errors_ro_add_tags_hash_map", - "0019_add_replay_id_column", - "0020_add_main_thread_column", - "0021_add_replay_id_errors_ro", - "0022_add_main_thread_column_errors_ro", - "0023_add_trace_sampled_num_processing_errors_columns", - "0024_add_trace_sampled_num_processing_errors_columns_ro", - ] - class TransactionsLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.transactions") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_transactions", - "0002_transactions_onpremise_fix_orderby_and_partitionby", - "0003_transactions_onpremise_fix_columns", - "0004_transactions_add_tags_hash_map", - "0005_transactions_add_measurements", - "0006_transactions_add_http_fields", - "0007_transactions_add_discover_cols", - "0008_transactions_add_timestamp_index", - "0009_transactions_fix_title_and_message", - "0010_transactions_nullable_trace_id", - "0011_transactions_add_span_op_breakdowns", - "0012_transactions_add_spans", - "0013_transactions_reduce_spans_exclusive_time", - "0014_transactions_remove_flattened_columns", - "0015_transactions_add_source_column", - "0016_transactions_add_group_ids_column", - "0017_transactions_add_app_start_type_column", - "0018_transactions_add_profile_id", - "0019_transactions_add_indexes_and_context_hash", - "0020_transactions_add_codecs", - "0021_transactions_add_replay_id", - "0022_transactions_add_index_on_trace_id", - ] - class DiscoverLoader(DirectoryLoader): """ @@ -127,299 +102,72 @@ class DiscoverLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.discover") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_discover_merge_table", - "0002_discover_add_deleted_tags_hash_map", - "0003_discover_fix_user_column", - 
"0004_discover_fix_title_and_message", - "0005_discover_fix_transaction_name", - "0006_discover_add_trace_id", - "0007_discover_add_span_id", - "0008_discover_fix_add_local_table", - "0009_discover_add_replay_id", - ] - class OutcomesLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.outcomes") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_outcomes", - "0002_outcomes_remove_size_and_bytes", - "0003_outcomes_add_category_and_quantity", - "0004_outcomes_matview_additions", - "0005_outcomes_ttl", - "0006_outcomes_add_size_col", - "0007_outcomes_add_event_id_ttl_codec", - "0008_outcomes_add_indexes", - ] - class ReplaysLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.replays") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_replays", - "0002_add_url", - "0003_alter_url_allow_null", - "0004_add_error_ids_column", - "0005_add_urls_user_agent_replay_start_timestamp", - "0006_add_is_archived_column", - "0007_add_replay_type_column", - "0008_add_sample_rate", - "0009_add_dom_index_columns", - "0010_add_nullable_columns", - "0011_add_is_dead_rage", - "0012_materialize_counts", - "0013_add_low_cardinality_codecs", - "0014_add_id_event_columns", - "0015_index_frequently_accessed_columns", - "0016_materialize_new_event_counts", - "0017_add_component_name_column", - "0018_add_viewed_by_id_column", - "0019_add_materialization", - "0020_add_dist_migration_for_materialization", - ] - class MetricsLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.metrics") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_metrics_buckets", - "0002_metrics_sets", - "0003_counters_to_buckets", - "0004_metrics_counters", - "0005_metrics_distributions_buckets", - "0006_metrics_distributions", - "0007_metrics_sets_granularity_10", - "0008_metrics_counters_granularity_10", - "0009_metrics_distributions_granularity_10", - 
"0010_metrics_sets_granularity_1h", - "0011_metrics_counters_granularity_1h", - "0012_metrics_distributions_granularity_1h", - "0013_metrics_sets_granularity_1d", - "0014_metrics_counters_granularity_1d", - "0015_metrics_distributions_granularity_1d", - "0016_metrics_sets_consolidated_granularity", - "0017_metrics_counters_consolidated_granularity", - "0018_metrics_distributions_consolidated_granularity", - "0019_aggregate_tables_add_ttl", - "0020_polymorphic_buckets_table", - "0021_polymorphic_bucket_materialized_views", - "0022_repartition_polymorphic_table", - "0023_polymorphic_repartitioned_bucket_matview", - "0024_metrics_distributions_add_histogram", - "0025_metrics_counters_aggregate_v2", - "0026_metrics_counters_v2_writing_matview", - "0027_fix_migration_0026", - "0028_metrics_sets_aggregate_v2", - "0029_metrics_distributions_aggregate_v2", - "0030_metrics_distributions_v2_writing_mv", - "0031_metrics_sets_v2_writing_mv", - "0032_redo_0030_and_0031_without_timestamps", - "0033_metrics_cleanup_old_views", - "0034_metrics_cleanup_old_tables", - "0035_metrics_raw_timeseries_id", - ] - class SessionsLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.sessions") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_sessions", - "0002_sessions_aggregates", - "0003_sessions_matview", - "0004_sessions_ttl", - "0005_drop_sessions_tables", - ] - class QuerylogLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.querylog") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_querylog", - "0002_status_type_change", - "0003_add_profile_fields", - "0004_add_bytes_scanned", - "0005_add_codec_update_settings", - "0006_sorting_key_change", - "0007_add_offset_column", - ] - class TestMigrationLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.test_migration") - def get_migrations(self) -> Sequence[str]: - return 
["0001_create_test_table", "0002_add_test_col"] - class ProfilesLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.profiles") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_profiles", - "0002_disable_vertical_merge_algorithm", - "0003_add_device_architecture", - "0004_drop_profile_column", - ] - class FunctionsLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.functions") - def get_migrations(self) -> Sequence[str]: - return ["0001_functions", "0002_add_new_columns_to_raw_functions"] - class GenericMetricsLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.generic_metrics") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_sets_aggregate_table", - "0002_sets_raw_table", - "0003_sets_mv", - "0004_sets_raw_add_granularities", - "0005_sets_replace_mv", - "0006_sets_raw_add_granularities_dist_table", - "0007_distributions_aggregate_table", - "0008_distributions_raw_table", - "0009_distributions_mv", - "0010_counters_aggregate_table", - "0011_counters_raw_table", - "0012_counters_mv", - "0013_distributions_dist_tags_hash", - "0014_distribution_add_options", - "0015_sets_add_options", - "0016_counters_add_options", - "0017_distributions_mv2", - "0018_sets_update_opt_default", - "0019_counters_update_opt_default", - "0020_sets_mv2", - "0021_counters_mv2", - "0022_gauges_aggregate_table", - "0023_gauges_raw_table", - "0024_gauges_mv", - "0025_counters_add_raw_tags_hash_column", - "0026_gauges_add_raw_tags_hash_column", - "0027_sets_add_raw_tags_column", - "0028_distributions_add_indexed_tags_column", - "0029_add_use_case_id_index", - "0030_add_record_meta_column", - "0031_counters_meta_table", - "0032_counters_meta_table_mv", - "0033_counters_meta_tag_values_table", - "0034_counters_meta_tag_values_table_mv", - "0035_recreate_counters_meta_tag_value_table_mv", - "0036_counters_meta_tables_final", - 
"0037_add_record_meta_column_sets", - "0038_add_record_meta_column_distributions", - "0039_add_record_meta_column_gauges", - "0040_remove_counters_meta_tables", - "0041_adjust_partitioning_meta_tables", - "0042_rename_counters_meta_tables", - "0043_sets_meta_tables", - "0044_gauges_meta_tables", - "0045_distributions_meta_tables", - "0046_distributions_add_disable_percentiles", - "0047_distributions_mv3", - "0048_counters_meta_tables_support_empty_tags", - "0049_sets_meta_tables_support_empty_tags", - "0050_distributions_meta_tables_support_empty_tags", - "0051_gauges_meta_tables_support_empty_tags", - ] - class SearchIssuesLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.search_issues") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_search_issues", - "0002_search_issues_add_tags_hash_map", - "0003_search_issues_modify_occurrence_type_id_size", - "0004_rebuild_search_issues_with_version", - "0005_search_issues_v2", - "0006_add_subtitle_culprit_level_resource_id", - "0007_add_transaction_duration", - "0008_add_profile_id_replay_id", - "0009_add_message", - ] - class SpansLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.spans") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_spans_v1", - "0002_spans_add_tags_hashmap", - "0003_spans_add_ms_columns", - "0004_spans_group_raw_col", - "0005_spans_add_sentry_tags", - "0006_spans_add_profile_id", - "0007_spans_add_metrics_summary", - "0008_spans_add_index_on_span_id", - "0009_spans_add_measure_hashmap", - "0010_spans_add_compression", - "0011_spans_add_index_on_trace_id", - "0012_spans_add_index_on_transaction_name", - "0013_spans_add_indexes_for_tag_columns", - "0014_spans_add_microsecond_precision_timestamps", - ] - class GroupAttributesLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.group_attributes") - def get_migrations(self) -> Sequence[str]: - return 
[ - "0001_group_attributes", - "0002_add_priority_to_group_attributes", - "0003_add_first_release_id_to_group_attributes", - ] - class MetricsSummariesLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.metrics_summaries") - def get_migrations(self) -> Sequence[str]: - return [ - "0001_metrics_summaries_create_table", - "0002_metrics_summaries_add_tags_hashmap", - "0003_metrics_summaries_add_segment_id_duration_group_columns", - ] - class ProfileChunksLoader(DirectoryLoader): def __init__(self) -> None: super().__init__("snuba.snuba_migrations.profile_chunks") - - def get_migrations(self) -> Sequence[str]: - return [ - "0001_create_profile_chunks_table", - ] diff --git a/snuba/migrations/operations.py b/snuba/migrations/operations.py index 6ec5642471b..b36e601f3e3 100644 --- a/snuba/migrations/operations.py +++ b/snuba/migrations/operations.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging from abc import ABC, abstractmethod from enum import Enum @@ -82,6 +84,11 @@ def execute(self) -> None: def format_sql(self) -> str: raise NotImplementedError + def __eq__(self, other: Any) -> bool: + if not isinstance(other, SqlOperation): + return False + return other.format_sql() == self.format_sql() + class RunSql(SqlOperation): def __init__( @@ -272,6 +279,14 @@ def format_sql(self) -> str: optional_after_clause = f" AFTER {self.__after}" if self.__after else "" return f"ALTER TABLE {self.table_name} ADD COLUMN IF NOT EXISTS {column}{optional_after_clause};" + def get_reverse(self) -> DropColumn: + return DropColumn( + storage_set=self.storage_set, + table_name=self.table_name, + column_name=self.column.name, + target=self.target, + ) + class DropColumn(SqlOperation): """ diff --git a/snuba/query/allocation_policies/concurrent_rate_limit.py b/snuba/query/allocation_policies/concurrent_rate_limit.py index 005249de074..be96331e7e2 100644 --- a/snuba/query/allocation_policies/concurrent_rate_limit.py +++ 
b/snuba/query/allocation_policies/concurrent_rate_limit.py @@ -28,7 +28,11 @@ _PASS_THROUGH_REFERRERS = set( [ + # these referrers are tied to ingest and are better limited by the ReferrerGuardRailPolicy "subscriptions_executor", + "tsdb-modelid:4.batch_alert_event_frequency", + "tsdb-modelid:4.batch_alert_event_uniq_user_frequency", + "tsdb-modelid:4.batch_alert_event_frequency_percent", ] ) from snuba.query.allocation_policies import MAX_THRESHOLD, NO_SUGGESTION diff --git a/snuba/web/db_query.py b/snuba/web/db_query.py index 0015e6a474f..053a1f0e2f4 100644 --- a/snuba/web/db_query.py +++ b/snuba/web/db_query.py @@ -4,6 +4,7 @@ import random import uuid from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass from functools import partial from hashlib import md5 from threading import Lock @@ -26,6 +27,7 @@ from snuba.clickhouse.query_profiler import generate_profile from snuba.query import ProcessableQuery from snuba.query.allocation_policies import ( + MAX_THRESHOLD, AllocationPolicy, AllocationPolicyViolations, QueryResultOrError, @@ -76,6 +78,9 @@ redis_cache_client = get_redis_client(RedisClientKey.CACHE) +_REJECTED_BY = "rejected_by" +_THROTTLED_BY = "throttled_by" + class ResultCacheCodec(ExceptionAwareCodec[bytes, Result]): def encode(self, value: Result) -> bytes: @@ -808,6 +813,34 @@ def db_query( ) +@dataclass +class _QuotaAndPolicy: + quota_allowance: QuotaAllowance + policy_name: str + + +def _add_quota_info( + summary: dict[str, Any], + action: str, + quota_and_policy: _QuotaAndPolicy | None = None, +) -> None: + + quota_info: dict[str, Any] = {} + summary[action] = quota_info + + if quota_and_policy is not None: + quota_info["policy"] = quota_and_policy.policy_name + quota_allowance = quota_and_policy.quota_allowance + quota_info["quota_used"] = quota_allowance.quota_used + quota_info["quota_unit"] = quota_allowance.quota_unit + quota_info["suggestion"] = quota_allowance.suggestion + + if action == _REJECTED_BY: + 
quota_info["rejection_threshold"] = quota_allowance.rejection_threshold + else: + quota_info["throttle_threshold"] = quota_allowance.throttle_threshold + + def _apply_allocation_policies_quota( query_settings: QuerySettings, attribution_info: AttributionInfo, @@ -822,6 +855,9 @@ def _apply_allocation_policies_quota( """ quota_allowances: dict[str, QuotaAllowance] = {} can_run = True + rejection_quota_and_policy = None + throttle_quota_and_policy = None + num_threads = MAX_THRESHOLD with sentry_sdk.start_span( op="allocation_policy", description="_apply_allocation_policies_quota" ) as span: @@ -833,13 +869,23 @@ def _apply_allocation_policies_quota( allowance = allocation_policy.get_quota_allowance( attribution_info.tenant_ids, query_id ) + num_threads = min(num_threads, allowance.max_threads) can_run &= allowance.can_run quota_allowances[allocation_policy.config_key()] = allowance span.set_data( "quota_allowance", quota_allowances[allocation_policy.config_key()], ) + if allowance.is_throttled: + throttle_quota_and_policy = _QuotaAndPolicy( + quota_allowance=allowance, + policy_name=allocation_policy.config_key(), + ) if not can_run: + rejection_quota_and_policy = _QuotaAndPolicy( + quota_allowance=allowance, + policy_name=allocation_policy.config_key(), + ) break allowance_dicts = { @@ -847,6 +893,12 @@ def _apply_allocation_policies_quota( for key, quota_allowance in quota_allowances.items() } stats["quota_allowance"] = allowance_dicts + summary: dict[str, Any] = {} + summary["threads_used"] = num_threads + _add_quota_info(summary, _REJECTED_BY, rejection_quota_and_policy) + _add_quota_info(summary, _THROTTLED_BY, throttle_quota_and_policy) + stats["quota_allowance"]["summary"] = summary + if not can_run: raise AllocationPolicyViolations.from_args(quota_allowances) # Before allocation policies were a thing, the query pipeline would apply diff --git a/tests/migrations/autogeneration/test_diff.py b/tests/migrations/autogeneration/test_diff.py new file mode 100644 
index 00000000000..689a3cd45a9 --- /dev/null +++ b/tests/migrations/autogeneration/test_diff.py @@ -0,0 +1,166 @@ +import pytest + +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations.autogeneration.diff import generate_migration_ops +from snuba.migrations.columns import MigrationModifiers +from snuba.migrations.operations import AddColumn, DropColumn, OperationTarget +from snuba.utils.schemas import Column, DateTime, UInt + + +def mockstoragewithcolumns(cols: list[str]) -> str: + colstr = ",\n ".join([s for s in cols]) + return f""" +version: v1 +kind: writable_storage +name: errors +storage: + key: errors + set_key: events +readiness_state: complete +schema: + columns: + [ + {colstr} + ] + local_table_name: errors_local + dist_table_name: errors_dist + partition_format: + - retention_days + - date + not_deleted_mandatory_condition: deleted +local_table_name: errors_local +dist_table_name: errors_dist +""" + + +def test_add_column() -> None: + cols = [ + "{ name: project_id, type: UInt, args: { size: 64 } }", + "{ name: timestamp, type: DateTime }", + "{ name: event_id, type: UUID }", + ] + new_cols = [ + "{ name: project_id, type: UInt, args: { size: 64 } }", + "{ name: timestamp, type: DateTime }", + "{ name: newcol1, type: DateTime }", + "{ name: event_id, type: UUID }", + "{ name: newcol2, type: UInt, args: { schema_modifiers: [nullable], size: 8 } }", + ] + forwardops, backwardsops = generate_migration_ops( + mockstoragewithcolumns(cols), mockstoragewithcolumns(new_cols) + ) + expected_forward = [ + AddColumn( + storage_set=StorageSetKey("events"), + table_name="errors_local", + column=Column("newcol1", DateTime()), + after="timestamp", + target=OperationTarget.LOCAL, + ), + AddColumn( + storage_set=StorageSetKey("events"), + table_name="errors_dist", + column=Column("newcol1", DateTime()), + after="timestamp", + target=OperationTarget.DISTRIBUTED, + ), + AddColumn( + storage_set=StorageSetKey("events"), + table_name="errors_local", + 
column=Column( + "newcol2", UInt(size=8, modifiers=MigrationModifiers(nullable=True)) + ), + after="event_id", + target=OperationTarget.LOCAL, + ), + AddColumn( + storage_set=StorageSetKey("events"), + table_name="errors_dist", + column=Column( + "newcol2", UInt(size=8, modifiers=MigrationModifiers(nullable=True)) + ), + after="event_id", + target=OperationTarget.DISTRIBUTED, + ), + ] + expected_backwards = [ + DropColumn( + storage_set=StorageSetKey("events"), + table_name="errors_dist", + column_name="newcol2", + target=OperationTarget.DISTRIBUTED, + ), + DropColumn( + storage_set=StorageSetKey("events"), + table_name="errors_local", + column_name="newcol2", + target=OperationTarget.LOCAL, + ), + DropColumn( + storage_set=StorageSetKey("events"), + table_name="errors_dist", + column_name="newcol1", + target=OperationTarget.DISTRIBUTED, + ), + DropColumn( + storage_set=StorageSetKey("events"), + table_name="errors_local", + column_name="newcol1", + target=OperationTarget.LOCAL, + ), + ] + assert forwardops == expected_forward and backwardsops == expected_backwards + + +def test_modify_column() -> None: + cols = [ + "{ name: timestamp, type: DateTime }", + ] + new_cols = [ + "{ name: timestamp, type: UUID }", + ] + with pytest.raises( + ValueError, + match="Modification to columns in unsupported, column 'timestamp' was modified or reordered", + ): + generate_migration_ops( + mockstoragewithcolumns(cols), + mockstoragewithcolumns(new_cols), + ) + + +def test_reorder_columns() -> None: + cols = [ + "{ name: project_id, type: UInt, args: { size: 64 } }", + "{ name: timestamp, type: DateTime }", + ] + new_cols = [ + "{ name: timestamp, type: DateTime }", + "{ name: project_id, type: UInt, args: { size: 64 } }", + ] + with pytest.raises( + ValueError, + match="Modification to columns in unsupported, column 'timestamp' was modified or reordered", + ): + generate_migration_ops( + mockstoragewithcolumns(cols), + mockstoragewithcolumns(new_cols), + ) + + +def 
test_delete_column() -> None: + cols = [ + "{ name: project_id, type: UInt, args: { size: 64 } }", + "{ name: timestamp, type: DateTime }", + "{ name: event_id, type: UUID }", + ] + new_cols = [ + "{ name: project_id, type: UInt, args: { size: 64 } }", + "{ name: timestamp, type: DateTime }", + "{ name: newcol1, type: DateTime }", + ] + with pytest.raises(ValueError, match="Column removal is not supported"): + generate_migration_ops( + mockstoragewithcolumns(cols), + mockstoragewithcolumns(new_cols), + ) diff --git a/tests/migrations/test_autogeneration.py b/tests/migrations/autogeneration/test_ui.py similarity index 87% rename from tests/migrations/test_autogeneration.py rename to tests/migrations/autogeneration/test_ui.py index e1f89c7d95c..afc7645baee 100644 --- a/tests/migrations/test_autogeneration.py +++ b/tests/migrations/autogeneration/test_ui.py @@ -1,10 +1,10 @@ import os import subprocess -from snuba.migrations.autogeneration import generate +from snuba.migrations.autogeneration.main import get_working_and_head -def test_basic() -> None: +def test_get_working_and_head() -> None: dir = "/tmp/kylesfakerepo987636" fname = "fakestorage.yaml" @@ -41,6 +41,6 @@ def test_basic() -> None: f.write("goodbye world") # make sure HEAD and curr version looks right - old_storage, new_storage = generate(os.path.join(dir, fname)) - assert old_storage == "hello world\n" + new_storage, old_storage = get_working_and_head(os.path.join(dir, fname)) assert new_storage == "hello world\ngoodbye world" + assert old_storage == "hello world\n" diff --git a/tests/web/test_db_query.py b/tests/web/test_db_query.py index 047df0da376..ea4202a3cc6 100644 --- a/tests/web/test_db_query.py +++ b/tests/web/test_db_query.py @@ -265,9 +265,18 @@ def test_db_query_success() -> None: robust=False, ) - print(stats["quota_allowance"]) - assert stats["quota_allowance"] == { + "summary": { + "threads_used": 5, + "rejected_by": {}, + "throttled_by": { + "policy": "BytesScannedRejectingPolicy", + 
"quota_used": 1560000000000, + "quota_unit": "bytes", + "suggestion": "scan less bytes", + "throttle_threshold": 1280000000000, + }, + }, "ReferrerGuardRailPolicy": { "can_run": True, "max_threads": 10, @@ -491,6 +500,23 @@ def _update_quota_balance( robust=False, ) assert stats["quota_allowance"] == { + "summary": { + "threads_used": 0, + "rejected_by": { + "policy": "RejectAllocationPolicy", + "rejection_threshold": MAX_THRESHOLD, + "quota_used": 0, + "quota_unit": NO_UNITS, + "suggestion": NO_SUGGESTION, + }, + "throttled_by": { + "policy": "RejectAllocationPolicy", + "throttle_threshold": MAX_THRESHOLD, + "quota_used": 0, + "quota_unit": NO_UNITS, + "suggestion": NO_SUGGESTION, + }, + }, "RejectAllocationPolicy": { "can_run": False, "explanation": { @@ -504,7 +530,7 @@ def _update_quota_balance( "rejection_threshold": MAX_THRESHOLD, "suggestion": NO_SUGGESTION, "throttle_threshold": MAX_THRESHOLD, - } + }, } # extra data contains policy failure information assert ( @@ -712,6 +738,17 @@ def _run_query() -> None: _run_query() assert e.value.extra["stats"]["quota_allowance"] == { + "summary": { + "threads_used": 0, + "rejected_by": { + "policy": "CountQueryPolicy", + "rejection_threshold": MAX_QUERIES_TO_RUN, + "quota_used": queries_run, + "quota_unit": "queries", + "suggestion": "scan less concurrent queries", + }, + "throttled_by": {}, + }, "CountQueryPolicy": { "can_run": False, "max_threads": 0,