from typing import Sequence

from snuba.clickhouse.columns import Column, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations
from snuba.migrations.columns import MigrationModifiers as Modifiers


class Migration(migration.ClickhouseNodeMigration):
    """Add a ``record_meta`` flag column to the raw counters table.

    The flag marks raw rows that should be picked up by the counters
    meta-table materialized view (which filters on ``record_meta = 1``).
    It defaults to 0, so rows written before this migration are ignored
    by the meta rollup.
    """

    blocking = False
    local_table_name = "generic_metric_counters_raw_local"
    storage_set_key = StorageSetKey.GENERIC_METRICS_COUNTERS

    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
        """Add the column on local nodes, placed after materialization_version."""
        return [
            operations.AddColumn(
                storage_set=self.storage_set_key,
                table_name=self.local_table_name,
                # UInt8 with DEFAULT "0": pre-existing rows read back as 0.
                # (Original wrote str("0") — a redundant constructor call on
                # a string literal; "0" is the same value.)
                column=Column("record_meta", UInt(8, Modifiers(default="0"))),
                target=operations.OperationTarget.LOCAL,
                after="materialization_version",
            ),
        ]

    def backwards_ops(self) -> Sequence[operations.SqlOperation]:
        """Drop the column again on rollback."""
        return [
            operations.DropColumn(
                column_name="record_meta",
                storage_set=self.storage_set_key,
                table_name=self.local_table_name,
                target=operations.OperationTarget.LOCAL,
            ),
        ]
from typing import Sequence

from snuba.clickhouse.columns import AggregateFunction, Column, DateTime, String, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget
from snuba.utils.schemas import Float


class Migration(migration.ClickhouseNodeMigration):
    """Create the aggregated counters meta tables.

    Builds an AggregatingMergeTree table on local nodes plus a
    Distributed wrapper table, keyed by
    (org_id, project_id, use_case_id, metric_id, tag_key, timestamp).
    The table stores aggregate-function states (groupUniqArray of tag
    values, sum of counts) to be filled by a materialized view.
    """

    blocking = False
    granularity = "2048"
    local_table_name = "generic_metric_counters_meta_aggregated_local"
    dist_table_name = "generic_metric_counters_meta_aggregated_dist"
    storage_set_key = StorageSetKey.GENERIC_METRICS_COUNTERS

    columns: Sequence[Column[Modifiers]] = [
        Column("org_id", UInt(64)),
        Column("project_id", UInt(64)),
        Column("use_case_id", String(Modifiers(low_cardinality=True))),
        Column("metric_id", UInt(64)),
        Column("tag_key", String()),
        Column("timestamp", DateTime(modifiers=Modifiers(codecs=["DoubleDelta"]))),
        Column("retention_days", UInt(16)),
        Column("tag_values", AggregateFunction("groupUniqArray", [String()])),
        Column("count", AggregateFunction("sum", [Float(64)])),
    ]

    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
        """Create the local aggregating table, then the distributed table."""
        # Sorting key doubles as the primary key.
        key_expr = "(org_id, project_id, use_case_id, metric_id, tag_key, timestamp)"
        local_engine = table_engines.AggregatingMergeTree(
            storage_set=self.storage_set_key,
            order_by=key_expr,
            primary_key=key_expr,
            partition_by="(retention_days, toMonday(timestamp))",
            settings={"index_granularity": self.granularity},
            ttl="timestamp + toIntervalDay(retention_days)",
        )
        dist_engine = table_engines.Distributed(
            local_table_name=self.local_table_name, sharding_key=None
        )
        create_local = operations.CreateTable(
            storage_set=self.storage_set_key,
            table_name=self.local_table_name,
            engine=local_engine,
            columns=self.columns,
            target=OperationTarget.LOCAL,
        )
        create_dist = operations.CreateTable(
            storage_set=self.storage_set_key,
            table_name=self.dist_table_name,
            engine=dist_engine,
            columns=self.columns,
            target=OperationTarget.DISTRIBUTED,
        )
        return [create_local, create_dist]

    def backwards_ops(self) -> Sequence[operations.SqlOperation]:
        """Drop in reverse order: distributed wrapper first, then local table."""
        drop_dist = operations.DropTable(
            storage_set=self.storage_set_key,
            table_name=self.dist_table_name,
            target=OperationTarget.DISTRIBUTED,
        )
        drop_local = operations.DropTable(
            storage_set=self.storage_set_key,
            table_name=self.local_table_name,
            target=OperationTarget.LOCAL,
        )
        return [drop_dist, drop_local]
from typing import Sequence

from snuba.clickhouse.columns import AggregateFunction, Column, DateTime, String, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget
from snuba.utils.schemas import Float


class Migration(migration.ClickhouseNodeMigration):
    """Create the materialized view feeding the counters meta table.

    The view reads rows flagged with ``record_meta = 1`` from the raw
    counters table, explodes tags via ARRAY JOIN, buckets timestamps by
    week, and writes aggregate states into
    ``generic_metric_counters_meta_aggregated_local``.
    """

    blocking = False
    view_name = "generic_metric_counters_meta_aggregation_mv"
    dest_table_name = "generic_metric_counters_meta_aggregated_local"
    # Must mirror the destination table's schema exactly. Fix: the last
    # column was declared as "value", but the destination table (created
    # in 0031) names it "count" and the SELECT below aliases it
    # `as count` — a mismatched MV column list would fail / misroute the
    # aggregate state, so it is "count" here.
    dest_table_columns: Sequence[Column[Modifiers]] = [
        Column("org_id", UInt(64)),
        Column("project_id", UInt(64)),
        Column("use_case_id", String(Modifiers(low_cardinality=True))),
        Column("metric_id", UInt(64)),
        Column("tag_key", String()),
        Column("timestamp", DateTime(modifiers=Modifiers(codecs=["DoubleDelta"]))),
        Column("retention_days", UInt(16)),
        Column("tag_values", AggregateFunction("groupUniqArray", [String()])),
        Column("count", AggregateFunction("sum", [Float(64)])),
    ]
    storage_set_key = StorageSetKey.GENERIC_METRICS_COUNTERS

    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
        """Create the materialized view on local nodes."""
        return [
            operations.CreateMaterializedView(
                storage_set=self.storage_set_key,
                view_name=self.view_name,
                columns=self.dest_table_columns,
                destination_table_name=self.dest_table_name,
                target=OperationTarget.LOCAL,
                query="""
                SELECT
                    org_id,
                    project_id,
                    use_case_id,
                    metric_id,
                    tag_key,
                    toStartOfWeek(timestamp) as timestamp,
                    retention_days,
                    groupUniqArrayState(tag_value) as `tag_values`,
                    sumState(count_value) as count
                FROM generic_metric_counters_raw_local
                ARRAY JOIN
                    tags.key AS tag_key, tags.raw_value AS tag_value
                WHERE record_meta = 1
                GROUP BY
                    org_id,
                    project_id,
                    use_case_id,
                    metric_id,
                    tag_key,
                    timestamp,
                    retention_days
                """,
            ),
        ]

    def backwards_ops(self) -> Sequence[operations.SqlOperation]:
        """Drop the view on rollback (views are dropped like tables)."""
        return [
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.view_name,
                target=OperationTarget.LOCAL,
            )
        ]