diff --git a/.env-devel b/.env-devel index 1d59bcc47df..f52b9ff62ec 100644 --- a/.env-devel +++ b/.env-devel @@ -216,7 +216,7 @@ DIRECTOR_PORT=8080 DIRECTOR_V2_HOST=director-v2 DIRECTOR_V2_PORT=8000 -DIRECTOR_V2_DYNAMIC_SCHEDULER_IGNORE_SERVICES_SHUTDOWN_WHEN_CREDITS_LIMIT_REACHED=1 +DIRECTOR_V2_DYNAMIC_SCHEDULER_CLOSE_SERVICES_VIA_FRONTEND_WHEN_CREDITS_LIMIT_REACHED=1 DIRECTOR_V2_SERVICES_CUSTOM_CONSTRAINTS=[] DIRECTOR_V2_GENERIC_RESOURCE_PLACEMENT_CONSTRAINTS_SUBSTITUTIONS='{}' DIRECTOR_V2_NODE_PORTS_STORAGE_AUTH=null diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/notifications.py b/packages/models-library/src/models_library/api_schemas_directorv2/notifications.py new file mode 100644 index 00000000000..b0bbe171912 --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_directorv2/notifications.py @@ -0,0 +1,8 @@ +from models_library.projects_nodes_io import NodeID +from models_library.wallets import WalletID +from pydantic import BaseModel + + +class ServiceNoMoreCredits(BaseModel): + node_id: NodeID + wallet_id: WalletID diff --git a/packages/models-library/src/models_library/api_schemas_directorv2/socketio.py b/packages/models-library/src/models_library/api_schemas_directorv2/socketio.py new file mode 100644 index 00000000000..b368d7606a3 --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_directorv2/socketio.py @@ -0,0 +1,3 @@ +from typing import Final + +SOCKET_IO_SERVICE_NO_MORE_CREDITS_EVENT: Final[str] = "serviceNoMoreCredits" diff --git a/services/api-server/src/simcore_service_api_server/api/errors/httpx_client_error.py b/services/api-server/src/simcore_service_api_server/api/errors/httpx_client_error.py index eb5fbfbf895..e2abf7650c0 100644 --- a/services/api-server/src/simcore_service_api_server/api/errors/httpx_client_error.py +++ b/services/api-server/src/simcore_service_api_server/api/errors/httpx_client_error.py @@ -6,7 +6,8 @@ import logging from typing import Any -from fastapi import HTTPException, Request, status +from fastapi import Request, status +from fastapi.responses import JSONResponse from httpx import HTTPError, TimeoutException _logger = logging.getLogger(__file__) @@ -30,6 +31,6 @@ async def handle_httpx_client_exceptions(_: Request, exc: HTTPError): if status_code >= status.HTTP_500_INTERNAL_SERVER_ERROR: _logger.exception("%s. host=%s. %s", detail, exc.request.url.host, f"{exc}") - raise HTTPException( - status_code=status_code, detail=detail, headers=headers - ) from exc + return JSONResponse( + status_code=status_code, content={"detail": detail}, headers=headers + ) diff --git a/services/director-v2/requirements/_base.in b/services/director-v2/requirements/_base.in index b47183c13a2..2198739ef70 100644 --- a/services/director-v2/requirements/_base.in +++ b/services/director-v2/requirements/_base.in @@ -29,6 +29,7 @@ networkx ordered-set orjson pydantic[dotenv] +python-socketio redis rich tenacity diff --git a/services/director-v2/requirements/_base.txt b/services/director-v2/requirements/_base.txt index cf1558c7b28..99cb686408d 100644 --- a/services/director-v2/requirements/_base.txt +++ b/services/director-v2/requirements/_base.txt @@ -96,6 +96,8 @@ attrs==23.1.0 # aiohttp # jsonschema # referencing +bidict==0.23.1 + # via python-socketio blosc==1.11.1 # via -r requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt certifi==2023.7.22 @@ -197,6 +199,7 @@ h11==0.14.0 # via # httpcore # uvicorn + # wsproto httpcore==0.17.3 # via # dnspython @@ -421,8 +424,12 @@ python-dotenv==1.0.0 # via # pydantic # uvicorn +python-engineio==4.9.0 + # via python-socketio python-multipart==0.0.6 # via fastapi +python-socketio==5.11.1 + # via -r requirements/_base.in pyyaml==6.0.1 # via # -c requirements/../../../packages/dask-task-models-library/requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -501,6 +508,8 @@ rpds-py==0.9.2 # via # jsonschema # referencing +simple-websocket==1.0.0 + # via python-engineio six==1.16.0 # via python-dateutil sniffio==1.3.0 @@ -668,6 +677,8 @@ watchfiles==0.19.0 # via uvicorn websockets==11.0.3 # via uvicorn +wsproto==1.2.0 + # via simple-websocket yarl==1.9.2 # via # -r requirements/../../../packages/postgres-database/requirements/_base.in diff --git a/services/director-v2/src/simcore_service_director_v2/core/application.py b/services/director-v2/src/simcore_service_director_v2/core/application.py index 9c4f367b6b1..56c1214a9e9 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/application.py +++ b/services/director-v2/src/simcore_service_director_v2/core/application.py @@ -27,9 +27,11 @@ director_v0, dynamic_services, dynamic_sidecar, + notifier, osparc_variables_substitutions, rabbitmq, resource_usage_tracker_client, + socketio, storage, ) from .errors import ( @@ -165,6 +167,8 @@ def init_app(settings: AppSettings | None = None) -> FastAPI: if dynamic_scheduler_enabled: dynamic_sidecar.setup(app) api_keys_manager.setup(app) + socketio.setup(app) + notifier.setup(app) if ( settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND.COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED @@ -174,8 +178,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI: if computational_backend_enabled: comp_scheduler.setup(app) - if settings.DIRECTOR_V2_RESOURCE_USAGE_TRACKER: - resource_usage_tracker_client.setup(app) + resource_usage_tracker_client.setup(app) if settings.DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED: setup_prometheus_instrumentation(app) diff --git a/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py b/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py index f229a3bf2cc..080ed3686f3 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/scheduler.py @@ -1,7 +1,7 @@ from typing import Final from models_library.projects_networks import DockerNetworkName -from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt +from pydantic import Field, NonNegativeInt, PositiveFloat from settings_library.base import BaseCustomSettings _MINUTE: Final[NonNegativeInt] = 60 @@ -59,6 +59,14 @@ class DynamicServicesSchedulerSettings(BaseCustomSettings): description="Prometheus will scrape service placed on these networks", ) + DIRECTOR_V2_DYNAMIC_SCHEDULER_CLOSE_SERVICES_VIA_FRONTEND_WHEN_CREDITS_LIMIT_REACHED: bool = Field( + default=True, + description=( + "when the message indicating there are no more credits left in a wallet " + "the director-v2 will shutdown the services via the help of the frontend" + ), + ) + # # TIMEOUTS AND RETRY dark worlds # @@ -139,19 +147,3 @@ class DynamicServicesSchedulerSettings(BaseCustomSettings): "allow for some time to pass before declaring it failed." ), ) - - # - # DEVELOPMENT ONLY config - # - - DIRECTOR_V2_DYNAMIC_SCHEDULER_IGNORE_SERVICES_SHUTDOWN_WHEN_CREDITS_LIMIT_REACHED: bool = Field( - default=True, - description=( - "when the message indicating there are no more credits left in a wallet " - "the director-v2 will shutdown services if True" - ), - ) - - DYNAMIC_SIDECAR_DOCKER_NODE_CONCURRENT_RESOURCE_SLOTS: PositiveInt = Field( - 2, description="Amount of slots per resource on a node" - ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/notifier.py b/services/director-v2/src/simcore_service_director_v2/modules/notifier.py new file mode 100644 index 00000000000..ae3b2faadba --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/notifier.py @@ -0,0 +1,59 @@ +import contextlib + +import socketio +from fastapi import FastAPI +from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_directorv2.notifications import ServiceNoMoreCredits +from models_library.api_schemas_directorv2.socketio import ( + SOCKET_IO_SERVICE_NO_MORE_CREDITS_EVENT, +) +from models_library.api_schemas_webserver.socketio import SocketIORoomStr +from models_library.projects_nodes_io import NodeID +from models_library.users import UserID +from models_library.wallets import WalletID +from servicelib.fastapi.app_state import SingletonInAppStateMixin + + +class Notifier(SingletonInAppStateMixin): + app_state_name: str = "notifier" + + def __init__(self, sio_manager: socketio.AsyncAioPikaManager): + self._sio_manager = sio_manager + + async def notify_shutdown_no_more_credits( + self, user_id: UserID, node_id: NodeID, wallet_id: WalletID + ) -> None: + await self._sio_manager.emit( + SOCKET_IO_SERVICE_NO_MORE_CREDITS_EVENT, + data=jsonable_encoder( + ServiceNoMoreCredits(node_id=node_id, wallet_id=wallet_id) + ), + room=SocketIORoomStr.from_user_id(user_id), + ) + + +async def publish_shutdown_no_more_credits( + app: FastAPI, *, user_id: UserID, node_id: NodeID, wallet_id: WalletID +) -> None: + notifier: Notifier = Notifier.get_from_app_state(app) + await notifier.notify_shutdown_no_more_credits( + user_id=user_id, node_id=node_id, wallet_id=wallet_id + ) + + +def setup(app: FastAPI): + async def _on_startup() -> None: + assert app.state.external_socketio # nosec + + notifier = Notifier( + sio_manager=app.state.external_socketio, + ) + notifier.set_to_app_state(app) + assert Notifier.get_from_app_state(app) == notifier # nosec + + async def _on_shutdown() -> None: + with contextlib.suppress(AttributeError): + Notifier.pop_from_app_state(app) + + app.add_event_handler("startup", _on_startup) + app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py b/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py index 7d8911648b6..2563a4133d7 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py @@ -16,6 +16,7 @@ from ..core.errors import ConfigurationError from ..core.settings import AppSettings +from .notifier import publish_shutdown_no_more_credits _logger = logging.getLogger(__name__) @@ -27,9 +28,20 @@ async def handler_out_of_credits(app: FastAPI, data: bytes) -> bool: settings: AppSettings = app.state.settings if ( - settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_IGNORE_SERVICES_SHUTDOWN_WHEN_CREDITS_LIMIT_REACHED + settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_CLOSE_SERVICES_VIA_FRONTEND_WHEN_CREDITS_LIMIT_REACHED ): - _logger.debug("Skipped shutting down services for wallet %s", message.wallet_id) + _logger.warning( + "Notifying frontend to shutdown service: '%s' for user '%s' because wallet '%s' is out of credits.", + message.node_id, + message.user_id, + message.wallet_id, + ) + await publish_shutdown_no_more_credits( + app, + user_id=message.user_id, + node_id=message.node_id, + wallet_id=message.wallet_id, + ) else: await scheduler.mark_all_services_in_wallet_for_removal( wallet_id=message.wallet_id diff --git a/services/director-v2/src/simcore_service_director_v2/modules/socketio.py b/services/director-v2/src/simcore_service_director_v2/modules/socketio.py new file mode 100644 index 00000000000..d8166a6ed93 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/modules/socketio.py @@ -0,0 +1,33 @@ +import logging + +import socketio +from fastapi import FastAPI +from servicelib.socketio_utils import cleanup_socketio_async_pubsub_manager + +from ..core.settings import AppSettings + +_logger = logging.getLogger(__name__) + + +def setup(app: FastAPI): + settings: AppSettings = app.state.settings + + async def _on_startup() -> None: + assert app.state.rabbitmq_client # nosec + + # Connect to the as an external process in write-only mode + # SEE https://python-socketio.readthedocs.io/en/stable/server.html#emitting-from-external-processes + app.state.external_socketio = socketio.AsyncAioPikaManager( + url=settings.DIRECTOR_V2_RABBITMQ.dsn, + logger=_logger, + write_only=True, + ) + + async def _on_shutdown() -> None: + if external_socketio := getattr(app.state, "external_socketio"): # noqa: B009 + await cleanup_socketio_async_pubsub_manager( + server_manager=external_socketio + ) + + app.add_event_handler("startup", _on_startup) + app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index 3479f905ce8..9e947ba1639 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -38,6 +38,7 @@ "pytest_simcore.postgres_service", "pytest_simcore.pydantic_models", "pytest_simcore.pytest_global_environs", + "pytest_simcore.pytest_socketio", "pytest_simcore.rabbit_service", "pytest_simcore.redis_service", "pytest_simcore.repository_paths", @@ -237,7 +238,7 @@ def fake_workbench_as_dict(fake_workbench_file: Path) -> dict[str, Any]: @pytest.fixture def fake_workbench_without_outputs( - fake_workbench_as_dict: dict[str, Any] + fake_workbench_as_dict: dict[str, Any], ) -> dict[str, Any]: workbench = deepcopy(fake_workbench_as_dict) # remove all the outputs from the workbench diff --git a/services/director-v2/tests/unit/test_modules_notifier.py b/services/director-v2/tests/unit/test_modules_notifier.py new file mode 100644 index 00000000000..02092746608 --- /dev/null +++ b/services/director-v2/tests/unit/test_modules_notifier.py @@ -0,0 +1,189 @@ +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name + +from collections.abc import AsyncIterable, Callable +from contextlib import AsyncExitStack, _AsyncGeneratorContextManager +from unittest.mock import AsyncMock + +import pytest +import socketio +from faker import Faker +from fastapi import FastAPI +from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_directorv2.notifications import ServiceNoMoreCredits +from models_library.api_schemas_directorv2.socketio import ( + SOCKET_IO_SERVICE_NO_MORE_CREDITS_EVENT, +) +from models_library.api_schemas_webserver.socketio import SocketIORoomStr +from models_library.projects_nodes_io import NodeID +from models_library.users import UserID +from models_library.wallets import WalletID +from pydantic import NonNegativeInt, parse_obj_as +from pytest_mock import MockerFixture +from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict +from servicelib.utils import logged_gather +from settings_library.rabbit import RabbitSettings +from simcore_service_director_v2.core.settings import AppSettings +from simcore_service_director_v2.modules.notifier import ( + publish_shutdown_no_more_credits, +) +from socketio import AsyncServer +from tenacity import AsyncRetrying +from tenacity.stop import stop_after_attempt +from tenacity.wait import wait_fixed + +pytest_simcore_core_services_selection = [ + "rabbit", +] + + +@pytest.fixture +def disable_modules_setup(mocker: MockerFixture) -> None: + module_base = "simcore_service_director_v2.core.application" + mocker.patch(f"{module_base}.db.setup", autospec=True, return_value=False) + mocker.patch( + f"{module_base}.api_keys_manager.setup", autospec=True, return_value=False + ) + mocker.patch( + f"{module_base}.resource_usage_tracker_client.setup", + autospec=True, + return_value=False, + ) + + +@pytest.fixture +def mock_env( + disable_modules_setup: None, + monkeypatch: pytest.MonkeyPatch, + mock_env: EnvVarsDict, + rabbit_service: RabbitSettings, +) -> EnvVarsDict: + setenvs_from_dict( + monkeypatch, + { + "S3_ENDPOINT": "test-s3", + "S3_ACCESS_KEY": "", + "S3_SECRET_KEY": "", + "S3_BUCKET_NAME": "", + "DIRECTOR_ENABLED": "0", + "DIRECTOR_V0_ENABLED": "0", + "DIRECTOR_V2_CATALOG": "null", + "COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED": "0", + "COMPUTATIONAL_BACKEND_ENABLED": "0", + "DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED": "1", + }, + ) + return mock_env + + +@pytest.fixture +async def socketio_server( + initialized_app: FastAPI, + socketio_server_factory: Callable[ + [RabbitSettings], _AsyncGeneratorContextManager[AsyncServer] + ], +) -> AsyncIterable[AsyncServer]: + # Same configuration as simcore_service_webserver/socketio/server.py + settings: AppSettings = initialized_app.state.settings + assert settings.DIRECTOR_V2_RABBITMQ + + async with socketio_server_factory(settings.DIRECTOR_V2_RABBITMQ) as server: + yield server + + +@pytest.fixture +def user_id(faker: Faker) -> UserID: + return faker.pyint() + + +@pytest.fixture +def node_id(faker: Faker) -> NodeID: + return faker.uuid4(cast_to=None) + + +@pytest.fixture +def room_name(user_id: UserID) -> SocketIORoomStr: + return SocketIORoomStr.from_user_id(user_id) + + +@pytest.fixture +def wallet_id(faker: Faker) -> WalletID: + return faker.pyint() + + +def _get_on_no_more_credits_event( + socketio_client: socketio.AsyncClient, +) -> AsyncMock: + # emulates front-end receiving message + + async def on_no_more_credits(data): + assert parse_obj_as(ServiceNoMoreCredits, data) is not None + + on_event_spy = AsyncMock(wraps=on_no_more_credits) + socketio_client.on(SOCKET_IO_SERVICE_NO_MORE_CREDITS_EVENT, on_event_spy) + + return on_event_spy + + +async def _assert_call_count(mock: AsyncMock, *, call_count: int) -> None: + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), stop=stop_after_attempt(500), reraise=True + ): + with attempt: + assert mock.call_count == call_count + + +async def test_notifier_publish_message( + socketio_server_events: dict[str, AsyncMock], + initialized_app: FastAPI, + user_id: UserID, + node_id: NodeID, + wallet_id: WalletID, + socketio_client_factory: Callable[ + [], _AsyncGeneratorContextManager[socketio.AsyncClient] + ], +): + # web server spy events + server_connect = socketio_server_events["connect"] + server_disconnect = socketio_server_events["disconnect"] + server_on_check = socketio_server_events["on_check"] + + number_of_clients: NonNegativeInt = 10 + async with AsyncExitStack() as socketio_frontend_clients: + frontend_clients: list[socketio.AsyncClient] = await logged_gather( + *[ + socketio_frontend_clients.enter_async_context(socketio_client_factory()) + for _ in range(number_of_clients) + ] + ) + await _assert_call_count(server_connect, call_count=number_of_clients) + + # client emits and check it was received + await logged_gather( + *[ + frontend_client.emit("check", data="an_event") + for frontend_client in frontend_clients + ] + ) + await _assert_call_count(server_on_check, call_count=number_of_clients) + + # attach spy to client + no_no_more_credits_events: list[AsyncMock] = [ + _get_on_no_more_credits_event(c) for c in frontend_clients + ] + + # server publishes a message + await publish_shutdown_no_more_credits( + initialized_app, user_id=user_id, node_id=node_id, wallet_id=wallet_id + ) + + # check that all clients received it + for on_no_more_credits_event in no_no_more_credits_events: + await _assert_call_count(on_no_more_credits_event, call_count=1) + on_no_more_credits_event.assert_awaited_once_with( + jsonable_encoder( + ServiceNoMoreCredits(node_id=node_id, wallet_id=wallet_id) + ) + ) + + await _assert_call_count(server_disconnect, call_count=number_of_clients) diff --git a/services/director-v2/tests/unit/test_modules_rabbitmq.py b/services/director-v2/tests/unit/test_modules_rabbitmq.py index 130a57cd0a5..1d557d673a8 100644 --- a/services/director-v2/tests/unit/test_modules_rabbitmq.py +++ b/services/director-v2/tests/unit/test_modules_rabbitmq.py @@ -21,7 +21,7 @@ def ignore_limits(request: pytest.FixtureRequest) -> bool: @pytest.fixture async def mock_app(ignore_limits: bool) -> FastAPI: mock = AsyncMock() - mock.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_IGNORE_SERVICES_SHUTDOWN_WHEN_CREDITS_LIMIT_REACHED = ( + mock.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_CLOSE_SERVICES_VIA_FRONTEND_WHEN_CREDITS_LIMIT_REACHED = ( ignore_limits ) mock.state.dynamic_sidecar_scheduler = AsyncMock() diff --git a/services/docker-compose.yml b/services/docker-compose.yml index a7852b9d955..bb597c35387 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -264,7 +264,7 @@ services: - DIRECTOR_V2_GENERIC_RESOURCE_PLACEMENT_CONSTRAINTS_SUBSTITUTIONS=${DIRECTOR_V2_GENERIC_RESOURCE_PLACEMENT_CONSTRAINTS_SUBSTITUTIONS} - DIRECTOR_V2_DEV_FEATURES_ENABLED=${DIRECTOR_V2_DEV_FEATURES_ENABLED} - - DIRECTOR_V2_DYNAMIC_SCHEDULER_IGNORE_SERVICES_SHUTDOWN_WHEN_CREDITS_LIMIT_REACHED=${DIRECTOR_V2_DYNAMIC_SCHEDULER_IGNORE_SERVICES_SHUTDOWN_WHEN_CREDITS_LIMIT_REACHED} + - DIRECTOR_V2_DYNAMIC_SCHEDULER_CLOSE_SERVICES_VIA_FRONTEND_WHEN_CREDITS_LIMIT_REACHED=${DIRECTOR_V2_DYNAMIC_SCHEDULER_CLOSE_SERVICES_VIA_FRONTEND_WHEN_CREDITS_LIMIT_REACHED} - DIRECTOR_V2_SERVICES_CUSTOM_CONSTRAINTS=${DIRECTOR_V2_SERVICES_CUSTOM_CONSTRAINTS} - DYNAMIC_SIDECAR_ENDPOINT_SPECS_MODE_DNSRR_ENABLED=${DYNAMIC_SIDECAR_ENDPOINT_SPECS_MODE_DNSRR_ENABLED} diff --git a/services/static-webserver/client/source/class/osparc/data/TTLMap.js b/services/static-webserver/client/source/class/osparc/data/TTLMap.js new file mode 100644 index 00000000000..93e60c08042 --- /dev/null +++ b/services/static-webserver/client/source/class/osparc/data/TTLMap.js @@ -0,0 +1,68 @@ +/* ************************************************************************ + + osparc - the simcore frontend + + https://osparc.io + + Copyright: + 2023 IT'IS Foundation, https://itis.swiss + + License: + MIT: https://opensource.org/licenses/MIT + + Authors: + * Andrei Neagu (GitHK) + +************************************************************************ */ + +/** + * Automatically expire keys after ttl (time to live) is reached + */ +qx.Class.define("osparc.data.TTLMap", { + extend: qx.core.Object, + + /** + * @param ttl time for which to keep track of the entries + */ + construct: function(ttl) { + this.base(arguments); + + this._entries = new Map(); + this._ttl = ttl; + }, + + members: { + + /** + * @param {object} entry: add an entry or extend it's duration + */ + addOrUpdateEntry: function(entry) { + const now = Date.now(); + console.log("mapping", entry, "to", now); + this._entries.set(entry, now); + + // Set a timeout to potentially remove the entry after the ttl if it's the latest + setTimeout(() => { + // If the entry is still the latest, remove it + if (now === this._entries.get(entry)) { + this._entries.delete(entry); + } + }, this._ttl); + }, + + /** + * checks of the entry is still present and valid (did not reach the `ttl`) + * @param {*} entry + * @returns + */ + hasRecentEntry: function(entry) { + const now = Date.now(); + if (this._entries.has(entry)) { + const lastUpdate = this._entries.get(entry); + return now - lastUpdate <= this._ttl; + } + return false; + } + + } +}); diff --git a/services/static-webserver/client/source/class/osparc/data/model/Node.js b/services/static-webserver/client/source/class/osparc/data/model/Node.js index d08f8fcb1b3..355a6d10cd1 100644 --- a/services/static-webserver/client/source/class/osparc/data/model/Node.js +++ b/services/static-webserver/client/source/class/osparc/data/model/Node.js @@ -976,28 +976,37 @@ qx.Class.define("osparc.data.model.Node", { return true; }, - requestStopNode: function() { - const preferencesSettings = osparc.Preferences.getInstance(); - if (preferencesSettings.getConfirmStopNode()) { - const msg = this.tr("Do you really want Stop and Save the current state?"); - const win = new osparc.ui.window.Confirmation(msg).set({ - confirmText: this.tr("Stop") - }); - win.center(); - win.open(); - win.addListener("close", () => { - if (win.getConfirmed()) { - const params = { - url: { - studyId: this.getStudy().getUuid(), - nodeId: this.getNodeId() - } - }; - osparc.data.Resources.fetch("studies", "stopNode", params) - .then(() => this.stopDynamicService()) - .catch(err => console.error(err)); + requestStopNode: function(withConfirmationDialog=false) { + const self = this; + const stopService = () => { + const params = { + url: { + studyId: self.getStudy().getUuid(), + nodeId: self.getNodeId() } - }, this); + }; + osparc.data.Resources.fetch("studies", "stopNode", params) + .then(() => self.stopDynamicService()) + .catch(err => console.error(err)); + }; + + if (withConfirmationDialog) { + const preferencesSettings = osparc.Preferences.getInstance(); + if (preferencesSettings.getConfirmStopNode()) { + const msg = this.tr("Do you really want Stop and Save the current state?"); + const win = new osparc.ui.window.Confirmation(msg).set({ + confirmText: this.tr("Stop") + }); + win.center(); + win.open(); + win.addListener("close", () => { + if (win.getConfirmed()) { + stopService(); + } + }, this); + } + } else { + stopService(); } }, diff --git a/services/static-webserver/client/source/class/osparc/desktop/WorkbenchView.js b/services/static-webserver/client/source/class/osparc/desktop/WorkbenchView.js index 2b9cc359189..e789a62ccff 100644 --- a/services/static-webserver/client/source/class/osparc/desktop/WorkbenchView.js +++ b/services/static-webserver/client/source/class/osparc/desktop/WorkbenchView.js @@ -696,6 +696,8 @@ qx.Class.define("osparc.desktop.WorkbenchView", { this.listenToNodeProgress(); + this.listenToNoMoreCreditsEvents(); + // callback for events if (!socket.slotExists("event")) { socket.on("event", data => { @@ -737,6 +739,35 @@ qx.Class.define("osparc.desktop.WorkbenchView", { } }, + listenToNoMoreCreditsEvents: function() { + const slotName = "serviceNoMoreCredits"; + const flashMessageDisplayDuration = 10000; + + const socket = osparc.wrapper.WebSocket.getInstance(); + const ttlMap = new osparc.data.TTLMap(flashMessageDisplayDuration); + const store = osparc.store.Store.getInstance(); + + if (!socket.slotExists(slotName)) { + socket.on(slotName, noMoreCredits => { + // stop service + const nodeId = noMoreCredits["node_id"]; + const workbench = this.getStudy().getWorkbench(); + workbench.getNode(nodeId).requestStopNode(); + + // display flash message if not showing + const walletId = noMoreCredits["wallet_id"]; + if (ttlMap.hasRecentEntry(walletId)) { + return; + } + ttlMap.addOrUpdateEntry(walletId); + const usedWallet = store.getWallets().find(wallet => wallet.getWalletId() === walletId); + const walletName = usedWallet.getName(); + const text = `Wallet "${walletName}", running your service(s) has run out of credits. Stopping service(s) gracefully.`; + osparc.FlashMessenger.getInstance().logAs(this.tr(text), "ERROR", null, flashMessageDisplayDuration); + }, this); + } + }, + getStartStopButtons: function() { return this.__workbenchPanel.getToolbar().getChildControl("start-stop-btns"); }, diff --git a/services/static-webserver/client/source/class/osparc/workbench/WorkbenchUI.js b/services/static-webserver/client/source/class/osparc/workbench/WorkbenchUI.js index 1eecad087bb..f754a406c85 100644 --- a/services/static-webserver/client/source/class/osparc/workbench/WorkbenchUI.js +++ b/services/static-webserver/client/source/class/osparc/workbench/WorkbenchUI.js @@ -1235,7 +1235,7 @@ qx.Class.define("osparc.workbench.WorkbenchUI", { }, stopDynService: { "text": "\uf04d", // stop - "action": () => nodeUI.getNode().requestStopNode() + "action": () => nodeUI.getNode().requestStopNode(true) }, addRemoveMarker: { "text": "\uf097", // marker