Skip to content

Commit

Permalink
Migration for new State Enum
Browse files Browse the repository at this point in the history
  • Loading branch information
nmassey001 committed Dec 18, 2024
1 parent c84760c commit f6a351b
Showing 1 changed file with 144 additions and 53 deletions.
197 changes: 144 additions & 53 deletions migrations/versions/2024-12-16-1605_c7648694325f_update_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
from sqlalchemy.orm import Session
from enum import Enum as PyEnum
from nlds_processors.monitor.monitor_models import SubRecord
from itertools import chain

# revision identifiers, used by Alembic.
revision = "c7648694325f"
down_revision = "f1f20ae58def"
branch_labels = None
depends_on = None


class OldState(PyEnum):
# Generic states
INITIALISING = -1
Expand All @@ -42,6 +44,7 @@ class OldState(PyEnum):
# Shared ARCHIVE states
CATALOG_ARCHIVE_ROLLBACK = 40


class NewState(PyEnum):
# Generic states
INITIALISING = -1
Expand Down Expand Up @@ -73,29 +76,74 @@ class NewState(PyEnum):
# Initial state for searching for sub-states
SEARCHING = 1000

# A map from the old values of State to the new ones, including doubling up on

# A map from the old values of State to the new ones, including doubling up on
# the catalog_restoring
state_map = {
OldState.INITIALISING: NewState.INITIALISING,
OldState.ROUTING: NewState.ROUTING,
OldState.COMPLETE: NewState.COMPLETE,
OldState.FAILED: NewState.FAILED,
# PUT workflow states
OldState.SPLITTING: NewState.SPLITTING,
OldState.INDEXING: NewState.INDEXING,
OldState.CATALOG_PUTTING: NewState.CATALOG_PUTTING,
OldState.TRANSFER_PUTTING: NewState.TRANSFER_PUTTING,
OldState.CATALOG_ROLLBACK: NewState.CATALOG_REMOVING,
# GET workflow states
OldState.CATALOG_GETTING: NewState.CATALOG_GETTING,
OldState.ARCHIVE_GETTING: NewState.ARCHIVE_GETTING,
OldState.TRANSFER_GETTING: NewState.TRANSFER_GETTING,
# DEL workflow states
# ARCHIVE_PUT workflow states
OldState.ARCHIVE_INIT: NewState.ARCHIVE_INIT,
OldState.CATALOG_ARCHIVE_AGGREGATING: NewState.TRANSFER_INIT,
OldState.ARCHIVE_PUTTING: NewState.ARCHIVE_PUTTING,
OldState.CATALOG_ARCHIVE_UPDATING: NewState.CATALOG_ARCHIVE_UPDATING,
# Shared ARCHIVE states
OldState.CATALOG_ARCHIVE_ROLLBACK: NewState.CATALOG_ARCHIVE_UPDATING,
}

# reverse mapping of above for downgrade
reverse_map = {
NewState.INITIALISING: OldState.INITIALISING,
NewState.ROUTING: OldState.ROUTING,
# PUT workflow states
NewState.SPLITTING: OldState.SPLITTING,
NewState.INDEXING: OldState.INDEXING,
NewState.CATALOG_PUTTING: OldState.CATALOG_PUTTING,
NewState.TRANSFER_PUTTING: OldState.TRANSFER_PUTTING,
# GET workflow states
NewState.CATALOG_GETTING: OldState.CATALOG_GETTING,
NewState.ARCHIVE_GETTING: OldState.ARCHIVE_GETTING,
NewState.TRANSFER_GETTING: OldState.TRANSFER_GETTING,
NewState.TRANSFER_INIT: OldState.INITIALISING,
NewState.ARCHIVE_INIT: OldState.ARCHIVE_INIT,
NewState.ARCHIVE_PUTTING: OldState.ARCHIVE_PUTTING,
NewState.ARCHIVE_PREPARING: OldState.ARCHIVE_GETTING,
# CATALOG manipulation workflow states
NewState.CATALOG_DELETING: OldState.CATALOG_ROLLBACK,
NewState.CATALOG_UPDATING: OldState.CATALOG_ARCHIVE_UPDATING,
NewState.CATALOG_ARCHIVE_UPDATING: OldState.CATALOG_ARCHIVE_UPDATING,
NewState.CATALOG_REMOVING: OldState.CATALOG_ROLLBACK,
# Complete states
NewState.COMPLETE: OldState.COMPLETE,
NewState.FAILED: OldState.FAILED,
NewState.COMPLETE_WITH_ERRORS: OldState.COMPLETE,
NewState.COMPLETE_WITH_WARNINGS: OldState.COMPLETE,
# Initial state for searching for sub-states
NewState.SEARCHING: OldState.INITIALISING
}

# Create ENUM types, one for old, one for new, one for temp
old_enum = sa.Enum(OldState, name='state')
new_enum = sa.Enum(NewState, name='state')
# clash CATALOG_ARCHIVE_AGGREGATING = 21 (old),
# ARCHIVE_PUTTING = 21 (new)
# clash ARCHIVE_PUTTING = 22 (old),
# ARCHIVE_PREPARING = 22 (new),

# Create ENUM types, one for old, one for new
old_enum = sa.Enum(OldState, name="oldstate")
new_enum = sa.Enum(NewState, name="newstate")


def upgrade(engine_name: str) -> None:
globals()["upgrade_%s" % engine_name]()
Expand All @@ -118,65 +166,108 @@ def downgrade_catalog() -> None:


def upgrade_monitor() -> None:
# create new enum
new_enum.create(op.get_bind(), checkfirst=False)
# add new_state column
with op.batch_alter_table("sub_record") as bop:
# Change the column type from enum to string
bop.alter_column("state", existing_server_default=None,
existing_nullable=False, type_=sa.String(length=32))
bop.add_column(
sa.Column(
"new_state",
sa.Enum(NewState),
nullable=False,
server_default=NewState.INITIALISING.name,
default=NewState.INITIALISING.name,
),
)

# Get the session to update the database values with
# Get the session to update the database values with
session = Session(bind=op.get_bind())
# Loop through each of the states in the map from old states to new states.
for old_state, new_state in state_map.items():
sub_records = session.query(SubRecord).filter(
SubRecord.state.name == old_state.name
).all()
# Assing the state to the new value
for sr in sub_records:
sr.state = new_state

# Remove the old enum type and create the new one
old_enum.drop(op.get_bind(), checkfirst=False)
new_enum.create(op.get_bind(), checkfirst=False)

# Loop through each of the states in the map from old states to new states.
for os in OldState:
# cannot use ORM here as it will use the model definition of SubRecord.State,
# which does not include all of the old enum values - revert to SQL
ns = state_map[os]
cr = session.execute(
sa.text(
f"UPDATE sub_record SET new_state='{ns.name}' WHERE state='{os.name}'"
)
)
# new_state now exists in each row with the correct enum value. we now have to
# 1. Delete the old state field
# 2. Delete the old enum
# 3. Rename the new enum
# 4. Rename the new state field

with op.batch_alter_table("sub_record") as bop:
bop.drop_column("state")
# Remove the old enum type
op.execute(sa.text("DROP TYPE state"))

# Rename the new enum
session.execute(sa.text("ALTER TYPE newstate RENAME to state"))
with op.batch_alter_table("sub_record") as bop:
# Change the column type back to enum, but use the new one
bop.alter_column("state", existing_server_default=None,
existing_nullable=False, type_=new_enum,
postgresql_using='state::text::state')
# Commit the changes to the db.
bop.alter_column(
"new_state",
existing_server_default=None,
existing_nullable=False,
type="state",
new_column_name="state",
)

# Commit the changes to the db.
session.commit()


def downgrade_monitor() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# create old enum
old_enum.create(op.get_bind(), checkfirst=False)
# add old_state column
with op.batch_alter_table("sub_record") as bop:
# Change the column type from enum to string
bop.alter_column("state", existing_server_default=None,
existing_nullable=False, type_=sa.String(length=32))
op.alter_column

# Get the session to update the database values with
bop.add_column(
sa.Column(
"old_state",
sa.Enum(OldState),
nullable=False,
server_default=OldState.INITIALISING.name,
default=OldState.INITIALISING.name,
),
)
# Get the session to update the database values with
session = Session(bind=op.get_bind())
# Loop through each of the states in the map from old states to new states.
for old_state, new_state in state_map.items():
sub_records = session.query(SubRecord).filter(
SubRecord.state.name == new_state.name
).all()
# Assing the state to the old value.
for sr in sub_records:
sr.state = old_state

# Remove the old enum type and create the new one
new_enum.drop(op.get_bind(), checkfirst=False)
old_enum.create(op.get_bind(), checkfirst=False)

# Change the column type to the new ENUM

# Loop through each of the states in the map from new states to old states.
for ns in NewState:
# cannot use ORM here as it will use the model definition of SubRecord.State,
# which does not include all of the old enum values - revert to SQL
# reverse dictionary lookup
os = reverse_map[ns]
cr = session.execute(
sa.text(
f"UPDATE sub_record SET old_state='{os.name}' WHERE state='{ns.name}'"
)
)
# new_state now exists in each row with the correct enum value. we now have to
# 1. Delete the state field
# 2. Delete the enum
# 3. Rename the old enum
# 4. Rename the old state field

with op.batch_alter_table("sub_record") as bop:
# Change the column type back to the old enum
bop.alter_column("state", existing_server_default=None,
existing_nullable=False, type_=old_enum,
postgresql_using='state::text::state')

# Commit the changes to the db.
bop.drop_column("state")
# Remove the old enum type
op.execute(sa.text("DROP TYPE state"))

# Rename the old enum
session.execute(sa.text("ALTER TYPE oldstate RENAME to state"))
with op.batch_alter_table("sub_record") as bop:
bop.alter_column(
"old_state",
existing_server_default=None,
existing_nullable=False,
type="state",
new_column_name="state",
)

# Commit the changes to the db.
session.commit()
# ### end Alembic commands ###

0 comments on commit f6a351b

Please sign in to comment.