Skip to content

Commit

Permalink
Fix migration script for dimensions-config v5 (DM-43116)
Browse files Browse the repository at this point in the history
Need to alter many more tables than I thought.
  • Loading branch information
andy-slac committed Mar 1, 2024
1 parent c59f974 commit 001239f
Showing 1 changed file with 66 additions and 3 deletions.
69 changes: 66 additions & 3 deletions migrations/dimensions-config/2a8a32e1bec3.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,81 @@ def _update_config(config: dict) -> dict:

return config

if context.is_offline_mode():
raise RuntimeError("This script does not support offline mode")

mig_context = context.get_context()

# When we use schemas in postgres then all tables belong to the same schema
# so we can use alembic's version_table_schema to see where everything goes
schema = mig_context.version_table_schema

# Change table column type.
# New column type.
new_type = sqlalchemy.String(size)
_LOG.info("Alter instrument.name column type to %s", new_type)
op.alter_column("instrument", "name", type_=new_type, schema=schema)

# Table/column to alter.
table_columns = _columns_to_migrate(schema)

# Lock everything in advance to avoid potential deadlock.
_lock_tables([item[0] for item in table_columns], schema)

# Actual schema change.
for table, column in table_columns:
_LOG.info("Alter %s.%s column type to %s", table, column, new_type)
op.alter_column(table, column, type_=new_type, schema=schema)

# Update attributes
assert mig_context.bind is not None
attributes = ButlerAttributes(mig_context.bind, schema)
attributes.update_dimensions_json(_update_config)


def _columns_to_migrate(schema: str) -> list[tuple[str, str]]:
"""Return list of table and column names that will be migrated."""
result: list[tuple[str, str]] = []

pk_table = "instrument"
pk_column: str

inspector = sqlalchemy.inspect(op.get_bind())
table_names = list(inspector.get_table_names(schema))

for table in table_names:
if table == pk_table:
# Instrument table will be the first in the list, in case the
# order can matter. Its PK column name is "name".
pk = inspector.get_pk_constraint(table, schema)
assert len(pk["constrained_columns"]) == 1, "Expect single column in PK"
pk_column = pk["constrained_columns"][0]
result.append((table, pk_column))
_LOG.debug("found %s table, PK column = %s", table, pk_column)
break
else:
raise ValueError(f"Cannot find {pk_table} table in the schema")

for table in table_names:
# Check FK of each table to find ones that reference instrument.
fks = inspector.get_foreign_keys(table, schema)
for fk in fks:
if fk["referred_schema"] == schema and fk["referred_table"] == pk_table:
if len(fk["referred_columns"]) == 1 and fk["referred_columns"][0] == pk_column:
fk_column = fk["constrained_columns"][0]
result.append((table, fk_column))
_LOG.debug("found dependent table: %s.%s", table, fk_column)
break

return result


def _lock_tables(tables: list[str], schema: str) -> None:
"""Lock all tables that need to be migrated to avoid conflicts."""

connection = op.get_bind()
for table in tables:
# We do not need quoting for schema/table names.
if schema:
query = f"LOCK TABLE {schema}.{table} IN EXCLUSIVE MODE"
else:
query = f"LOCK TABLE {table} IN EXCLUSIVE MODE"
_LOG.info("Locking table %s", table)
connection.execute(sqlalchemy.text(query))

0 comments on commit 001239f

Please sign in to comment.