Skip to content

Commit

Permalink
Merge pull request #963 from AntaresSimulatorTeam/dev
Browse files Browse the repository at this point in the history
v2.5.1
  • Loading branch information
pl-buiquang authored Jul 6, 2022
2 parents 2edf8e7 + df336c4 commit cdc4cca
Show file tree
Hide file tree
Showing 280 changed files with 3,076 additions and 1,394 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
FROM python:3.8-slim-buster
#FROM python:3.8-slim-buster
FROM brunneis/python:3.8.3-ubuntu-20.04

ENV ANTAREST_CONF /resources/application.yaml

Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# AntaREST Storage
# Antares Web

[![CI](https://github.com/AntaresSimulatorTeam/AntaREST/workflows/main/badge.svg)](https://github.com/AntaresSimulatorTeam/AntaREST/actions?query=workflow%3Amain)
[![Coverage](https://sonarcloud.io/api/project_badges/measure?project=AntaresSimulatorTeam_api-iso-antares&metric=coverage)](https://sonarcloud.io/dashboard?id=AntaresSimulatorTeam_api-iso-antares)
[![Licence](https://img.shields.io/github/license/AntaresSimulatorTeam/AntaREST)](https://www.apache.org/licenses/LICENSE-2.0)

<!-- ![architecture api antares](./docs/images/archi-api-antares.png) -->
![Screenshot](./docs/assets/media/img/readme_screenshot.png)

## Documentation

The full project documentation can be found in the [readthedocs website](https://antares-web.readthedocs.io/en/latest).

## Build the API

Expand Down
2 changes: 1 addition & 1 deletion alembic/versions/9846e90c2868_fix_bot_foreign_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
Create Date: 2021-11-19 11:58:11.378519
"""
from sqlite3 import Connection

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
from sqlalchemy import text
from sqlalchemy.engine import Connection

from antarest.login.model import Bot, User

Expand Down
2 changes: 1 addition & 1 deletion antarest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.5.0"
__version__ = "2.5.1"

from pathlib import Path

Expand Down
7 changes: 6 additions & 1 deletion antarest/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,15 @@ class RedisConfig:

host: str = "localhost"
port: int = 6379
password: Optional[str] = None

@staticmethod
def from_dict(data: JSON) -> "RedisConfig":
return RedisConfig(host=data["host"], port=data["port"])
return RedisConfig(
host=data["host"],
port=data["port"],
password=data.get("password", None),
)


@dataclass(frozen=True)
Expand Down
5 changes: 5 additions & 0 deletions antarest/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,10 @@ def __init__(self, message: str) -> None:
super().__init__(HTTPStatus.NOT_FOUND, message)


class WritingInsideZippedFileException(HTTPException):
def __init__(self, message: str) -> None:
super().__init__(HTTPStatus.BAD_REQUEST, message)


class StudyOutputNotFoundError(Exception):
pass
41 changes: 37 additions & 4 deletions antarest/core/interfaces/eventbus.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Callable, Optional, List, Awaitable

from pydantic import BaseModel

from antarest.core.model import PermissionInfo


class EventType:
class EventType(str, Enum):
ANY = "_ANY"
STUDY_CREATED = "STUDY_CREATED"
STUDY_DELETED = "STUDY_DELETED"
STUDY_EDITED = "STUDY_EDITED"
Expand All @@ -31,6 +33,9 @@ class EventType:
DOWNLOAD_FAILED = "DOWNLOAD_FAILED"
MESSAGE_INFO = "MESSAGE_INFO"
MAINTENANCE_MODE = "MAINTENANCE_MODE"
WORKER_TASK = "WORKER_TASK"
WORKER_TASK_STARTED = "WORKER_TASK_STARTED"
WORKER_TASK_ENDED = "WORKER_TASK_ENDED"


class EventChannelDirectory:
Expand All @@ -41,7 +46,7 @@ class EventChannelDirectory:


class Event(BaseModel):
type: str
type: EventType
payload: Any
permissions: PermissionInfo = PermissionInfo()
channel: Optional[str] = None
Expand All @@ -52,11 +57,25 @@ class IEventBus(ABC):
def push(self, event: Event) -> None:
pass

@abstractmethod
def queue(self, event: Event, queue: str) -> None:
pass

@abstractmethod
def add_queue_consumer(
self, listener: Callable[[Event], Awaitable[None]], queue: str
) -> str:
pass

@abstractmethod
def remove_queue_consumer(self, listener_id: str) -> None:
pass

@abstractmethod
def add_listener(
self,
listener: Callable[[Event], Awaitable[None]],
type_filter: Optional[List[str]] = None,
type_filter: Optional[List[EventType]] = None,
) -> str:
"""
Add an event listener listener
Expand All @@ -77,14 +96,28 @@ def start(self, threaded: bool = True) -> None:


class DummyEventBusService(IEventBus):
def queue(self, event: Event, queue: str) -> None:
# Noop
pass

def add_queue_consumer(
self, listener: Callable[[Event], Awaitable[None]], queue: str
) -> str:
# Noop
pass

def remove_queue_consumer(self, listener_id: str) -> None:
# Noop
pass

def push(self, event: Event) -> None:
# Noop
pass

def add_listener(
self,
listener: Callable[[Event], Awaitable[None]],
type_filter: Optional[List[str]] = None,
type_filter: Optional[List[EventType]] = None,
) -> str:
return ""

Expand Down
1 change: 1 addition & 0 deletions antarest/core/tasks/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class TaskType(str, Enum):
ARCHIVE = "ARCHIVE"
UNARCHIVE = "UNARCHIVE"
SCAN = "SCAN"
WORKER_TASK = "WORKER_TASK"


class TaskStatus(Enum):
Expand Down
120 changes: 107 additions & 13 deletions antarest/core/tasks/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from concurrent.futures import ThreadPoolExecutor, Future
from enum import Enum
from http import HTTPStatus
from typing import Callable, Optional, List, Dict, Awaitable
from typing import Callable, Optional, List, Dict, Awaitable, Union, cast

from fastapi import HTTPException

Expand Down Expand Up @@ -38,6 +38,7 @@
from antarest.core.tasks.repository import TaskJobRepository
from antarest.core.utils.fastapi_sqlalchemy import db
from antarest.core.utils.utils import retry
from antarest.worker.worker import WorkerTaskCommand, WorkerTaskResult

logger = logging.getLogger(__name__)

Expand All @@ -46,6 +47,17 @@


class ITaskService(ABC):
@abstractmethod
def add_worker_task(
self,
task_type: str,
task_args: Dict[str, Union[int, float, bool, str]],
name: Optional[str],
ref_id: Optional[str],
request_params: RequestParameters,
) -> str:
raise NotImplementedError()

@abstractmethod
def add_task(
self,
Expand Down Expand Up @@ -101,10 +113,72 @@ def __init__(
self.threadpool = ThreadPoolExecutor(
max_workers=config.tasks.max_workers, thread_name_prefix="taskjob_"
)
self.event_bus.add_listener(self.create_task_event_callback())
self.event_bus.add_listener(
self.create_task_event_callback(), [EventType.TASK_CANCEL_REQUEST]
)
# set the status of previously running job to FAILED due to server restart
self._fix_running_status()

def _create_worker_task(
self,
task_id: str,
task_type: str,
task_args: Dict[str, Union[int, float, bool, str]],
) -> Callable[[TaskUpdateNotifier], TaskResult]:
task_result_wrapper: List[TaskResult] = []

def _create_awaiter(
res_wrapper: List[TaskResult],
) -> Callable[[Event], Awaitable[None]]:
async def _await_task_end(event: Event) -> None:
task_event = cast(WorkerTaskResult, event.payload)
if task_event.task_id == task_id:
res_wrapper.append(task_event.task_result)

return _await_task_end

def _send_worker_task(logger: TaskUpdateNotifier) -> TaskResult:
listener_id = self.event_bus.add_listener(
_create_awaiter(task_result_wrapper),
[EventType.WORKER_TASK_ENDED],
)
self.event_bus.queue(
Event(
type=EventType.WORKER_TASK,
payload=WorkerTaskCommand(
task_id=task_id,
task_type=task_type,
task_args=task_args,
),
),
task_type,
)
while not task_result_wrapper:
time.sleep(1)
self.event_bus.remove_listener(listener_id)
return task_result_wrapper[0]

return _send_worker_task

def add_worker_task(
self,
task_type: str,
task_args: Dict[str, Union[int, float, bool, str]],
name: Optional[str],
ref_id: Optional[str],
request_params: RequestParameters,
) -> str:
task = self._create_task(
name, TaskType.WORKER_TASK, ref_id, request_params
)
self._launch_task(
self._create_worker_task(str(task.id), task_type, task_args),
task,
None,
request_params,
)
return str(task.id)

def add_task(
self,
action: Task,
Expand All @@ -114,10 +188,21 @@ def add_task(
custom_event_messages: Optional[CustomTaskEventMessages],
request_params: RequestParameters,
) -> str:
task = self._create_task(name, task_type, ref_id, request_params)
self._launch_task(action, task, custom_event_messages, request_params)
return str(task.id)

def _create_task(
self,
name: Optional[str],
task_type: Optional[TaskType],
ref_id: Optional[str],
request_params: RequestParameters,
) -> TaskJob:
if not request_params.user:
raise MustBeAuthenticatedError()

task = self.repo.save(
return self.repo.save(
TaskJob(
name=name or "Unnamed",
owner_id=request_params.user.impersonator,
Expand All @@ -126,6 +211,16 @@ def add_task(
)
)

def _launch_task(
self,
action: Task,
task: TaskJob,
custom_event_messages: Optional[CustomTaskEventMessages],
request_params: RequestParameters,
) -> None:
if not request_params.user:
raise MustBeAuthenticatedError()

self.event_bus.push(
Event(
type=EventType.TASK_ADDED,
Expand All @@ -144,12 +239,10 @@ def add_task(
self._run_task, action, task.id, custom_event_messages
)
self.tasks[task.id] = future
return str(task.id)

def create_task_event_callback(self) -> Callable[[Event], Awaitable[None]]:
async def task_event_callback(event: Event) -> None:
if event.type == EventType.TASK_CANCEL_REQUEST:
self._cancel_task(str(event.payload), dispatch=False)
self._cancel_task(str(event.payload), dispatch=False)

return task_event_callback

Expand Down Expand Up @@ -227,13 +320,14 @@ def await_task(
)
end = time.time() + (timeout_sec or DEFAULT_AWAIT_MAX_TIMEOUT)
while time.time() < end:
task = self.repo.get(task_id)
if not task:
logger.error(f"Awaited task {task_id} was not found")
break
if TaskStatus(task.status).is_final():
break
time.sleep(2)
with db():
task = self.repo.get(task_id)
if not task:
logger.error(f"Awaited task {task_id} was not found")
break
if TaskStatus(task.status).is_final():
break
time.sleep(2)

def _run_task(
self,
Expand Down
18 changes: 16 additions & 2 deletions antarest/core/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from glob import escape
from pathlib import Path
from typing import IO, Any, Optional, Callable, TypeVar, List
from typing import IO, Any, Optional, Callable, TypeVar, List, Union, Awaitable
from zipfile import (
ZipFile,
BadZipFile,
Expand Down Expand Up @@ -90,7 +90,10 @@ def get_local_path() -> Path:


def new_redis_instance(config: RedisConfig) -> redis.Redis: # type: ignore
return redis.Redis(host=config.host, port=config.port, db=0)
redis_client = redis.Redis(
host=config.host, port=config.port, password=config.password, db=0
)
return redis_client


class StopWatch:
Expand Down Expand Up @@ -164,3 +167,14 @@ def unzip(
zipf.extractall(dir_path)
if remove_source_zip:
zip_path.unlink()


def suppress_exception(
callback: Callable[[], T],
logger: Callable[[Exception], None],
) -> Optional[T]:
try:
return callback()
except Exception as e:
logger(e)
return None
Loading

0 comments on commit cdc4cca

Please sign in to comment.