Skip to content

Commit

Permalink
Merge branch 'dev' into feature/poc-cache-study-dao
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBelthle authored Oct 11, 2024
2 parents ec33230 + 8bf9ab3 commit e1e1607
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 15 deletions.
199 changes: 199 additions & 0 deletions alembic/versions/490b80a84bb5_add_cascade_delete_for_sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""add_cascade_delete_for_sqlite
Revision ID: 490b80a84bb5
Revises: c0c4aaf84861
Create Date: 2024-10-11 11:38:45.108227
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '490b80a84bb5'
down_revision = 'c0c4aaf84861'
branch_labels = None
depends_on = None


def upgrade():
    """Apply this revision: rebuild the SQLite tables so their foreign keys
    carry ``ON DELETE CASCADE``."""
    _migrate(True)

def downgrade():
    """Revert this revision: rebuild the SQLite tables with plain foreign
    keys (no ``ON DELETE CASCADE``)."""
    _migrate(False)


def _migrate(upgrade: bool) -> None:
    """Rebuild every table referencing ``study``/``variantstudy``/``groups`` so
    that its foreign key gains (upgrade) or loses (downgrade) ``ON DELETE CASCADE``.

    SQLite does not support altering/dropping a foreign-key constraint in place,
    so each table is rebuilt by hand, following the procedure documented at
    https://www.sqlite.org/lang_altertable.html#otheralter:

    1 - Create a new table with the right columns
    2 - Copy all the data from the old table into the new one
    3 - Remove the old table
    4 - Rename the new table to the old name

    Args:
        upgrade: ``True`` when applying the revision (add the cascade),
            ``False`` when reverting it (recreate the FKs without cascade).
    """
    # PostgreSQL can alter foreign keys directly, so the rebuild below is
    # SQLite-specific and skipped there.
    # NOTE(review): any dialect other than "postgresql" falls through to the
    # SQLite path — confirm only sqlite/postgresql are supported deployments.
    dialect_name: str = op.get_context().dialect.name
    if dialect_name == "postgresql":
        return

    # =============================
    # STUDY_ADDITIONAL_DATA
    # =============================

    # Step 1: same columns as study_additional_data, FK with/without cascade.
    op.create_table('study_additional_data_copy',
                    sa.Column('study_id', sa.String(length=36), nullable=False),
                    sa.Column('author', sa.String(length=255), nullable=True),
                    sa.Column('horizon', sa.String(), nullable=True),
                    sa.Column('patch', sa.String(), nullable=True),
                    sa.ForeignKeyConstraint(['study_id'], ['study.id'], ondelete='CASCADE' if upgrade else None),
                    sa.PrimaryKeyConstraint('study_id')
                    )
    # Step 2: row-by-row copy. The "?" placeholders are the SQLite qmark
    # paramstyle, consistent with the sqlite-only path established above.
    bind = op.get_bind()
    content = bind.execute("SELECT * FROM study_additional_data")
    for row in content:
        bind.execute(
            "INSERT INTO study_additional_data_copy (study_id, author, horizon, patch) VALUES (?,?,?,?)",
            (row[0], row[1], row[2], row[3])
        )
    # Steps 3 & 4: swap the copy in under the original name.
    op.drop_table("study_additional_data")
    op.rename_table("study_additional_data_copy", "study_additional_data")

    # =============================
    # RAW_METADATA
    # =============================

    op.create_table('rawstudycopy',
                    sa.Column('id', sa.String(length=36), nullable=False),
                    sa.Column('content_status', sa.Enum('VALID', 'WARNING', 'ERROR', name='studycontentstatus'),
                              nullable=True),
                    sa.Column('workspace', sa.String(length=255), nullable=False),
                    sa.Column('missing', sa.String(length=255), nullable=True),
                    sa.ForeignKeyConstraint(['id'], ['study.id'], ondelete='CASCADE' if upgrade else None),
                    sa.PrimaryKeyConstraint('id')
                    )
    # Recreate the secondary indexes lost by the rebuild.
    # NOTE(review): on downgrade this drops ix_rawstudycopy_* from a table
    # that was just created without any indexes — it looks like it would
    # raise; verify the downgrade path has actually been exercised, and
    # whether the index names should be ix_rawstudy_* after the rename.
    with op.batch_alter_table("rawstudycopy", schema=None) as batch_op:
        if upgrade:
            batch_op.create_index(batch_op.f("ix_rawstudycopy_missing"), ["missing"], unique=False)
            batch_op.create_index(batch_op.f("ix_rawstudycopy_workspace"), ["workspace"], unique=False)
        else:
            batch_op.drop_index(batch_op.f("ix_rawstudycopy_missing"))
            batch_op.drop_index(batch_op.f("ix_rawstudycopy_workspace"))

    bind = op.get_bind()
    content = bind.execute("SELECT * FROM rawstudy")
    for row in content:
        bind.execute(
            "INSERT INTO rawstudycopy (id, content_status, workspace, missing) VALUES (?,?,?,?)",
            (row[0], row[1], row[2], row[3])
        )
    op.drop_table("rawstudy")
    op.rename_table("rawstudycopy", "rawstudy")

    # =============================
    # COMMAND BLOCK
    # =============================

    op.create_table(
        "commandblock_copy",
        sa.Column("id", sa.String(length=36), nullable=False),
        sa.Column("study_id", sa.String(length=36), nullable=True),
        # Temporary name: the real column is "index" (renamed back below) —
        # presumably avoided here because INDEX is an SQL keyword; confirm.
        sa.Column("block_index", sa.Integer(), nullable=True),
        sa.Column("command", sa.String(length=255), nullable=True),
        sa.Column("version", sa.Integer(), nullable=True),
        sa.Column("args", sa.String(), nullable=True),
        sa.ForeignKeyConstraint(
            ["study_id"],
            ["variantstudy.id"],
            ondelete="CASCADE" if upgrade else None
        ),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("id"),
    )
    bind = op.get_bind()
    content = bind.execute("SELECT * FROM commandblock")
    for row in content:
        bind.execute(
            "INSERT INTO commandblock_copy (id, study_id, block_index, command, version, args) VALUES (?,?,?,?,?,?)",
            (row[0], row[1], row[2], row[3], row[4], row[5])
        )
    # Restore the original column name before the table takes its final name.
    op.alter_column(table_name="commandblock_copy", column_name="block_index", new_column_name="index")
    op.drop_table("commandblock")
    op.rename_table("commandblock_copy", "commandblock")

    # =============================
    # VARIANT STUDY SNAPSHOT
    # =============================

    op.create_table(
        "variant_study_snapshot_copy",
        sa.Column("id", sa.String(length=36), nullable=False),
        sa.Column("created_at", sa.DateTime(), nullable=True),
        sa.Column('last_executed_command', sa.String(), nullable=True),
        sa.ForeignKeyConstraint(
            ["id"],
            ["variantstudy.id"],
            ondelete="CASCADE" if upgrade else None
        ),
        sa.PrimaryKeyConstraint("id"),
    )
    bind = op.get_bind()
    content = bind.execute("SELECT * FROM variant_study_snapshot")
    for row in content:
        bind.execute(
            "INSERT INTO variant_study_snapshot_copy (id, created_at, last_executed_command) VALUES (?,?,?)",
            (row[0], row[1], row[2])
        )
    op.drop_table("variant_study_snapshot")
    op.rename_table("variant_study_snapshot_copy", "variant_study_snapshot")

    # =============================
    # VARIANT STUDY
    # =============================

    # NOTE(review): variantstudy is rebuilt *after* the tables whose FKs point
    # at it (commandblock, variant_study_snapshot) — presumably safe because
    # SQLite only enforces FKs when the pragma is on; confirm ordering intent.
    op.create_table(
        "variantstudy_copy",
        sa.Column("id", sa.String(length=36), nullable=False),
        sa.Column('generation_task', sa.String(), nullable=True),
        sa.ForeignKeyConstraint(
            ["id"],
            ["study.id"],
            ondelete="CASCADE" if upgrade else None
        ),
        sa.PrimaryKeyConstraint("id"),
    )
    bind = op.get_bind()
    content = bind.execute("SELECT * FROM variantstudy")
    for row in content:
        bind.execute(
            "INSERT INTO variantstudy_copy (id, generation_task) VALUES (?,?)",
            (row[0], row[1])
        )
    op.drop_table("variantstudy")
    op.rename_table("variantstudy_copy", "variantstudy")

    # =============================
    # GROUP METADATA
    # =============================

    # Association table: two FKs (groups.id, study.id), no primary key.
    op.create_table('groupmetadatacopy',
                    sa.Column('group_id', sa.String(length=36), nullable=False),
                    sa.Column('study_id', sa.String(length=36), nullable=False),
                    sa.ForeignKeyConstraint(['group_id'], ['groups.id'], ondelete="CASCADE" if upgrade else None),
                    sa.ForeignKeyConstraint(['study_id'], ['study.id'], ondelete="CASCADE" if upgrade else None)
                    )
    # NOTE(review): same concern as rawstudycopy above — the downgrade branch
    # drops indexes from a table created without any; verify it works.
    with op.batch_alter_table("groupmetadatacopy", schema=None) as batch_op:
        if upgrade:
            batch_op.create_index(batch_op.f("ix_groupmetadatacopy_group_id"), ["group_id"], unique=False)
            batch_op.create_index(batch_op.f("ix_groupmetadatacopy_study_id"), ["study_id"], unique=False)
        else:
            batch_op.drop_index(batch_op.f("ix_groupmetadatacopy_group_id"))
            batch_op.drop_index(batch_op.f("ix_groupmetadatacopy_study_id"))
    bind = op.get_bind()
    content = bind.execute("SELECT * FROM group_metadata")
    for row in content:
        bind.execute(
            "INSERT INTO groupmetadatacopy (group_id, study_id) VALUES (?,?)",
            (row[0], row[1])
        )
    op.drop_table("group_metadata")
    op.rename_table("groupmetadatacopy", "group_metadata")

23 changes: 11 additions & 12 deletions antarest/study/storage/variantstudy/variant_study_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pathlib import Path
from uuid import uuid4

import humanize
from fastapi import HTTPException
from filelock import FileLock

Expand Down Expand Up @@ -1056,13 +1057,13 @@ def initialize_additional_data(self, variant_study: VariantStudy) -> bool:
)
return False

def clear_all_snapshots(self, retention_hours: timedelta, params: t.Optional[RequestParameters] = None) -> str:
def clear_all_snapshots(self, retention_time: timedelta, params: t.Optional[RequestParameters] = None) -> str:
"""
Admin command that clears all variant snapshots older than `retention_time`.
Only available for admin users.
Args:
retention_hours: number of retention hours
retention_time: how long a snapshot may remain since its last update or access before it is cleared
params: request parameters used to identify the user status
Returns: None
Expand All @@ -1072,11 +1073,9 @@ def clear_all_snapshots(self, retention_hours: timedelta, params: t.Optional[Req
if params is None or (params.user and not params.user.is_site_admin() and not params.user.is_admin_token()):
raise UserHasNotPermissionError()

task_name = f"Cleaning all snapshot updated or accessed at least {retention_hours} hours ago."
task_name = f"Cleaning all snapshot updated or accessed at least {humanize.precisedelta(retention_time)} ago."

snapshot_clearing_task_instance = SnapshotCleanerTask(
variant_study_service=self, retention_hours=retention_hours
)
snapshot_clearing_task_instance = SnapshotCleanerTask(variant_study_service=self, retention_time=retention_time)

return self.task_service.add_task(
snapshot_clearing_task_instance,
Expand All @@ -1092,10 +1091,10 @@ class SnapshotCleanerTask:
def __init__(
self,
variant_study_service: VariantStudyService,
retention_hours: timedelta,
retention_time: timedelta,
) -> None:
self._variant_study_service = variant_study_service
self._retention_hours = retention_hours
self._retention_time = retention_time

def _clear_all_snapshots(self) -> None:
with db():
Expand All @@ -1106,15 +1105,15 @@ def _clear_all_snapshots(self) -> None:
)
)
for variant in variant_list:
if variant.updated_at and variant.updated_at < datetime.utcnow() - self._retention_hours:
if variant.last_access and variant.last_access < datetime.utcnow() - self._retention_hours:
if variant.updated_at and variant.updated_at < datetime.utcnow() - self._retention_time:
if variant.last_access and variant.last_access < datetime.utcnow() - self._retention_time:
self._variant_study_service.clear_snapshot(variant)

def run_task(self, notifier: TaskUpdateNotifier) -> TaskResult:
msg = f"Start cleaning all snapshots updated or accessed {self._retention_hours} hours ago."
msg = f"Start cleaning all snapshots updated or accessed {humanize.precisedelta(self._retention_time)} ago."
notifier(msg)
self._clear_all_snapshots()
msg = f"All selected snapshots were successfully cleared."
msg = "All selected snapshots were successfully cleared."
notifier(msg)
return TaskResult(success=True, message=msg)

Expand Down
7 changes: 4 additions & 3 deletions antarest/study/web/variant_blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import logging
from typing import List, Optional, Union

import humanize
from fastapi import APIRouter, Body, Depends

from antarest.core.config import Config
Expand Down Expand Up @@ -431,15 +432,15 @@ def clear_variant_snapshots(
current_user: JWTUser = Depends(auth.get_current_user),
) -> str:
"""
Endpoint that clear `limit` hours old and older variant snapshots.
Endpoint that clears snapshots of variants which were last updated or accessed at least `hours` hours ago.
Args: limit (int, optional): Number of hours to clear. Defaults to 24.
Args: hours (int, optional): Snapshots untouched for at least this many hours are cleared. Defaults to 24.
Returns: ID of the task running the snapshot clearing.
"""
retention_hours = datetime.timedelta(hours=hours)
logger.info(
f"Delete all variant snapshots older than {retention_hours} hours.",
f"Delete all variant snapshots older than {humanize.precisedelta(retention_hours)}.",
extra={"user": current_user.id},
)
params = RequestParameters(user=current_user)
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ click~=8.0.3
contextvars~=2.4
filelock~=3.4.2
gunicorn~=20.1.0
humanize~=4.10.0; python_version <= '3.8'
humanize~=4.11.0; python_version > '3.8'
jsonref~=0.2
PyJWT~=2.9.0
MarkupSafe~=2.0.1
Expand Down

0 comments on commit e1e1607

Please sign in to comment.