diff --git a/AUTHORS.md b/AUTHORS.md index ef3d8bd3..3ce000e1 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -3,6 +3,7 @@ The list of contributors in alphabetical order: - [Adelina Lintuluoto](https://orcid.org/0000-0002-0726-1452) +- [Alp Tuna](https://orcid.org/0009-0001-1915-3993) - [Anton Khodak](https://orcid.org/0000-0003-3263-4553) - [Audrius Mecionis](https://orcid.org/0000-0002-3759-1663) - [Bruno Rosendo](https://orcid.org/0000-0002-0923-3148) diff --git a/docs/openapi.json b/docs/openapi.json index e3500970..7da1cafa 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -429,6 +429,26 @@ "slurmcern" ] }, + "dask_cluster_default_number_of_workers": { + "title": "Default number of workers for Dask clusters", + "value": "2" + }, + "dask_cluster_default_single_worker_memory": { + "title": "Amount of memory for one Dask worker by default", + "value": "2Gi" + }, + "dask_cluster_max_memory_limit": { + "title": "Maximum memory limit for Dask clusters", + "value": "16Gi" + }, + "dask_cluster_max_single_worker_memory": { + "title": "Maximum amount of memory for one Dask worker", + "value": "8Gi" + }, + "dask_enabled": { + "title": "Dask workflows allowed in the cluster", + "value": "False" + }, "default_kubernetes_jobs_timeout": { "title": "Default timeout for Kubernetes jobs", "value": "604800" @@ -479,6 +499,61 @@ }, "type": "object" }, + "dask_cluster_default_number_of_workers": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "type": "object" + }, + "dask_cluster_default_single_worker_memory": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "type": "object" + }, + "dask_cluster_max_memory_limit": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "type": "object" + }, + "dask_cluster_max_single_worker_memory": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, 
"type": "object" + }, + "dask_enabled": { + "properties": { + "title": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "type": "object" + }, "default_kubernetes_jobs_timeout": { "properties": { "title": { diff --git a/reana_server/config.py b/reana_server/config.py index 472361f8..b2bbfa62 100644 --- a/reana_server/config.py +++ b/reana_server/config.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. -# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023 CERN. +# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN. # # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. @@ -58,6 +58,29 @@ os.getenv("LOGIN_PROVIDERS_SECRETS", "{}") ) +DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true")) +"""Whether Dask is enabled in the cluster or not""" + +REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv( + "REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi" +) +"""Maximum memory limit for Dask clusters.""" + +REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS = int( + os.getenv("REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS", 2) +) +"""Number of workers in Dask cluster by default """ + +REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv( + "REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY", "2Gi" +) +"""Memory for one Dask worker by default.""" + +REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY = os.getenv( + "REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY", "8Gi" +) +"""Maximum memory for one Dask worker.""" + REANA_KUBERNETES_JOBS_MEMORY_LIMIT = os.getenv("REANA_KUBERNETES_JOBS_MEMORY_LIMIT") """Maximum memory limit for user job containers for workflow complexity estimation.""" diff --git a/reana_server/rest/info.py b/reana_server/rest/info.py index 0b12dae4..d2be56e2 100644 --- a/reana_server/rest/info.py +++ b/reana_server/rest/info.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. 
-# Copyright (C) 2021, 2022 CERN. +# Copyright (C) 2021, 2022, 2024 CERN. # # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. @@ -24,6 +24,11 @@ REANA_KUBERNETES_JOBS_TIMEOUT_LIMIT, REANA_KUBERNETES_JOBS_MAX_USER_TIMEOUT_LIMIT, REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD, + DASK_ENABLED, + REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS, + REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT, + REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY, + REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY, ) from reana_server.decorators import signin_required @@ -125,6 +130,41 @@ def info(user, **kwargs): # noqa type: string type: array type: object + dask_enabled: + properties: + title: + type: string + value: + type: string + type: object + dask_cluster_max_memory_limit: + properties: + title: + type: string + value: + type: string + type: object + dask_cluster_default_number_of_workers: + properties: + title: + type: string + value: + type: string + type: object + dask_cluster_default_single_worker_memory: + properties: + title: + type: string + value: + type: string + type: object + dask_cluster_max_single_worker_memory: + properties: + title: + type: string + value: + type: string + type: object type: object examples: application/json: @@ -165,6 +205,26 @@ "title": "Maximum timeout for Kubernetes jobs", "value": "1209600" }, + "dask_enabled": { + "title": "Dask workflows allowed in the cluster", + "value": "False" + }, + "dask_cluster_max_memory_limit": { + "title": "Maximum memory limit for Dask clusters", + "value": "16Gi" + }, + "dask_cluster_default_number_of_workers": { + "title": "Default number of workers for Dask clusters", + "value": "2" + }, + "dask_cluster_default_single_worker_memory": { + "title": "Amount of memory for one Dask worker by default", + "value": "2Gi" + }, + "dask_cluster_max_single_worker_memory": { + "title": "Maximum amount of memory for one 
Dask worker", + "value": "8Gi" + }, } 500: description: >- @@ -217,7 +277,29 @@ def info(user, **kwargs): # noqa title="Maximum inactivity period in days before automatic closure of interactive sessions", value=REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD, ), + dask_enabled=dict( + title="Dask workflows allowed in the cluster", + value=bool(DASK_ENABLED), + ), ) + if DASK_ENABLED: + cluster_information["dask_cluster_default_number_of_workers"] = dict( + title="Number of workers in Dask clusters by default", + value=REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS, + ) + cluster_information["dask_cluster_max_memory_limit"] = dict( + title="Maximum memory limit for Dask clusters", + value=REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT, + ) + cluster_information["dask_cluster_default_single_worker_memory"] = dict( + title="Memory for one Dask worker by default", + value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY, + ) + cluster_information["dask_cluster_max_single_worker_memory"] = dict( + title="Maximum memory for one Dask worker", + value=REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY, + ) + return InfoSchema().dump(cluster_information) except Exception as e: @@ -260,3 +342,10 @@ class InfoSchema(Schema): maximum_interactive_session_inactivity_period = fields.Nested( StringNullableInfoValue ) + kubernetes_max_memory_limit = fields.Nested(StringInfoValue) + dask_enabled = fields.Nested(StringInfoValue) + if DASK_ENABLED: + dask_cluster_default_number_of_workers = fields.Nested(StringInfoValue) + dask_cluster_max_memory_limit = fields.Nested(StringInfoValue) + dask_cluster_default_single_worker_memory = fields.Nested(StringInfoValue) + dask_cluster_max_single_worker_memory = fields.Nested(StringInfoValue) diff --git a/reana_server/rest/workflows.py b/reana_server/rest/workflows.py index 5ae2aa8b..475ff47c 100644 --- a/reana_server/rest/workflows.py +++ b/reana_server/rest/workflows.py @@ -49,6 +49,7 @@ validate_inputs, validate_workflow, validate_workspace_path, + 
validate_dask_memory_and_cores_limits, ) from webargs import fields, validate from webargs.flaskparser import use_kwargs @@ -566,6 +567,8 @@ def create_workflow(user): # noqa validate_inputs(reana_spec_file) + validate_dask_memory_and_cores_limits(reana_spec_file) + retention_days = reana_spec_file.get("workspace", {}).get("retention_days") retention_rules = get_workspace_retention_rules(retention_days) diff --git a/reana_server/validation.py b/reana_server/validation.py index 8a50388a..4e85830c 100644 --- a/reana_server/validation.py +++ b/reana_server/validation.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. -# Copyright (C) 2022 CERN. +# Copyright (C) 2022, 2024 CERN. # # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. @@ -18,8 +18,17 @@ from reana_commons.validation.operational_options import validate_operational_options from reana_commons.validation.parameters import build_parameters_validator from reana_commons.validation.utils import validate_reana_yaml, validate_workspace - -from reana_server.config import SUPPORTED_COMPUTE_BACKENDS, WORKSPACE_RETENTION_PERIOD +from reana_commons.job_utils import kubernetes_memory_to_bytes + +from reana_server.config import ( + SUPPORTED_COMPUTE_BACKENDS, + WORKSPACE_RETENTION_PERIOD, + DASK_ENABLED, + REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT, + REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS, + REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY, + REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY, +) from reana_server import utils @@ -153,3 +162,41 @@ def validate_retention_rule(rule: str, days: int) -> None: "Maximum workflow retention period was reached. " f"Please use less than {WORKSPACE_RETENTION_PERIOD} days." 
) + + +def validate_dask_memory_and_cores_limits(reana_yaml: Dict) -> None: + """Validate Dask workflows are allowed in the cluster and memory limits are respected.""" + # Validate Dask workflows are allowed in the cluster + dask_resources = reana_yaml["workflow"].get("resources", {}).get("dask", {}) + if not DASK_ENABLED and dask_resources != {}: + raise REANAValidationError("Dask workflows are not allowed in this cluster.") + + # Validate Dask memory limit requested by the workflow + if dask_resources: + single_worker_memory = dask_resources.get( + "single_worker_memory", REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY + ) + if kubernetes_memory_to_bytes( + single_worker_memory + ) > kubernetes_memory_to_bytes(REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY): + raise REANAValidationError( + f'The "single_worker_memory" provided in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY}).' + ) + + number_of_workers = int( + dask_resources.get( + "number_of_workers", REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS + ) + ) + requested_dask_cluster_memory = ( + kubernetes_memory_to_bytes(single_worker_memory) * number_of_workers + ) + + if requested_dask_cluster_memory > kubernetes_memory_to_bytes( + REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT + ): + raise REANAValidationError( + f'The "memory" requested in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT}).\nDecrease the number of workers requested or amount of memory consumed by a single worker.' + ) + + return None