Skip to content

Commit

Permalink
🎨 EFS Guardian: adding size monitoring (#6502)
Browse files Browse the repository at this point in the history
  • Loading branch information
matusdrobuliak66 authored Oct 10, 2024
1 parent d7026b7 commit f3e838b
Show file tree
Hide file tree
Showing 17 changed files with 605 additions and 27 deletions.
18 changes: 18 additions & 0 deletions packages/models-library/src/models_library/rabbitmq_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,24 @@ def routing_key(self) -> str | None:
return None


class DynamicServiceRunningMessage(RabbitMessageBase):
channel_name: Literal["io.simcore.service.dynamic-service-running"] = Field(
default="io.simcore.service.dynamic-service-running", const=True
)

project_id: ProjectID
node_id: NodeID
user_id: UserID
product_name: ProductName | None
created_at: datetime.datetime = Field(
default_factory=lambda: arrow.utcnow().datetime,
description="message creation datetime",
)

def routing_key(self) -> str | None:
return None


class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage):
message_type: RabbitResourceTrackingMessageType = Field(
default=RabbitResourceTrackingMessageType.TRACKING_STARTED, const=True
Expand Down
5 changes: 5 additions & 0 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ services:
RABBIT_PORT: ${RABBIT_PORT}
RABBIT_SECURE: ${RABBIT_SECURE}
RABBIT_USER: ${RABBIT_USER}
REDIS_HOST: ${REDIS_HOST}
REDIS_PASSWORD: ${REDIS_PASSWORD}
REDIS_PORT: ${REDIS_PORT}
REDIS_SECURE: ${REDIS_SECURE}
REDIS_USER: ${REDIS_USER}
SC_USER_ID: ${SC_USER_ID}
SC_USER_NAME: ${SC_USER_NAME}
EFS_USER_ID: ${EFS_USER_ID}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from fastapi import FastAPI
from models_library.progress_bar import ProgressReport
from models_library.rabbitmq_messages import (
DynamicServiceRunningMessage,
EventRabbitMessage,
LoggerRabbitMessage,
ProgressRabbitMessageNode,
Expand Down Expand Up @@ -34,6 +35,12 @@ async def post_resource_tracking_message(
await _post_rabbit_message(app, message)


async def post_dynamic_service_running_message(
app: FastAPI, message: DynamicServiceRunningMessage
):
await _post_rabbit_message(app, message)


async def post_log_message(
app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt
) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import logging
from typing import Final

from fastapi import FastAPI
from models_library.generated_models.docker_rest_api import ContainerState
from models_library.rabbitmq_messages import (
DynamicServiceRunningMessage,
RabbitResourceTrackingHeartbeatMessage,
RabbitResourceTrackingStartedMessage,
RabbitResourceTrackingStoppedMessage,
Expand All @@ -19,7 +21,10 @@
are_all_containers_in_expected_states,
get_container_states,
)
from ...core.rabbitmq import post_resource_tracking_message
from ...core.rabbitmq import (
post_dynamic_service_running_message,
post_resource_tracking_message,
)
from ...core.settings import ApplicationSettings, ResourceTrackingSettings
from ...models.shared_store import SharedStore
from ._models import ResourceTrackingState
Expand Down Expand Up @@ -70,10 +75,21 @@ async def _heart_beat_task(app: FastAPI):
)

if are_all_containers_in_expected_states(container_states.values()):
message = RabbitResourceTrackingHeartbeatMessage(
rut_message = RabbitResourceTrackingHeartbeatMessage(
service_run_id=settings.DY_SIDECAR_RUN_ID
)
await post_resource_tracking_message(app, message)
dyn_message = DynamicServiceRunningMessage(
project_id=settings.DY_SIDECAR_PROJECT_ID,
node_id=settings.DY_SIDECAR_NODE_ID,
user_id=settings.DY_SIDECAR_USER_ID,
product_name=settings.DY_SIDECAR_PRODUCT_NAME,
)
await asyncio.gather(
*[
post_resource_tracking_message(app, rut_message),
post_dynamic_service_running_message(app, dyn_message),
]
)
else:
_logger.info(
"heart beat message skipped: container_states=%s", container_states
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
)
from ..api.rest.routes import setup_api_routes
from ..api.rpc.routes import setup_rpc_routes
from ..services.background_tasks_setup import setup as setup_background_tasks
from ..services.efs_manager_setup import setup as setup_efs_manager
from ..services.modules.rabbitmq import setup as setup_rabbitmq
from ..services.modules.redis import setup as setup_redis
from ..services.process_messages_setup import setup as setup_process_messages
from .settings import ApplicationSettings

logger = logging.getLogger(__name__)
Expand All @@ -40,11 +43,14 @@ def create_app(settings: ApplicationSettings) -> FastAPI:

# PLUGINS SETUP
setup_rabbitmq(app)
setup_redis(app)

setup_api_routes(app)
setup_rpc_routes(app)

setup_efs_manager(app)
setup_background_tasks(app)
setup_process_messages(app)

# EVENTS
async def _on_startup() -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
LogLevel,
VersionTag,
)
from pydantic import Field, PositiveInt, validator
from pydantic import ByteSize, Field, PositiveInt, parse_obj_as, validator
from settings_library.base import BaseCustomSettings
from settings_library.efs import AwsEfsSettings
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings
from settings_library.tracing import TracingSettings
from settings_library.utils_logging import MixinLoggingSettings

Expand Down Expand Up @@ -57,6 +58,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
EFS_GROUP_NAME: str = Field(
description="Linux group name that the EFS and Simcore linux users are part of"
)
EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: ByteSize = Field(
default=parse_obj_as(ByteSize, "500GiB")
)

# RUNTIME -----------------------------------------------------------
EFS_GUARDIAN_DEBUG: bool = Field(
Expand All @@ -76,6 +80,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):

EFS_GUARDIAN_AWS_EFS_SETTINGS: AwsEfsSettings = Field(auto_default_from_env=True)
EFS_GUARDIAN_RABBITMQ: RabbitSettings = Field(auto_default_from_env=True)
EFS_GUARDIAN_REDIS: RedisSettings = Field(auto_default_from_env=True)
EFS_GUARDIAN_TRACING: TracingSettings | None = Field(
auto_default_from_env=True, description="settings for opentelemetry tracing"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import logging

from fastapi import FastAPI

from ..core.settings import ApplicationSettings

_logger = logging.getLogger(__name__)


async def removal_policy_task(app: FastAPI) -> None:
_logger.info("FAKE Removal policy task started (not yet implemented)")

# After X days of inactivity remove data from EFS
# Probably use `last_modified_data` in the project DB table
# Maybe lock project during this time lock_project()

app_settings: ApplicationSettings = app.state.settings
assert app_settings # nosec
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio
import logging
from collections.abc import Awaitable, Callable
from datetime import timedelta
from typing import TypedDict

from fastapi import FastAPI
from servicelib.background_task import stop_periodic_task
from servicelib.logging_utils import log_catch, log_context
from servicelib.redis_utils import start_exclusive_periodic_task

from .background_tasks import removal_policy_task
from .modules.redis import get_redis_lock_client

_logger = logging.getLogger(__name__)


class EfsGuardianBackgroundTask(TypedDict):
name: str
task_func: Callable


_EFS_GUARDIAN_BACKGROUND_TASKS = [
EfsGuardianBackgroundTask(
name="efs_removal_policy_task", task_func=removal_policy_task
)
]


def _on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]:
async def _startup() -> None:
with log_context(
_logger, logging.INFO, msg="Efs Guardian startup.."
), log_catch(_logger, reraise=False):
app.state.efs_guardian_background_tasks = []

# Setup periodic tasks
for task in _EFS_GUARDIAN_BACKGROUND_TASKS:
exclusive_task = start_exclusive_periodic_task(
get_redis_lock_client(app),
task["task_func"],
task_period=timedelta(seconds=60), # 1 minute
retry_after=timedelta(seconds=300), # 5 minutes
task_name=task["name"],
app=app,
)
app.state.efs_guardian_background_tasks.append(exclusive_task)

return _startup


def _on_app_shutdown(
_app: FastAPI,
) -> Callable[[], Awaitable[None]]:
async def _stop() -> None:
with log_context(
_logger, logging.INFO, msg="Efs Guardian shutdown.."
), log_catch(_logger, reraise=False):
assert _app # nosec
if _app.state.efs_guardian_background_tasks:
await asyncio.gather(
*[
stop_periodic_task(task)
for task in _app.state.efs_guardian_background_tasks
]
)

return _stop


def setup(app: FastAPI) -> None:
app.add_event_handler("startup", _on_app_startup(app))
app.add_event_handler("shutdown", _on_app_shutdown(app))
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from fastapi import FastAPI
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from pydantic import ByteSize

from ..core.settings import ApplicationSettings, get_application_settings
from . import efs_manager_utils


@dataclass(frozen=True)
Expand Down Expand Up @@ -52,3 +54,39 @@ async def create_project_specific_data_dir(
_dir_path, 0o770
) # This gives rwx permissions to user and group, and nothing to others
return _dir_path

async def check_project_node_data_directory_exits(
self, project_id: ProjectID, node_id: NodeID
) -> bool:
_dir_path = (
self._efs_mounted_path
/ self._project_specific_data_base_directory
/ f"{project_id}"
/ f"{node_id}"
)

return _dir_path.exists()

async def get_project_node_data_size(
self, project_id: ProjectID, node_id: NodeID
) -> ByteSize:
_dir_path = (
self._efs_mounted_path
/ self._project_specific_data_base_directory
/ f"{project_id}"
/ f"{node_id}"
)

return await efs_manager_utils.get_size_bash_async(_dir_path)

async def remove_project_node_data_write_permissions(
self, project_id: ProjectID, node_id: NodeID
) -> None:
_dir_path = (
self._efs_mounted_path
/ self._project_specific_data_base_directory
/ f"{project_id}"
/ f"{node_id}"
)

await efs_manager_utils.remove_write_permissions_bash_async(_dir_path)
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import asyncio
import logging

from pydantic import ByteSize

_logger = logging.getLogger(__name__)


async def get_size_bash_async(path) -> ByteSize:
# Create the subprocess
command = ["du", "-sb", path]
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

# Wait for the subprocess to complete
stdout, stderr = await process.communicate()

if process.returncode == 0:
# Parse the output
size = ByteSize(stdout.decode().split()[0])
return size
msg = f"Command {' '.join(command)} failed with error code {process.returncode}: {stderr.decode()}"
_logger.error(msg)
raise RuntimeError(msg)


async def remove_write_permissions_bash_async(path) -> None:
# Create the subprocess
command = ["chmod", "-R", "a-w", path]
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

# Wait for the subprocess to complete
_, stderr = await process.communicate()

if process.returncode == 0:
return
msg = f"Command {' '.join(command)} failed with error code {process.returncode}: {stderr.decode()}"
_logger.error(msg)
raise RuntimeError(msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging
from typing import cast

from fastapi import FastAPI
from servicelib.redis import RedisClientSDK
from settings_library.redis import RedisDatabase, RedisSettings

logger = logging.getLogger(__name__)


def setup(app: FastAPI) -> None:
async def on_startup() -> None:
app.state.redis_lock_client_sdk = None
settings: RedisSettings = app.state.settings.EFS_GUARDIAN_REDIS
redis_locks_dsn = settings.build_redis_dsn(RedisDatabase.LOCKS)
app.state.redis_lock_client_sdk = lock_client = RedisClientSDK(redis_locks_dsn)
await lock_client.setup()

async def on_shutdown() -> None:
redis_lock_client_sdk: None | RedisClientSDK = app.state.redis_lock_client_sdk
if redis_lock_client_sdk:
await redis_lock_client_sdk.shutdown()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)


def get_redis_lock_client(app: FastAPI) -> RedisClientSDK:
return cast(RedisClientSDK, app.state.redis_lock_client_sdk)
Loading

0 comments on commit f3e838b

Please sign in to comment.