From cc1ba58eefe28bf6111ac05e3e93d861a600a2c0 Mon Sep 17 00:00:00 2001 From: Mohamed Abdel Wedoud Date: Fri, 11 Oct 2024 10:51:49 +0200 Subject: [PATCH] refactor(workers): remove the `simulator` worker --- antarest/service_creator.py | 12 --- antarest/singleton_services.py | 5 - antarest/worker/simulator_worker.py | 143 -------------------------- tests/worker/test_simulator_worker.py | 80 -------------- 4 files changed, 240 deletions(-) delete mode 100644 antarest/worker/simulator_worker.py delete mode 100644 tests/worker/test_simulator_worker.py diff --git a/antarest/service_creator.py b/antarest/service_creator.py index b942418b2c..fa8086b789 100644 --- a/antarest/service_creator.py +++ b/antarest/service_creator.py @@ -47,7 +47,6 @@ from antarest.study.storage.rawstudy.watcher import Watcher from antarest.study.web.watcher_blueprint import create_watcher_routes from antarest.worker.archive_worker import ArchiveWorker -from antarest.worker.simulator_worker import SimulatorWorker from antarest.worker.worker import AbstractWorker logger = logging.getLogger(__name__) @@ -72,7 +71,6 @@ class Module(str, Enum): MATRIX_GC = "matrix_gc" ARCHIVE_WORKER = "archive_worker" AUTO_ARCHIVER = "auto_archiver" - SIMULATOR_WORKER = "simulator_worker" def init_db_engine( @@ -221,16 +219,6 @@ def create_archive_worker( return ArchiveWorker(event_bus, workspace, local_root, config) -def create_simulator_worker( - config: Config, - matrix_service: MatrixService, - event_bus: t.Optional[IEventBus] = None, -) -> AbstractWorker: - if not event_bus: - event_bus, _ = create_event_bus(None, config) - return SimulatorWorker(event_bus, matrix_service, config) - - def create_services( config: Config, app_ctxt: t.Optional[AppBuildContext], create_all: bool = False ) -> t.Dict[str, t.Any]: diff --git a/antarest/singleton_services.py b/antarest/singleton_services.py index 3b2373cc0c..99fd44b23c 100644 --- a/antarest/singleton_services.py +++ b/antarest/singleton_services.py @@ -25,7 +25,6 @@ create_archive_worker, create_core_services, create_matrix_gc, - create_simulator_worker, create_watcher, init_db_engine, ) @@ -72,10 +71,6 @@ def _init(config_file: Path, services_list: List[Module]) -> Dict[Module, IServi worker = create_archive_worker(config, "test", event_bus=event_bus) services[Module.ARCHIVE_WORKER] = worker - if Module.SIMULATOR_WORKER in services_list: - worker = create_simulator_worker(config, matrix_service=matrix_service, event_bus=event_bus) - services[Module.SIMULATOR_WORKER] = worker - if Module.AUTO_ARCHIVER in services_list: auto_archive_service = AutoArchiveService(study_service, config) services[Module.AUTO_ARCHIVER] = auto_archive_service diff --git a/antarest/worker/simulator_worker.py b/antarest/worker/simulator_worker.py deleted file mode 100644 index 5dba1d13db..0000000000 --- a/antarest/worker/simulator_worker.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright (c) 2024, RTE (https://www.rte-france.com) -# -# See AUTHORS.txt -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. -# -# SPDX-License-Identifier: MPL-2.0 -# -# This file is part of the Antares project. - -import io -import logging -import subprocess -import threading -import time -from pathlib import Path -from typing import cast - -from antarest.core.cache.business.local_chache import LocalCache -from antarest.core.config import Config, LocalConfig -from antarest.core.interfaces.eventbus import IEventBus -from antarest.core.serialization import AntaresBaseModel -from antarest.core.tasks.model import TaskResult -from antarest.core.utils.fastapi_sqlalchemy import db -from antarest.launcher.adapters.log_manager import follow -from antarest.matrixstore.service import MatrixService -from antarest.matrixstore.uri_resolver_service import UriResolverService -from antarest.study.storage.rawstudy.model.filesystem.factory import StudyFactory -from antarest.worker.worker import AbstractWorker, WorkerTaskCommand - -logger = logging.getLogger(__name__) - - -GENERATE_TIMESERIES_TASK_NAME = "generate-timeseries" -GENERATE_KIRSHOFF_CONSTRAINTS_TASK_NAME = "generate-kirshoff-constraints" - - -class GenerateTimeseriesTaskArgs(AntaresBaseModel): - study_id: str - study_path: str - managed: bool - study_version: str - - -class SimulatorWorker(AbstractWorker): - def __init__( - self, - event_bus: IEventBus, - matrix_service: MatrixService, - config: Config, - ): - super().__init__( - "Simulator worker", - event_bus, - [ - GENERATE_TIMESERIES_TASK_NAME, - GENERATE_KIRSHOFF_CONSTRAINTS_TASK_NAME, - ], - ) - self.config = config - # this will raise an error if not properly configured - self.binaries = (self.config.launcher.local or LocalConfig()).binaries - self.study_factory = StudyFactory( - matrix=matrix_service, - resolver=UriResolverService(matrix_service=matrix_service), - cache=LocalCache(), - ) - - def _execute_task(self, task_info: WorkerTaskCommand) -> TaskResult: - if task_info.task_type == GENERATE_TIMESERIES_TASK_NAME: - return self.execute_timeseries_generation_task(task_info) - elif task_info.task_type == GENERATE_KIRSHOFF_CONSTRAINTS_TASK_NAME: - return self.execute_kirshoff_constraint_generation_task(task_info) - raise NotImplementedError(f"{task_info.task_type} is not implemented by this worker") - - def execute_kirshoff_constraint_generation_task(self, task_info: WorkerTaskCommand) -> TaskResult: - raise NotImplementedError - - def execute_timeseries_generation_task(self, task_info: WorkerTaskCommand) -> TaskResult: - result = TaskResult(success=True, message="", return_value="") - task = GenerateTimeseriesTaskArgs.model_validate(task_info.task_args) - binary = ( - self.binaries[task.study_version] - if task.study_version in self.binaries - else list(self.binaries.values())[0] - ) - file_study = self.study_factory.create_from_fs(Path(task.study_path), task.study_id, use_cache=False) - if task.managed: - with db(): - file_study.tree.denormalize() - - def append_output(line: str) -> None: - result.return_value += line # type: ignore - - try: - end = False - - def stop_reading() -> bool: - return end - - process = subprocess.Popen( - [ - binary, - "-i", - task.study_path, - "-g", - ], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - encoding="utf-8", - ) - thread = threading.Thread( - target=lambda: follow( - cast(io.StringIO, process.stdout), - append_output, - stop_reading, - None, - ), - name=f"{self.__class__.__name__}-TS-Generator", - daemon=True, - ) - thread.start() - - while process.poll() is None: - time.sleep(1) - - result.success = process.returncode == 0 - except Exception as e: - logger.error( - f"Failed to generate timeseries for study located at {task.study_path}", - exc_info=e, - ) - result.success = False - result.message = repr(e) - finally: - if task.managed: - with db(): - file_study.tree.normalize() - end = True - return result diff --git a/tests/worker/test_simulator_worker.py b/tests/worker/test_simulator_worker.py deleted file mode 100644 index 791aa3ec3d..0000000000 --- a/tests/worker/test_simulator_worker.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright (c) 2024, RTE (https://www.rte-france.com) -# -# See AUTHORS.txt -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. -# -# SPDX-License-Identifier: MPL-2.0 -# -# This file is part of the Antares project. - -import os -import platform -import stat -from pathlib import Path -from unittest.mock import Mock, patch - -import pytest - -from antarest.core.config import Config, LauncherConfig, LocalConfig -from antarest.worker.simulator_worker import ( - GENERATE_KIRSHOFF_CONSTRAINTS_TASK_NAME, - GENERATE_TIMESERIES_TASK_NAME, - SimulatorWorker, -) -from antarest.worker.worker import WorkerTaskCommand -from tests.helpers import with_db_context - - -@with_db_context -@patch("antarest.worker.simulator_worker.logger") -def test_execute_task(logger_mock: Mock, tmp_path: Path): - simulator_mock_path = Path(__file__).parent.parent / "integration" / "launcher_mock.sh" - st = os.stat(simulator_mock_path) - os.chmod(simulator_mock_path, st.st_mode | stat.S_IEXEC) - worker = SimulatorWorker( - Mock(), - Mock(), - Config( - launcher=LauncherConfig( - local=LocalConfig( - binaries={ - "800": simulator_mock_path, - } - ) - ) - ), - ) - worker.study_factory = Mock() - - with pytest.raises(NotImplementedError): - worker._execute_task(task_info=WorkerTaskCommand(task_id="task_id", task_type="unknown", task_args={})) - - with pytest.raises(NotImplementedError): - worker._execute_task( - task_info=WorkerTaskCommand( - task_id="task_id", - task_type=GENERATE_KIRSHOFF_CONSTRAINTS_TASK_NAME, - task_args={}, - ) - ) - study_path = tmp_path / "study" - result = worker._execute_task( - task_info=WorkerTaskCommand( - task_id="task_id", - task_type=GENERATE_TIMESERIES_TASK_NAME, - task_args={ - "study_id": "some_id", - "managed": False, - "study_path": str(study_path), - "study_version": "800", - }, - ) - ) - if not platform.platform().startswith("Windows"): - assert result.success - assert result.return_value == f"-i {study_path} -g\nexit 0\n" - else: - assert not result.success