Skip to content

Commit

Permalink
Merge branch 'master' into buy-credits
Browse files Browse the repository at this point in the history
  • Loading branch information
ignapas committed Feb 28, 2024
2 parents c76f02d + 524b781 commit c9dd136
Show file tree
Hide file tree
Showing 19 changed files with 472 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ DIRECTOR_PORT=8080

DIRECTOR_V2_HOST=director-v2
DIRECTOR_V2_PORT=8000
DIRECTOR_V2_DYNAMIC_SCHEDULER_IGNORE_SERVICES_SHUTDOWN_WHEN_CREDITS_LIMIT_REACHED=1
DIRECTOR_V2_DYNAMIC_SCHEDULER_CLOSE_SERVICES_VIA_FRONTEND_WHEN_CREDITS_LIMIT_REACHED=1
DIRECTOR_V2_SERVICES_CUSTOM_CONSTRAINTS=[]
DIRECTOR_V2_GENERIC_RESOURCE_PLACEMENT_CONSTRAINTS_SUBSTITUTIONS='{}'
DIRECTOR_V2_NODE_PORTS_STORAGE_AUTH=null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from models_library.projects_nodes_io import NodeID
from models_library.wallets import WalletID
from pydantic import BaseModel


class ServiceNoMoreCredits(BaseModel):
node_id: NodeID
wallet_id: WalletID
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from typing import Final

SOCKET_IO_SERVICE_NO_MORE_CREDITS_EVENT: Final[str] = "serviceNoMoreCredits"
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import logging
from typing import Any

from fastapi import HTTPException, Request, status
from fastapi import Request, status
from fastapi.responses import JSONResponse
from httpx import HTTPError, TimeoutException

_logger = logging.getLogger(__file__)
Expand All @@ -30,6 +31,6 @@ async def handle_httpx_client_exceptions(_: Request, exc: HTTPError):

if status_code >= status.HTTP_500_INTERNAL_SERVER_ERROR:
_logger.exception("%s. host=%s. %s", detail, exc.request.url.host, f"{exc}")
raise HTTPException(
status_code=status_code, detail=detail, headers=headers
) from exc
return JSONResponse(
status_code=status_code, content={"detail": detail}, headers=headers
)
1 change: 1 addition & 0 deletions services/director-v2/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ networkx
ordered-set
orjson
pydantic[dotenv]
python-socketio
redis
rich
tenacity
11 changes: 11 additions & 0 deletions services/director-v2/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ attrs==23.1.0
# aiohttp
# jsonschema
# referencing
bidict==0.23.1
# via python-socketio
blosc==1.11.1
# via -r requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
certifi==2023.7.22
Expand Down Expand Up @@ -197,6 +199,7 @@ h11==0.14.0
# via
# httpcore
# uvicorn
# wsproto
httpcore==0.17.3
# via
# dnspython
Expand Down Expand Up @@ -421,8 +424,12 @@ python-dotenv==1.0.0
# via
# pydantic
# uvicorn
python-engineio==4.9.0
# via python-socketio
python-multipart==0.0.6
# via fastapi
python-socketio==5.11.1
# via -r requirements/_base.in
pyyaml==6.0.1
# via
# -c requirements/../../../packages/dask-task-models-library/requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt
Expand Down Expand Up @@ -501,6 +508,8 @@ rpds-py==0.9.2
# via
# jsonschema
# referencing
simple-websocket==1.0.0
# via python-engineio
six==1.16.0
# via python-dateutil
sniffio==1.3.0
Expand Down Expand Up @@ -668,6 +677,8 @@ watchfiles==0.19.0
# via uvicorn
websockets==11.0.3
# via uvicorn
wsproto==1.2.0
# via simple-websocket
yarl==1.9.2
# via
# -r requirements/../../../packages/postgres-database/requirements/_base.in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
director_v0,
dynamic_services,
dynamic_sidecar,
notifier,
osparc_variables_substitutions,
rabbitmq,
resource_usage_tracker_client,
socketio,
storage,
)
from .errors import (
Expand Down Expand Up @@ -165,6 +167,8 @@ def init_app(settings: AppSettings | None = None) -> FastAPI:
if dynamic_scheduler_enabled:
dynamic_sidecar.setup(app)
api_keys_manager.setup(app)
socketio.setup(app)
notifier.setup(app)

if (
settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND.COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED
Expand All @@ -174,8 +178,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI:
if computational_backend_enabled:
comp_scheduler.setup(app)

if settings.DIRECTOR_V2_RESOURCE_USAGE_TRACKER:
resource_usage_tracker_client.setup(app)
resource_usage_tracker_client.setup(app)

if settings.DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED:
setup_prometheus_instrumentation(app)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Final

from models_library.projects_networks import DockerNetworkName
from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt
from pydantic import Field, NonNegativeInt, PositiveFloat
from settings_library.base import BaseCustomSettings

_MINUTE: Final[NonNegativeInt] = 60
Expand Down Expand Up @@ -59,6 +59,14 @@ class DynamicServicesSchedulerSettings(BaseCustomSettings):
description="Prometheus will scrape service placed on these networks",
)

DIRECTOR_V2_DYNAMIC_SCHEDULER_CLOSE_SERVICES_VIA_FRONTEND_WHEN_CREDITS_LIMIT_REACHED: bool = Field(
default=True,
description=(
"when the message indicating there are no more credits left in a wallet "
"the director-v2 will shutdown the services via the help of the frontend"
),
)

#
# TIMEOUTS AND RETRY dark worlds
#
Expand Down Expand Up @@ -139,19 +147,3 @@ class DynamicServicesSchedulerSettings(BaseCustomSettings):
"allow for some time to pass before declaring it failed."
),
)

#
# DEVELOPMENT ONLY config
#

DIRECTOR_V2_DYNAMIC_SCHEDULER_IGNORE_SERVICES_SHUTDOWN_WHEN_CREDITS_LIMIT_REACHED: bool = Field(
default=True,
description=(
"when the message indicating there are no more credits left in a wallet "
"the director-v2 will shutdown services if True"
),
)

DYNAMIC_SIDECAR_DOCKER_NODE_CONCURRENT_RESOURCE_SLOTS: PositiveInt = Field(
2, description="Amount of slots per resource on a node"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import contextlib

import socketio
from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from models_library.api_schemas_directorv2.notifications import ServiceNoMoreCredits
from models_library.api_schemas_directorv2.socketio import (
SOCKET_IO_SERVICE_NO_MORE_CREDITS_EVENT,
)
from models_library.api_schemas_webserver.socketio import SocketIORoomStr
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID
from models_library.wallets import WalletID
from servicelib.fastapi.app_state import SingletonInAppStateMixin


class Notifier(SingletonInAppStateMixin):
app_state_name: str = "notifier"

def __init__(self, sio_manager: socketio.AsyncAioPikaManager):
self._sio_manager = sio_manager

async def notify_shutdown_no_more_credits(
self, user_id: UserID, node_id: NodeID, wallet_id: WalletID
) -> None:
await self._sio_manager.emit(
SOCKET_IO_SERVICE_NO_MORE_CREDITS_EVENT,
data=jsonable_encoder(
ServiceNoMoreCredits(node_id=node_id, wallet_id=wallet_id)
),
room=SocketIORoomStr.from_user_id(user_id),
)


async def publish_shutdown_no_more_credits(
app: FastAPI, *, user_id: UserID, node_id: NodeID, wallet_id: WalletID
) -> None:
notifier: Notifier = Notifier.get_from_app_state(app)
await notifier.notify_shutdown_no_more_credits(
user_id=user_id, node_id=node_id, wallet_id=wallet_id
)


def setup(app: FastAPI):
async def _on_startup() -> None:
assert app.state.external_socketio # nosec

notifier = Notifier(
sio_manager=app.state.external_socketio,
)
notifier.set_to_app_state(app)
assert Notifier.get_from_app_state(app) == notifier # nosec

async def _on_shutdown() -> None:
with contextlib.suppress(AttributeError):
Notifier.pop_from_app_state(app)

app.add_event_handler("startup", _on_startup)
app.add_event_handler("shutdown", _on_shutdown)
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from ..core.errors import ConfigurationError
from ..core.settings import AppSettings
from .notifier import publish_shutdown_no_more_credits

_logger = logging.getLogger(__name__)

Expand All @@ -27,9 +28,20 @@ async def handler_out_of_credits(app: FastAPI, data: bytes) -> bool:
settings: AppSettings = app.state.settings

if (
settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_IGNORE_SERVICES_SHUTDOWN_WHEN_CREDITS_LIMIT_REACHED
settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_CLOSE_SERVICES_VIA_FRONTEND_WHEN_CREDITS_LIMIT_REACHED
):
_logger.debug("Skipped shutting down services for wallet %s", message.wallet_id)
_logger.warning(
"Notifying frontend to shutdown service: '%s' for user '%s' because wallet '%s' is out of credits.",
message.node_id,
message.user_id,
message.wallet_id,
)
await publish_shutdown_no_more_credits(
app,
user_id=message.user_id,
node_id=message.node_id,
wallet_id=message.wallet_id,
)
else:
await scheduler.mark_all_services_in_wallet_for_removal(
wallet_id=message.wallet_id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging

import socketio
from fastapi import FastAPI
from servicelib.socketio_utils import cleanup_socketio_async_pubsub_manager

from ..core.settings import AppSettings

_logger = logging.getLogger(__name__)


def setup(app: FastAPI):
settings: AppSettings = app.state.settings

async def _on_startup() -> None:
assert app.state.rabbitmq_client # nosec

# Connect to the as an external process in write-only mode
# SEE https://python-socketio.readthedocs.io/en/stable/server.html#emitting-from-external-processes
app.state.external_socketio = socketio.AsyncAioPikaManager(
url=settings.DIRECTOR_V2_RABBITMQ.dsn,
logger=_logger,
write_only=True,
)

async def _on_shutdown() -> None:
if external_socketio := getattr(app.state, "external_socketio"): # noqa: B009
await cleanup_socketio_async_pubsub_manager(
server_manager=external_socketio
)

app.add_event_handler("startup", _on_startup)
app.add_event_handler("shutdown", _on_shutdown)
3 changes: 2 additions & 1 deletion services/director-v2/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"pytest_simcore.postgres_service",
"pytest_simcore.pydantic_models",
"pytest_simcore.pytest_global_environs",
"pytest_simcore.pytest_socketio",
"pytest_simcore.rabbit_service",
"pytest_simcore.redis_service",
"pytest_simcore.repository_paths",
Expand Down Expand Up @@ -237,7 +238,7 @@ def fake_workbench_as_dict(fake_workbench_file: Path) -> dict[str, Any]:

@pytest.fixture
def fake_workbench_without_outputs(
fake_workbench_as_dict: dict[str, Any]
fake_workbench_as_dict: dict[str, Any],
) -> dict[str, Any]:
workbench = deepcopy(fake_workbench_as_dict)
# remove all the outputs from the workbench
Expand Down
Loading

0 comments on commit c9dd136

Please sign in to comment.