
Commit

Merge remote-tracking branch 'origin/master' into enocht/support-groupby-formula-join
enochtangg committed Jun 28, 2024
2 parents fad89e6 + 1ccd0e3 commit f52392f
Showing 8 changed files with 388 additions and 210 deletions.
22 changes: 14 additions & 8 deletions snuba/cli/migrations.py
@@ -384,13 +384,19 @@ def add_node(

@migrations.command()
@click.argument("storage_path", type=str)
def generate(
storage_path: str,
) -> None:
@click.option("--name", type=str, help="optional name for the migration")
def generate(storage_path: str, name: Optional[str] = None) -> None:
"""
Given a path to modified storage yaml definition (inside of snuba repo),
creates a snuba migration for the given modifications.
The migration will be written into the local directory. The user is responsible for making
Given a path to a user-modified storage.yaml definition (inside snuba/datasets/configuration/*/storages/*.yaml),
and an optional name for the migration,
generates a snuba migration based on the schema modifications to the storage.yaml.
Currently only column addition is supported.
The migration is generated from the diff between HEAD and the working dir, so the
modifications to the storage must be uncommitted in the working dir.
The generated migration will be written into the local directory. The user is responsible for making
the commit, PR, and merging.
"""
expected_pattern = r"(.+/)?snuba/datasets/configuration/.*/storages/.*\.(yml|yaml)"
@@ -399,5 +405,5 @@ def generate(
f"Storage path {storage_path} does not match expected pattern {expected_pattern}"
)

autogeneration.generate(storage_path)
click.echo("This function is under construction.")
path = autogeneration.generate(storage_path, migration_name=name)
click.echo(f"Migration successfully generated at {path}")
98 changes: 68 additions & 30 deletions snuba/migrations/autogeneration/diff.py
@@ -1,6 +1,4 @@
from typing import cast

import yaml
from typing import Any, Sequence, cast

from snuba.clusters.storage_sets import StorageSetKey
from snuba.datasets.configuration.utils import parse_columns
@@ -13,13 +11,27 @@
"""


def generate_migration_ops(
oldstorage: str, newstorage: str
def generate_python_migration(
oldstorage: dict[str, Any], newstorage: dict[str, Any]
) -> str:
"""
Input:
two storage.yaml definitions in yaml.safe_load format,
representing the original and the modified versions of a storage.
Generates and returns a python migration based on the changes.
"""
forwards, backwards = _storage_diff_to_migration_ops(oldstorage, newstorage)
return _migration_ops_to_migration(forwards, backwards)


def _storage_diff_to_migration_ops(
oldstorage: dict[str, Any], newstorage: dict[str, Any]
) -> tuple[list[AddColumn], list[DropColumn]]:
"""
Input:
old_storage, the original storage yaml in str format
new_storage, the modified storage yaml in str format
old_storage, the original storage yaml in yaml.safe_load format
new_storage, the modified storage yaml in yaml.safe_load format
Returns a tuple (forwardops, backwardsops); these are the forward and backward migration
operations required to migrate the storage as described in the given yaml files.
@@ -30,29 +42,26 @@ def generate_migration_ops(
if not valid:
raise ValueError(reason)

oldcol_names = set(
col["name"] for col in yaml.safe_load(oldstorage)["schema"]["columns"]
)
newstorage_dict = yaml.safe_load(newstorage)
newcols = newstorage_dict["schema"]["columns"]
oldcol_names = set(col["name"] for col in oldstorage["schema"]["columns"])
newcols = newstorage["schema"]["columns"]

forwardops: list[AddColumn] = []
for i, col in enumerate(newcols):
if col["name"] not in oldcol_names:
column = _schema_column_to_migration_column(parse_columns([col])[0])
after = newcols[i - 1]["name"]
storage_set = StorageSetKey(newstorage_dict["storage"]["set_key"])
storage_set = StorageSetKey(newstorage["storage"]["set_key"])
forwardops += [
AddColumn(
storage_set=storage_set,
table_name=newstorage_dict["schema"]["local_table_name"],
table_name=newstorage["schema"]["local_table_name"],
column=column,
after=after,
target=OperationTarget.LOCAL,
),
AddColumn(
storage_set=storage_set,
table_name=newstorage_dict["schema"]["dist_table_name"],
table_name=newstorage["schema"]["dist_table_name"],
column=column,
after=after,
target=OperationTarget.DISTRIBUTED,
@@ -61,34 +70,63 @@ def generate_migration_ops(
return (forwardops, [op.get_reverse() for op in reversed(forwardops)])


def _is_valid_add_column(oldstorage: str, newstorage: str) -> tuple[bool, str]:
def _migration_ops_to_migration(
forwards_ops: Sequence[AddColumn],
backwards_ops: Sequence[DropColumn],
) -> str:
"""
Given lists of forward and backward ops, returns a python class
definition for the migration as a str. The migration must be non-blocking.
"""
return f"""
from typing import Sequence
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations.columns import MigrationModifiers
from snuba.migrations.migration import ClickhouseNodeMigration
from snuba.migrations.operations import AddColumn, DropColumn, OperationTarget, SqlOperation
from snuba.utils import schemas
from snuba.utils.schemas import Column
class Migration(ClickhouseNodeMigration):
blocking = False
def forwards_ops(self) -> Sequence[SqlOperation]:
return {repr(forwards_ops)}
def backwards_ops(self) -> Sequence[SqlOperation]:
return {repr(backwards_ops)}
"""


def _is_valid_add_column(
oldstorage: dict[str, Any], newstorage: dict[str, Any]
) -> tuple[bool, str]:
"""
Input:
old_storage, the old storage yaml in str format
new_storage, the modified storage yaml in str format
old_storage, the old storage yaml in yaml.safe_load format
new_storage, the new (modified) storage yaml in yaml.safe_load format
Returns true if the changes to the storage are a valid column addition, false otherwise,
along with the reasoning.
"""
oldstorage_dict = yaml.safe_load(oldstorage)
newstorage_dict = yaml.safe_load(newstorage)
if oldstorage_dict == newstorage_dict:
if oldstorage == newstorage:
return True, "storages are the same"

# nothing changed but the columns
t1 = oldstorage_dict["schema"].pop("columns")
t2 = newstorage_dict["schema"].pop("columns")
if not (oldstorage_dict == newstorage_dict):
# verify nothing changed but the columns
t1 = oldstorage["schema"].pop("columns")
t2 = newstorage["schema"].pop("columns")
if not (oldstorage == newstorage):
return (
False,
"Expected the only change to the storage to be the columns, but that is not true",
)
oldstorage_dict["schema"]["columns"] = t1
newstorage_dict["schema"]["columns"] = t2
oldstorage["schema"]["columns"] = t1
newstorage["schema"]["columns"] = t2

# only changes to columns is additions
oldstorage_cols = oldstorage_dict["schema"]["columns"]
newstorage_cols = newstorage_dict["schema"]["columns"]
# verify that the only changes to the columns are additions
oldstorage_cols = oldstorage["schema"]["columns"]
newstorage_cols = newstorage["schema"]["columns"]

colnames_old = set(e["name"] for e in oldstorage_cols)
colnames_new = set(e["name"] for e in newstorage_cols)
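
To make the new dict-based interface concrete, a hedged sketch of calling it directly; the storage definition below is a toy in yaml.safe_load form, not a real Snuba storage:

import copy

from snuba.migrations.autogeneration.diff import generate_python_migration

# Toy storage definition (illustrative only).
old_storage = {
    "storage": {"key": "errors", "set_key": "events"},
    "schema": {
        "columns": [{"name": "event_id", "type": "UUID"}],
        "local_table_name": "errors_local",
        "dist_table_name": "errors_dist",
    },
}
new_storage = copy.deepcopy(old_storage)
# Only column additions pass validation; everything else must be unchanged.
new_storage["schema"]["columns"].append({"name": "trace_id", "type": "UUID"})

migration_code = generate_python_migration(old_storage, new_storage)
# migration_code is the source of a ClickhouseNodeMigration subclass whose
# forwards_ops add trace_id (local, then distributed) and whose backwards_ops drop it.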
68 changes: 64 additions & 4 deletions snuba/migrations/autogeneration/main.py
@@ -1,15 +1,30 @@
import os
import subprocess
from typing import Optional

from snuba.migrations.autogeneration.diff import generate_migration_ops
from black import Mode, format_str # type: ignore
from yaml import safe_load

from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import group_loader
from snuba.migrations.autogeneration.diff import generate_python_migration

def generate(storage_path: str) -> None:

def generate(storage_path: str, migration_name: Optional[str] = None) -> str:
# load into memory the given storage and the version of it at HEAD
new_storage, old_storage = get_working_and_head(storage_path)
tmpnew, tmpold = get_working_and_head(storage_path)
new_storage = safe_load(tmpnew)
old_storage = safe_load(tmpold)

# generate the migration operations
generate_migration_ops(old_storage, new_storage)
migration = generate_python_migration(old_storage, new_storage)

# write the migration and return the path
return write_migration(
migration,
StorageSetKey(new_storage["storage"]["set_key"]),
migration_name if migration_name else "generated_migration",
)


def get_working_and_head(path: str) -> tuple[str, str]:
@@ -52,3 +67,48 @@ def get_working_and_head(path: str) -> tuple[str, str]:
working_file = f.read()

return (working_file, head_file)


def write_migration(
migration: str,
storage_set: StorageSetKey,
migration_name: str,
) -> str:
"""
Input:
migration - python migration file (see snuba/snuba_migrations/*/000x_*.py for examples)
storage_set - the key of the storage set you are writing the migration for
migration_name - the name used in the generated migration's filename
Writes the given migration to a new file at the correct place in the repo
(determined by the storage-set key), and adds a reference to the new migration
in the group loader (snuba/migrations/group_loader.py)
"""
# make sure the storage_set migration path exists
path = "snuba/snuba_migrations/" + storage_set.value
if not os.path.exists(path):
raise ValueError(
f"Migration path: '{path}' does not exist, perhaps '{storage_set.value}' is not a valid storage set key?"
)

# grab the group_loader for the storage set
group_loader_name = (
"".join([word.capitalize() for word in storage_set.value.split("_")]) + "Loader"
)
loader = getattr(group_loader, group_loader_name)()
assert isinstance(loader, group_loader.GroupLoader)

# get the next migration number
existing_migrations = loader.get_migrations()
if not existing_migrations:
    nextnum = 0
else:
    nextnum = int(existing_migrations[-1].split("_")[0]) + 1

# write migration to file
newpath = f"{path}/{str(nextnum).zfill(4)}_{migration_name}.py"
if os.path.exists(newpath):
# this should never happen, but just in case
raise ValueError(
f"Error: The migration number {nextnum} was larger than the last migration in the group loader '{group_loader_name}', but the migration already exists"
)
with open(newpath, "w") as f:
f.write(format_str(migration, mode=Mode()))
return newpath
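
Putting the pieces together, a hedged sketch of the end-to-end flow this file now implements; the path and name are illustrative:

from snuba.migrations.autogeneration.main import generate

# Assumes the working dir has uncommitted edits to this storage yaml.
path = generate(
    "snuba/datasets/configuration/events/storages/errors.yaml",
    migration_name="add_my_column",
)
print(path)
# e.g. snuba/snuba_migrations/events/0017_add_my_column.py — black-formatted,
# numbered one past the last migration registered in the events group loader.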
9 changes: 9 additions & 0 deletions snuba/migrations/operations.py
@@ -31,6 +31,9 @@ class OperationTarget(Enum):
DISTRIBUTED = "distributed"
UNSET = "unset" # target is not set. will throw an error if executed

def __repr__(self) -> str:
return f"OperationTarget.{self.value.upper()}"


class SqlOperation(ABC):
def __init__(
@@ -287,6 +290,9 @@ def get_reverse(self) -> DropColumn:
target=self.target,
)

def __repr__(self) -> str:
return f"AddColumn(storage_set={repr(self.storage_set)}, table_name={repr(self.table_name)}, column={repr(self.column)}, after={repr(self.__after)}, target={repr(self.target)})"


class DropColumn(SqlOperation):
"""
@@ -315,6 +321,9 @@ def format_sql(self) -> str:
f"ALTER TABLE {self.table_name} DROP COLUMN IF EXISTS {self.column_name};"
)

def __repr__(self) -> str:
return f"DropColumn(storage_set={repr(self.storage_set)}, table_name={repr(self.table_name)}, column_name={repr(self.column_name)}, target={repr(self.target)})"


class ModifyColumn(SqlOperation):
"""
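
These __repr__ overrides exist so that repr(op) is itself valid Python, letting the autogenerated migration embed operations verbatim. A hedged sketch; the column type and exact output are illustrative:

from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations.operations import AddColumn, OperationTarget
from snuba.utils import schemas
from snuba.utils.schemas import Column

op = AddColumn(
    storage_set=StorageSetKey.EVENTS,
    table_name="errors_local",
    column=Column("trace_id", schemas.UUID()),
    after="event_id",
    target=OperationTarget.LOCAL,
)
print(repr(op))
# Constructor-style output, e.g.:
# AddColumn(storage_set=StorageSetKey.EVENTS, table_name='errors_local',
#           column=..., after='event_id', target=OperationTarget.LOCAL)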
11 changes: 10 additions & 1 deletion snuba/utils/schemas.py
@@ -86,7 +86,16 @@ def __init__(self, modifiers: Optional[TModifiers] = None):
self.__modifiers = modifiers

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._repr_content()})[{self.__modifiers}]"
repr_content = self._repr_content()
if repr_content:
return "schemas.{}({}, modifiers={})".format(
self.__class__.__name__, repr_content, repr(self.__modifiers)
)
else:
return "schemas.{}(modifiers={})".format(
self.__class__.__name__, repr(self.__modifiers)
)

def _repr_content(self) -> str:
"""
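
The effect of the schemas change, sketched under the assumption that ColumnType subclasses inherit this __repr__; the exact output shown is illustrative:

from snuba.migrations.columns import MigrationModifiers
from snuba.utils import schemas

print(repr(schemas.UUID()))
# e.g. schemas.UUID(modifiers=None) — importable in a generated migration,
# which does `from snuba.utils import schemas`
print(repr(schemas.UInt(64, MigrationModifiers(nullable=True))))
# e.g. schemas.UInt(64, modifiers=MigrationModifiers(nullable=True, ...))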