From f6a351b17f31b8136f1fdec162051da561f47ff5 Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Wed, 18 Dec 2024 16:00:34 +0000 Subject: [PATCH] Migration for new State Enum --- ...4-12-16-1605_c7648694325f_update_states.py | 197 +++++++++++++----- 1 file changed, 144 insertions(+), 53 deletions(-) diff --git a/migrations/versions/2024-12-16-1605_c7648694325f_update_states.py b/migrations/versions/2024-12-16-1605_c7648694325f_update_states.py index 8bb2b84..e9e55e0 100644 --- a/migrations/versions/2024-12-16-1605_c7648694325f_update_states.py +++ b/migrations/versions/2024-12-16-1605_c7648694325f_update_states.py @@ -11,6 +11,7 @@ from sqlalchemy.orm import Session from enum import Enum as PyEnum from nlds_processors.monitor.monitor_models import SubRecord +from itertools import chain # revision identifiers, used by Alembic. revision = "c7648694325f" @@ -18,6 +19,7 @@ branch_labels = None depends_on = None + class OldState(PyEnum): # Generic states INITIALISING = -1 @@ -42,6 +44,7 @@ class OldState(PyEnum): # Shared ARCHIVE states CATALOG_ARCHIVE_ROLLBACK = 40 + class NewState(PyEnum): # Generic states INITIALISING = -1 @@ -73,29 +76,74 @@ class NewState(PyEnum): # Initial state for searching for sub-states SEARCHING = 1000 -# A map from the old values of State to the new ones, including doubling up on + +# A map from the old values of State to the new ones, including doubling up on # the catalog_restoring state_map = { OldState.INITIALISING: NewState.INITIALISING, OldState.ROUTING: NewState.ROUTING, OldState.COMPLETE: NewState.COMPLETE, OldState.FAILED: NewState.FAILED, + # PUT workflow states OldState.SPLITTING: NewState.SPLITTING, OldState.INDEXING: NewState.INDEXING, OldState.CATALOG_PUTTING: NewState.CATALOG_PUTTING, OldState.TRANSFER_PUTTING: NewState.TRANSFER_PUTTING, OldState.CATALOG_ROLLBACK: NewState.CATALOG_REMOVING, + # GET workflow states OldState.CATALOG_GETTING: NewState.CATALOG_GETTING, OldState.ARCHIVE_GETTING: NewState.ARCHIVE_GETTING, OldState.TRANSFER_GETTING: NewState.TRANSFER_GETTING, + # DEL workflow states + # ARCHIVE_PUT workflow states OldState.ARCHIVE_INIT: NewState.ARCHIVE_INIT, + OldState.CATALOG_ARCHIVE_AGGREGATING: NewState.TRANSFER_INIT, OldState.ARCHIVE_PUTTING: NewState.ARCHIVE_PUTTING, OldState.CATALOG_ARCHIVE_UPDATING: NewState.CATALOG_ARCHIVE_UPDATING, + # Shared ARCHIVE states + OldState.CATALOG_ARCHIVE_ROLLBACK: NewState.CATALOG_ARCHIVE_UPDATING, +} + +# reverse mapping of above for downgrade +reverse_map = { + NewState.INITIALISING: OldState.INITIALISING, + NewState.ROUTING: OldState.ROUTING, + # PUT workflow states + NewState.SPLITTING: OldState.SPLITTING, + NewState.INDEXING: OldState.INDEXING, + NewState.CATALOG_PUTTING: OldState.CATALOG_PUTTING, + NewState.TRANSFER_PUTTING: OldState.TRANSFER_PUTTING, + # GET workflow states + NewState.CATALOG_GETTING: OldState.CATALOG_GETTING, + NewState.ARCHIVE_GETTING: OldState.ARCHIVE_GETTING, + NewState.TRANSFER_GETTING: OldState.TRANSFER_GETTING, + NewState.TRANSFER_INIT: OldState.INITIALISING, + NewState.ARCHIVE_INIT: OldState.ARCHIVE_INIT, + NewState.ARCHIVE_PUTTING: OldState.ARCHIVE_PUTTING, + NewState.ARCHIVE_PREPARING: OldState.ARCHIVE_GETTING, + # CATALOG manipulation workflow states + NewState.CATALOG_DELETING: OldState.CATALOG_ROLLBACK, + NewState.CATALOG_UPDATING: OldState.CATALOG_ARCHIVE_UPDATING, + NewState.CATALOG_ARCHIVE_UPDATING: OldState.CATALOG_ARCHIVE_UPDATING, + NewState.CATALOG_REMOVING: OldState.CATALOG_ROLLBACK, + # Complete states + NewState.COMPLETE: OldState.COMPLETE, + NewState.FAILED: OldState.FAILED, + NewState.COMPLETE_WITH_ERRORS: OldState.COMPLETE, + NewState.COMPLETE_WITH_WARNINGS: OldState.COMPLETE, + # Initial state for searching for sub-states + NewState.SEARCHING: OldState.INITIALISING } -# Create ENUM types, one for old, one for new, one for temp -old_enum = sa.Enum(OldState, name='state') -new_enum = sa.Enum(NewState, name='state') +# clash CATALOG_ARCHIVE_AGGREGATING = 21 (old), +# ARCHIVE_PUTTING = 21 (new) +# clash ARCHIVE_PUTTING = 22 (old), +# ARCHIVE_PREPARING = 22 (new), + +# Create ENUM types, one for old, one for new +old_enum = sa.Enum(OldState, name="oldstate") +new_enum = sa.Enum(NewState, name="newstate") + def upgrade(engine_name: str) -> None: globals()["upgrade_%s" % engine_name]() @@ -118,65 +166,108 @@ def downgrade_catalog() -> None: def upgrade_monitor() -> None: + # create new enum + new_enum.create(op.get_bind(), checkfirst=False) + # add new_state column with op.batch_alter_table("sub_record") as bop: - # Change the column type from enum to string - bop.alter_column("state", existing_server_default=None, - existing_nullable=False, type_=sa.String(length=32)) + bop.add_column( + sa.Column( + "new_state", + sa.Enum(NewState), + nullable=False, + server_default=NewState.INITIALISING.name, + default=NewState.INITIALISING.name, + ), + ) - # Get the session to update the database values with + # Get the session to update the database values with session = Session(bind=op.get_bind()) - # Loop through each of the states in the map from old states to new states. - for old_state, new_state in state_map.items(): - sub_records = session.query(SubRecord).filter( - SubRecord.state.name == old_state.name - ).all() - # Assing the state to the new value - for sr in sub_records: - sr.state = new_state - - # Remove the old enum type and create the new one - old_enum.drop(op.get_bind(), checkfirst=False) - new_enum.create(op.get_bind(), checkfirst=False) + # Loop through each of the states in the map from old states to new states. + for os in OldState: + # cannot use ORM here as it will use the model definition of SubRecord.State, + # which does not include all of the old enum values - revert to SQL + ns = state_map[os] + cr = session.execute( + sa.text( + f"UPDATE sub_record SET new_state='{ns.name}' WHERE state='{os.name}'" + ) + ) + # new_state now exists in each row with the correct enum value. we now have to + # 1. Delete the old state field + # 2. Delete the old enum + # 3. Rename the new enum + # 4. Rename the new state field + + with op.batch_alter_table("sub_record") as bop: + bop.drop_column("state") + # Remove the old enum type + op.execute(sa.text("DROP TYPE state")) + + # Rename the new enum + session.execute(sa.text("ALTER TYPE newstate RENAME to state")) with op.batch_alter_table("sub_record") as bop: - # Change the column type back to enum, but use the new one - bop.alter_column("state", existing_server_default=None, - existing_nullable=False, type_=new_enum, - postgresql_using='state::text::state') - # Commit the changes to the db. + bop.alter_column( + "new_state", + existing_server_default=None, + existing_nullable=False, + type="state", + new_column_name="state", + ) + + # Commit the changes to the db. session.commit() def downgrade_monitor() -> None: - # ### commands auto generated by Alembic - please adjust! ### + # create old enum + old_enum.create(op.get_bind(), checkfirst=False) + # add old_state column with op.batch_alter_table("sub_record") as bop: - # Change the column type from enum to string - bop.alter_column("state", existing_server_default=None, - existing_nullable=False, type_=sa.String(length=32)) - op.alter_column - - # Get the session to update the database values with + bop.add_column( + sa.Column( + "old_state", + sa.Enum(OldState), + nullable=False, + server_default=OldState.INITIALISING.name, + default=OldState.INITIALISING.name, + ), + ) + # Get the session to update the database values with session = Session(bind=op.get_bind()) - # Loop through each of the states in the map from old states to new states. - for old_state, new_state in state_map.items(): - sub_records = session.query(SubRecord).filter( - SubRecord.state.name == new_state.name - ).all() - # Assing the state to the old value. - for sr in sub_records: - sr.state = old_state - - # Remove the old enum type and create the new one - new_enum.drop(op.get_bind(), checkfirst=False) - old_enum.create(op.get_bind(), checkfirst=False) - - # Change the column type to the new ENUM + + # Loop through each of the states in the map from new states to old states. + for ns in NewState: + # cannot use ORM here as it will use the model definition of SubRecord.State, + # which does not include all of the old enum values - revert to SQL + # reverse dictionary lookup + os = reverse_map[ns] + cr = session.execute( + sa.text( + f"UPDATE sub_record SET old_state='{os.name}' WHERE state='{ns.name}'" + ) + ) + # new_state now exists in each row with the correct enum value. we now have to + # 1. Delete the state field + # 2. Delete the enum + # 3. Rename the old enum + # 4. Rename the old state field + with op.batch_alter_table("sub_record") as bop: - # Change the column type back to the old enum - bop.alter_column("state", existing_server_default=None, - existing_nullable=False, type_=old_enum, - postgresql_using='state::text::state') - - # Commit the changes to the db. + bop.drop_column("state") + # Remove the old enum type + op.execute(sa.text("DROP TYPE state")) + + # Rename the old enum + session.execute(sa.text("ALTER TYPE oldstate RENAME to state")) + with op.batch_alter_table("sub_record") as bop: + bop.alter_column( + "old_state", + existing_server_default=None, + existing_nullable=False, + type="state", + new_column_name="state", + ) + + # Commit the changes to the db. session.commit() - # ### end Alembic commands ###