diff --git a/snuba/cli/migrations.py b/snuba/cli/migrations.py index 19e4147ba2..359148c189 100644 --- a/snuba/cli/migrations.py +++ b/snuba/cli/migrations.py @@ -384,13 +384,19 @@ def add_node( @migrations.command() @click.argument("storage_path", type=str) -def generate( - storage_path: str, -) -> None: +@click.option("--name", type=str, help="optional name for the migration") +def generate(storage_path: str, name: Optional[str] = None) -> None: """ - Given a path to modified storage yaml definition (inside of snuba repo), - creates a snuba migration for the given modifications. - The migration will be written into the local directory. The user is responsible for making + Given a path to a user-modified storage.yaml definition (inside snuba/datasets/configuration/*/storages/*.yaml), + and an optional name for the migration, + generates a snuba migration based on the schema modifications to the storage.yaml. + + Currently only column addition is supported. + + The migration is generated based on the diff between HEAD and working dir. Therefore modifications to the + storage should be uncommitted in the working dir. + + The generated migration will be written into the local directory. The user is responsible for making the commit, PR, and merging. 
""" expected_pattern = r"(.+/)?snuba/datasets/configuration/.*/storages/.*\.(yml|yaml)" @@ -399,5 +405,5 @@ def generate( f"Storage path {storage_path} does not match expected pattern {expected_pattern}" ) - autogeneration.generate(storage_path) - click.echo("This function is under construction.") + path = autogeneration.generate(storage_path, migration_name=name) + click.echo(f"Migration successfully generated at {path}") diff --git a/snuba/migrations/autogeneration/diff.py b/snuba/migrations/autogeneration/diff.py index 3931ae6024..ed9147f4ad 100644 --- a/snuba/migrations/autogeneration/diff.py +++ b/snuba/migrations/autogeneration/diff.py @@ -1,6 +1,4 @@ -from typing import cast - -import yaml +from typing import Any, Sequence, cast from snuba.clusters.storage_sets import StorageSetKey from snuba.datasets.configuration.utils import parse_columns @@ -13,13 +11,27 @@ """ -def generate_migration_ops( - oldstorage: str, newstorage: str +def generate_python_migration( + oldstorage: dict[str, Any], newstorage: dict[str, Any] +) -> str: + """ + Input: + 2 storage.yaml files in yaml.safe_load format. + These represent the two sides of the diff of a modified storage.yaml, i.e. original and modified + + Generates and returns a python migration based on the changes. + """ + forwards, backwards = _storage_diff_to_migration_ops(oldstorage, newstorage) + return _migration_ops_to_migration(forwards, backwards) + + +def _storage_diff_to_migration_ops( + oldstorage: dict[str, Any], newstorage: dict[str, Any] ) -> tuple[list[AddColumn], list[DropColumn]]: """ Input: - old_storage, the original storage yaml in str format - new_storage, the modified storage yaml in str format + old_storage, the original storage yaml in yaml.safe_load format + new_storage, the modified storage yaml in yaml.safe_load format Returns a tuple (forwardops, backwardsops) this are the forward and backward migration operations required to migrate the storage as described in the given yaml files. 
@@ -30,29 +42,26 @@ def generate_migration_ops( if not valid: raise ValueError(reason) - oldcol_names = set( - col["name"] for col in yaml.safe_load(oldstorage)["schema"]["columns"] - ) - newstorage_dict = yaml.safe_load(newstorage) - newcols = newstorage_dict["schema"]["columns"] + oldcol_names = set(col["name"] for col in oldstorage["schema"]["columns"]) + newcols = newstorage["schema"]["columns"] forwardops: list[AddColumn] = [] for i, col in enumerate(newcols): if col["name"] not in oldcol_names: column = _schema_column_to_migration_column(parse_columns([col])[0]) after = newcols[i - 1]["name"] - storage_set = StorageSetKey(newstorage_dict["storage"]["set_key"]) + storage_set = StorageSetKey(newstorage["storage"]["set_key"]) forwardops += [ AddColumn( storage_set=storage_set, - table_name=newstorage_dict["schema"]["local_table_name"], + table_name=newstorage["schema"]["local_table_name"], column=column, after=after, target=OperationTarget.LOCAL, ), AddColumn( storage_set=storage_set, - table_name=newstorage_dict["schema"]["dist_table_name"], + table_name=newstorage["schema"]["dist_table_name"], column=column, after=after, target=OperationTarget.DISTRIBUTED, @@ -61,34 +70,63 @@ def generate_migration_ops( return (forwardops, [op.get_reverse() for op in reversed(forwardops)]) -def _is_valid_add_column(oldstorage: str, newstorage: str) -> tuple[bool, str]: +def _migration_ops_to_migration( + forwards_ops: Sequence[AddColumn], + backwards_ops: Sequence[DropColumn], +) -> str: + """ + Given a lists of forward and backwards ops, returns a python class + definition for the migration as a str. The migration must be non-blocking. 
+ """ + return f""" +from typing import Sequence + +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations.columns import MigrationModifiers +from snuba.migrations.migration import ClickhouseNodeMigration +from snuba.migrations.operations import AddColumn, DropColumn, OperationTarget, SqlOperation +from snuba.utils import schemas +from snuba.utils.schemas import Column + +class Migration(ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> Sequence[SqlOperation]: + return {repr(forwards_ops)} + + def backwards_ops(self) -> Sequence[SqlOperation]: + return {repr(backwards_ops)} +""" + + +def _is_valid_add_column( + oldstorage: dict[str, Any], newstorage: dict[str, Any] +) -> tuple[bool, str]: """ Input: - old_storage, the old storage yaml in str format - new_storage, the modified storage yaml in str format + old_storage, the old storage yaml in yaml.safe_load format + new_storage, the new (modified) storage yaml in yaml.safe_load format Returns true if the changes to the storage is valid column addition, false otherwise, along with a reasoning. 
""" - oldstorage_dict = yaml.safe_load(oldstorage) - newstorage_dict = yaml.safe_load(newstorage) - if oldstorage_dict == newstorage_dict: + if oldstorage == newstorage: return True, "storages are the same" - # nothing changed but the columns - t1 = oldstorage_dict["schema"].pop("columns") - t2 = newstorage_dict["schema"].pop("columns") - if not (oldstorage_dict == newstorage_dict): + # verify nothing changed but the columns + t1 = oldstorage["schema"].pop("columns") + t2 = newstorage["schema"].pop("columns") + if not (oldstorage == newstorage): return ( False, "Expected the only change to the storage to be the columns, but that is not true", ) - oldstorage_dict["schema"]["columns"] = t1 - newstorage_dict["schema"]["columns"] = t2 + oldstorage["schema"]["columns"] = t1 + newstorage["schema"]["columns"] = t2 - # only changes to columns is additions - oldstorage_cols = oldstorage_dict["schema"]["columns"] - newstorage_cols = newstorage_dict["schema"]["columns"] + # verify only changes to columns is additions + oldstorage_cols = oldstorage["schema"]["columns"] + newstorage_cols = newstorage["schema"]["columns"] colnames_old = set(e["name"] for e in oldstorage_cols) colnames_new = set(e["name"] for e in newstorage_cols) diff --git a/snuba/migrations/autogeneration/main.py b/snuba/migrations/autogeneration/main.py index 1be2456951..6d0aefb952 100644 --- a/snuba/migrations/autogeneration/main.py +++ b/snuba/migrations/autogeneration/main.py @@ -1,15 +1,30 @@ import os import subprocess +from typing import Optional -from snuba.migrations.autogeneration.diff import generate_migration_ops +from black import Mode, format_str # type: ignore +from yaml import safe_load +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import group_loader +from snuba.migrations.autogeneration.diff import generate_python_migration -def generate(storage_path: str) -> None: + +def generate(storage_path: str, migration_name: Optional[str] = None) -> str: # load into 
memory the given storage and the version of it at HEAD - new_storage, old_storage = get_working_and_head(storage_path) + tmpnew, tmpold = get_working_and_head(storage_path) + new_storage = safe_load(tmpnew) + old_storage = safe_load(tmpold) # generate the migration operations - generate_migration_ops(old_storage, new_storage) + migration = generate_python_migration(old_storage, new_storage) + + # write the migration and return the path + return write_migration( + migration, + StorageSetKey(new_storage["storage"]["set_key"]), + migration_name if migration_name else "generated_migration", + ) def get_working_and_head(path: str) -> tuple[str, str]: @@ -52,3 +67,48 @@ def get_working_and_head(path: str) -> tuple[str, str]: working_file = f.read() return (working_file, head_file) + + +def write_migration( + migration: str, + storage_set: StorageSetKey, + migration_name: str, +) -> str: + """ + Input: + migration - python migration file (see snuba/snuba_migrations/*/000x_*.py for examples) + storage_set - the key of the storage-set you are writing the migration for + migration_name - name used in the new file name (NNNN_{migration_name}.py) + Writes the given migration to a new file under snuba/snuba_migrations/{storage_set}/, numbered one past + the last migration listed by the storage set's group loader (0000 if it lists none), and returns the new file's path. + """ + # make sure storage_set migration path exist + path = "snuba/snuba_migrations/" + storage_set.value + if not os.path.exists(path): + raise ValueError( + f"Migration path: '{path}' does not exist, perhaps '{storage_set.value}' is not a valid storage set key?" 
+ ) + + # grab the group_loader for the storage set + group_loader_name = ( + "".join([word.capitalize() for word in storage_set.value.split("_")]) + "Loader" + ) + loader = getattr(group_loader, group_loader_name)() + assert isinstance(loader, group_loader.GroupLoader) + + # get the next migration number (stays 0 when the group has no migrations yet) + existing_migrations = loader.get_migrations() + nextnum = 0 + if existing_migrations: + nextnum = int(existing_migrations[-1].split("_")[0]) + 1 + + # write migration to file + newpath = f"{path}/{str(nextnum).zfill(4)}_{migration_name}.py" + if os.path.exists(newpath): + # this should never happen, but just in case + raise ValueError( + f"Error: The migration number {nextnum} was larger than the last migration in the group loader '{group_loader_name}', but the migration already exists" + ) + with open(newpath, "w") as f: + f.write(format_str(migration, mode=Mode())) + return newpath diff --git a/snuba/migrations/operations.py b/snuba/migrations/operations.py index b36e601f3e..e0c0cfc1c6 100644 --- a/snuba/migrations/operations.py +++ b/snuba/migrations/operations.py @@ -31,6 +31,9 @@ class OperationTarget(Enum): DISTRIBUTED = "distributed" UNSET = "unset" # target is not set. 
will throw an error if executed + def __repr__(self) -> str: + return f"OperationTarget.{self.value.upper()}" + class SqlOperation(ABC): def __init__( @@ -287,6 +290,9 @@ def get_reverse(self) -> DropColumn: target=self.target, ) + def __repr__(self) -> str: + return f"AddColumn(storage_set={repr(self.storage_set)}, table_name={repr(self.table_name)}, column={repr(self.column)}, after={repr(self.__after)}, target={repr(self.target)})" + class DropColumn(SqlOperation): """ @@ -315,6 +321,9 @@ def format_sql(self) -> str: f"ALTER TABLE {self.table_name} DROP COLUMN IF EXISTS {self.column_name};" ) + def __repr__(self) -> str: + return f"DropColumn(storage_set={repr(self.storage_set)}, table_name={repr(self.table_name)}, column_name={repr(self.column_name)}, target={repr(self.target)})" + class ModifyColumn(SqlOperation): """ diff --git a/snuba/utils/schemas.py b/snuba/utils/schemas.py index 55c3578641..eae57fc268 100644 --- a/snuba/utils/schemas.py +++ b/snuba/utils/schemas.py @@ -86,7 +86,16 @@ def __init__(self, modifiers: Optional[TModifiers] = None): self.__modifiers = modifiers def __repr__(self) -> str: - return f"{self.__class__.__name__}({self._repr_content()})[{self.__modifiers}]" + # return f"{self.__class__.__name__}({self._repr_content()})[{self.__modifiers}]" + repr_content = self._repr_content() + if repr_content: + return "schemas.{}({}, modifiers={})".format( + self.__class__.__name__, repr_content, repr(self.__modifiers) + ) + else: + return "schemas.{}(modifiers={})".format( + self.__class__.__name__, repr(self.__modifiers) + ) def _repr_content(self) -> str: """ diff --git a/tests/migrations/autogeneration/test_diff.py b/tests/migrations/autogeneration/test_diff.py deleted file mode 100644 index 689a3cd45a..0000000000 --- a/tests/migrations/autogeneration/test_diff.py +++ /dev/null @@ -1,166 +0,0 @@ -import pytest - -from snuba.clusters.storage_sets import StorageSetKey -from snuba.migrations.autogeneration.diff import generate_migration_ops -from 
snuba.migrations.columns import MigrationModifiers -from snuba.migrations.operations import AddColumn, DropColumn, OperationTarget -from snuba.utils.schemas import Column, DateTime, UInt - - -def mockstoragewithcolumns(cols: list[str]) -> str: - colstr = ",\n ".join([s for s in cols]) - return f""" -version: v1 -kind: writable_storage -name: errors -storage: - key: errors - set_key: events -readiness_state: complete -schema: - columns: - [ - {colstr} - ] - local_table_name: errors_local - dist_table_name: errors_dist - partition_format: - - retention_days - - date - not_deleted_mandatory_condition: deleted -local_table_name: errors_local -dist_table_name: errors_dist -""" - - -def test_add_column() -> None: - cols = [ - "{ name: project_id, type: UInt, args: { size: 64 } }", - "{ name: timestamp, type: DateTime }", - "{ name: event_id, type: UUID }", - ] - new_cols = [ - "{ name: project_id, type: UInt, args: { size: 64 } }", - "{ name: timestamp, type: DateTime }", - "{ name: newcol1, type: DateTime }", - "{ name: event_id, type: UUID }", - "{ name: newcol2, type: UInt, args: { schema_modifiers: [nullable], size: 8 } }", - ] - forwardops, backwardsops = generate_migration_ops( - mockstoragewithcolumns(cols), mockstoragewithcolumns(new_cols) - ) - expected_forward = [ - AddColumn( - storage_set=StorageSetKey("events"), - table_name="errors_local", - column=Column("newcol1", DateTime()), - after="timestamp", - target=OperationTarget.LOCAL, - ), - AddColumn( - storage_set=StorageSetKey("events"), - table_name="errors_dist", - column=Column("newcol1", DateTime()), - after="timestamp", - target=OperationTarget.DISTRIBUTED, - ), - AddColumn( - storage_set=StorageSetKey("events"), - table_name="errors_local", - column=Column( - "newcol2", UInt(size=8, modifiers=MigrationModifiers(nullable=True)) - ), - after="event_id", - target=OperationTarget.LOCAL, - ), - AddColumn( - storage_set=StorageSetKey("events"), - table_name="errors_dist", - column=Column( - "newcol2", 
UInt(size=8, modifiers=MigrationModifiers(nullable=True)) - ), - after="event_id", - target=OperationTarget.DISTRIBUTED, - ), - ] - expected_backwards = [ - DropColumn( - storage_set=StorageSetKey("events"), - table_name="errors_dist", - column_name="newcol2", - target=OperationTarget.DISTRIBUTED, - ), - DropColumn( - storage_set=StorageSetKey("events"), - table_name="errors_local", - column_name="newcol2", - target=OperationTarget.LOCAL, - ), - DropColumn( - storage_set=StorageSetKey("events"), - table_name="errors_dist", - column_name="newcol1", - target=OperationTarget.DISTRIBUTED, - ), - DropColumn( - storage_set=StorageSetKey("events"), - table_name="errors_local", - column_name="newcol1", - target=OperationTarget.LOCAL, - ), - ] - assert forwardops == expected_forward and backwardsops == expected_backwards - - -def test_modify_column() -> None: - cols = [ - "{ name: timestamp, type: DateTime }", - ] - new_cols = [ - "{ name: timestamp, type: UUID }", - ] - with pytest.raises( - ValueError, - match="Modification to columns in unsupported, column 'timestamp' was modified or reordered", - ): - generate_migration_ops( - mockstoragewithcolumns(cols), - mockstoragewithcolumns(new_cols), - ) - - -def test_reorder_columns() -> None: - cols = [ - "{ name: project_id, type: UInt, args: { size: 64 } }", - "{ name: timestamp, type: DateTime }", - ] - new_cols = [ - "{ name: timestamp, type: DateTime }", - "{ name: project_id, type: UInt, args: { size: 64 } }", - ] - with pytest.raises( - ValueError, - match="Modification to columns in unsupported, column 'timestamp' was modified or reordered", - ): - generate_migration_ops( - mockstoragewithcolumns(cols), - mockstoragewithcolumns(new_cols), - ) - - -def test_delete_column() -> None: - cols = [ - "{ name: project_id, type: UInt, args: { size: 64 } }", - "{ name: timestamp, type: DateTime }", - "{ name: event_id, type: UUID }", - ] - new_cols = [ - "{ name: project_id, type: UInt, args: { size: 64 } }", - "{ name: 
timestamp, type: DateTime }", - "{ name: newcol1, type: DateTime }", - ] - with pytest.raises(ValueError, match="Column removal is not supported"): - generate_migration_ops( - mockstoragewithcolumns(cols), - mockstoragewithcolumns(new_cols), - ) diff --git a/tests/migrations/autogeneration/test_generate_python_migration.py b/tests/migrations/autogeneration/test_generate_python_migration.py new file mode 100644 index 0000000000..9ab7e4987b --- /dev/null +++ b/tests/migrations/autogeneration/test_generate_python_migration.py @@ -0,0 +1,206 @@ +from typing import Any + +import pytest +import yaml +from black import Mode, format_str # type: ignore + +from snuba.migrations.autogeneration.diff import generate_python_migration + + +def mockstoragewithcolumns(cols: list[str]) -> Any: + colstr = ",\n ".join([s for s in cols]) + storage = f""" +version: v1 +kind: writable_storage +name: errors +storage: + key: errors + set_key: events +readiness_state: complete +schema: + columns: + [ + {colstr} + ] + local_table_name: errors_local + dist_table_name: errors_dist + partition_format: + - retention_days + - date + not_deleted_mandatory_condition: deleted +local_table_name: errors_local +dist_table_name: errors_dist +""" + return yaml.safe_load(storage) + + +def test_add_column() -> None: + cols = [ + "{ name: project_id, type: UInt, args: { size: 64 } }", + "{ name: timestamp, type: DateTime }", + "{ name: event_id, type: UUID }", + ] + new_cols = [ + "{ name: project_id, type: UInt, args: { size: 64 } }", + "{ name: timestamp, type: DateTime }", + "{ name: newcol1, type: DateTime }", + "{ name: event_id, type: UUID }", + "{ name: newcol2, type: UInt, args: { schema_modifiers: [nullable], size: 8 } }", + ] + expected_migration = """ +from typing import Sequence + +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations.columns import MigrationModifiers +from snuba.migrations.migration import ClickhouseNodeMigration +from snuba.migrations.operations import 
AddColumn, DropColumn, OperationTarget, SqlOperation +from snuba.utils import schemas +from snuba.utils.schemas import Column + + +class Migration(ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> Sequence[SqlOperation]: + return [ + AddColumn( + storage_set=StorageSetKey.EVENTS, + table_name="errors_local", + column=Column("newcol1", schemas.DateTime(modifiers=None)), + after="timestamp", + target=OperationTarget.LOCAL, + ), + AddColumn( + storage_set=StorageSetKey.EVENTS, + table_name="errors_dist", + column=Column("newcol1", schemas.DateTime(modifiers=None)), + after="timestamp", + target=OperationTarget.DISTRIBUTED, + ), + AddColumn( + storage_set=StorageSetKey.EVENTS, + table_name="errors_local", + column=Column( + "newcol2", + schemas.UInt( + 8, + modifiers=MigrationModifiers( + nullable=True, + low_cardinality=False, + default=None, + materialized=None, + codecs=None, + ttl=None, + ), + ), + ), + after="event_id", + target=OperationTarget.LOCAL, + ), + AddColumn( + storage_set=StorageSetKey.EVENTS, + table_name="errors_dist", + column=Column( + "newcol2", + schemas.UInt( + 8, + modifiers=MigrationModifiers( + nullable=True, + low_cardinality=False, + default=None, + materialized=None, + codecs=None, + ttl=None, + ), + ), + ), + after="event_id", + target=OperationTarget.DISTRIBUTED, + ), + ] + + def backwards_ops(self) -> Sequence[SqlOperation]: + return [ + DropColumn( + storage_set=StorageSetKey.EVENTS, + table_name="errors_dist", + column_name="newcol2", + target=OperationTarget.DISTRIBUTED, + ), + DropColumn( + storage_set=StorageSetKey.EVENTS, + table_name="errors_local", + column_name="newcol2", + target=OperationTarget.LOCAL, + ), + DropColumn( + storage_set=StorageSetKey.EVENTS, + table_name="errors_dist", + column_name="newcol1", + target=OperationTarget.DISTRIBUTED, + ), + DropColumn( + storage_set=StorageSetKey.EVENTS, + table_name="errors_local", + column_name="newcol1", + target=OperationTarget.LOCAL, + ), + ] +""" + 
migration = generate_python_migration( + mockstoragewithcolumns(cols), mockstoragewithcolumns(new_cols) + ) + assert format_str(migration, mode=Mode()) == format_str( + expected_migration, mode=Mode() + ) + + +def test_modify_column() -> None: + cols = [ + "{ name: timestamp, type: DateTime }", + ] + new_cols = [ + "{ name: timestamp, type: UUID }", + ] + with pytest.raises( + ValueError, + match="Modification to columns in unsupported, column 'timestamp' was modified or reordered", + ): + generate_python_migration( + mockstoragewithcolumns(cols), mockstoragewithcolumns(new_cols) + ) + + +def test_reorder_columns() -> None: + cols = [ + "{ name: project_id, type: UInt, args: { size: 64 } }", + "{ name: timestamp, type: DateTime }", + ] + new_cols = [ + "{ name: timestamp, type: DateTime }", + "{ name: project_id, type: UInt, args: { size: 64 } }", + ] + with pytest.raises( + ValueError, + match="Modification to columns in unsupported, column 'timestamp' was modified or reordered", + ): + generate_python_migration( + mockstoragewithcolumns(cols), mockstoragewithcolumns(new_cols) + ) + + +def test_delete_column() -> None: + cols = [ + "{ name: project_id, type: UInt, args: { size: 64 } }", + "{ name: timestamp, type: DateTime }", + "{ name: event_id, type: UUID }", + ] + new_cols = [ + "{ name: project_id, type: UInt, args: { size: 64 } }", + "{ name: timestamp, type: DateTime }", + "{ name: newcol1, type: DateTime }", + ] + with pytest.raises(ValueError, match="Column removal is not supported"): + generate_python_migration( + mockstoragewithcolumns(cols), mockstoragewithcolumns(new_cols) + ) diff --git a/tests/migrations/autogeneration/test_ui.py b/tests/migrations/autogeneration/test_ui.py index afc7645bae..bf387d9311 100644 --- a/tests/migrations/autogeneration/test_ui.py +++ b/tests/migrations/autogeneration/test_ui.py @@ -1,7 +1,9 @@ import os import subprocess +from glob import glob -from snuba.migrations.autogeneration.main import get_working_and_head +from 
snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations.autogeneration.main import get_working_and_head, write_migration def test_get_working_and_head() -> None: @@ -44,3 +46,17 @@ def test_get_working_and_head() -> None: new_storage, old_storage = get_working_and_head(os.path.join(dir, fname)) assert new_storage == "hello world\ngoodbye world" assert old_storage == "hello world\n" + + +def test_write_migration() -> None: + content = "bllop" + name = "kyles_migration" + write_migration(content, StorageSetKey.EVENTS, name) + written_migration = sorted( + glob(f"snuba/snuba_migrations/events/[0-9][0-9][0-9][0-9]_{name}.py") + )[0] + try: + with open(written_migration) as f: + assert f.read().strip() == content + finally: + os.remove(written_migration)