Skip to content

Commit

Permalink
Merge pull request #743 from AntaresSimulatorTeam/dev
Browse files Browse the repository at this point in the history
v2.3.0
  • Loading branch information
pl-buiquang authored Feb 22, 2022
2 parents 86f2529 + aeca946 commit cb0d9b3
Show file tree
Hide file tree
Showing 136 changed files with 6,200 additions and 2,110 deletions.
32 changes: 32 additions & 0 deletions alembic/versions/f5aed532a99c_add_snapshot_last_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""add_snapshot_last_command
Revision ID: f5aed532a99c
Revises: 23ff8deddb4a
Create Date: 2022-02-08 11:15:31.796081
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'f5aed532a99c'
down_revision = '23ff8deddb4a'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('variant_study_snapshot', schema=None) as batch_op:
batch_op.add_column(sa.Column('last_executed_command', sa.String(), nullable=True))

# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('variant_study_snapshot', schema=None) as batch_op:
batch_op.drop_column('last_executed_command')

# ### end Alembic commands ###
2 changes: 1 addition & 1 deletion antarest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.2.3"
__version__ = "2.3.0"

from pathlib import Path

Expand Down
6 changes: 6 additions & 0 deletions antarest/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,15 @@ class DbConfig:
db_url: str = ""
db_admin_url: Optional[str] = None
db_connect_timeout: int = 10
pool_recycle: Optional[int] = None

@staticmethod
def from_dict(data: JSON) -> "DbConfig":
return DbConfig(
db_admin_url=data.get("admin_url", None),
db_url=data.get("url", ""),
db_connect_timeout=data.get("db_connect_timeout", 10),
pool_recycle=data.get("pool_recycle", None),
)


Expand All @@ -112,6 +114,8 @@ class StorageConfig:
watcher_lock: bool = True
watcher_lock_delay: int = 10
download_default_expiration_timeout_minutes: int = 1440
matrix_gc_sleeping_time: int = 3600
matrix_gc_dry_run: bool = False

@staticmethod
def from_dict(data: JSON) -> "StorageConfig":
Expand All @@ -129,6 +133,8 @@ def from_dict(data: JSON) -> "StorageConfig":
download_default_expiration_timeout_minutes=data.get(
"download_default_expiration_timeout_minutes", 1440
),
matrix_gc_sleeping_time=data.get("matrix_gc_sleeping_time", 3600),
matrix_gc_dry_run=data.get("matrix_gc_dry_run", False),
)


Expand Down
1 change: 0 additions & 1 deletion antarest/core/filetransfer/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
UserHasNotPermissionError,
)


logger = logging.getLogger(__name__)


Expand Down
2 changes: 1 addition & 1 deletion antarest/core/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Optional

from fastapi import HTTPException
from markupsafe import escape # type: ignore
from markupsafe import escape

from antarest.core.jwt import JWTUser

Expand Down
1 change: 1 addition & 0 deletions antarest/core/tasks/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TaskType(str, Enum):
VARIANT_GENERATION = "VARIANT_GENERATION"
COPY = "COPY"
ARCHIVE = "ARCHIVE"
UNARCHIVE = "UNARCHIVE"


class TaskStatus(Enum):
Expand Down
19 changes: 16 additions & 3 deletions antarest/core/tasks/repository.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
from http import HTTPStatus
from operator import and_
from typing import Optional, List
from typing import Optional, List, Any

from fastapi import HTTPException

Expand All @@ -28,11 +28,22 @@ def get_or_raise(self, id: str) -> TaskJob:
raise HTTPException(HTTPStatus.NOT_FOUND, f"Task {id} not found")
return task

@staticmethod
def _combine_clauses(where_clauses: List[Any]) -> Any:
assert len(where_clauses) > 0
if len(where_clauses) > 1:
return and_(
where_clauses[0],
TaskJobRepository._combine_clauses(where_clauses[1:]),
)
else:
return where_clauses[0]

def list(
self, filter: TaskListFilter, user: Optional[int] = None
) -> List[TaskJob]:
query = db.session.query(TaskJob)
where_clauses = []
where_clauses: List[Any] = []
if user:
where_clauses.append(TaskJob.owner_id == user)
if len(filter.status) > 0:
Expand Down Expand Up @@ -82,7 +93,9 @@ def list(
)
)
if len(where_clauses) > 1:
query = query.where(and_(*where_clauses))
query = query.where(
TaskJobRepository._combine_clauses(where_clauses)
)
elif len(where_clauses) == 1:
query = query.where(*where_clauses)

Expand Down
30 changes: 30 additions & 0 deletions antarest/launcher/adapters/abstractlauncher.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Callable, NamedTuple, Optional, Any
from uuid import UUID

from antarest.core.config import Config
from antarest.core.interfaces.eventbus import (
Event,
EventType,
EventChannelDirectory,
IEventBus,
)
from antarest.core.model import JSON
from antarest.core.requests import RequestParameters
from antarest.launcher.model import JobStatus, LogType
Expand All @@ -14,9 +21,12 @@ class LauncherInitException(Exception):


class LauncherCallbacks(NamedTuple):
# args: job_id, job status, message, output_id
update_status: Callable[
[str, JobStatus, Optional[str], Optional[str]], None
]
# args: job_id, study_id, study_export_path, launcher_params
after_export_flat: Callable[[str, str, Path, Optional[JSON]], None]


class AbstractLauncher(ABC):
Expand All @@ -25,10 +35,12 @@ def __init__(
config: Config,
storage_service: StudyService,
callbacks: LauncherCallbacks,
event_bus: IEventBus,
):
self.config = config
self.storage_service = storage_service
self.callbacks = callbacks
self.event_bus = event_bus

@abstractmethod
def run_study(
Expand All @@ -47,3 +59,21 @@ def get_log(self, job_id: str, log_type: LogType) -> Optional[str]:
@abstractmethod
def kill_job(self, job_id: str) -> None:
raise NotImplementedError()

def create_update_log(
self, job_id: str, study_id: str
) -> Callable[[str], None]:
def update_log(log_line: str) -> None:
self.event_bus.push(
Event(
type=EventType.STUDY_JOB_LOG_UPDATE,
payload={
"log": log_line,
"job_id": job_id,
"study_id": study_id,
},
channel=EventChannelDirectory.JOB_LOGS + job_id,
)
)

return update_log
2 changes: 1 addition & 1 deletion antarest/launcher/adapters/factory_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def build_launcher(
dict_launchers: Dict[str, AbstractLauncher] = dict()
if config.launcher.local is not None:
dict_launchers["local"] = LocalLauncher(
config, storage_service, callbacks
config, storage_service, callbacks, event_bus
)
if config.launcher.slurm is not None:
dict_launchers["slurm"] = SlurmLauncher(
Expand Down
Loading

0 comments on commit cb0d9b3

Please sign in to comment.