Skip to content

Commit

Permalink
feat(dask): adjust parameters (reanahub#701)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Sep 24, 2024
1 parent 73ab0ea commit 989c87e
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 93 deletions.
34 changes: 12 additions & 22 deletions reana_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,37 +59,27 @@
)

DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
"""Whether dask is enabled in the cluster or not"""

REANA_DASK_CLUSTER_MAX_CORES_LIMIT = float(
os.getenv("REANA_DASK_CLUSTER_MAX_CORES_LIMIT", 8)
)
"""Maximum cores limit for dask clusters."""
"""Whether Dask is enabled in the cluster or not"""

REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv(
"REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "8Gi"
)
"""Maximum memory limit for dask clusters."""

REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT = float(
os.getenv("REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT", 4)
"REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi"
)
"""Default cores limit for dask clusters."""
"""Maximum memory limit for Dask clusters."""

REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT = os.getenv(
"REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT", "4Gi"
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS = int(
os.getenv("REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS", 2)
)
"""Default memory limit for dask clusters."""
"""Number of workers in Dask cluster by default """

REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES = float(
os.getenv("REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES", 0.5)
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv(
"REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY", "2Gi"
)
"""Number of cores for one dask worker by default."""
"""Memory for one Dask worker by default."""

REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv(
"REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY", "512Mi"
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY = os.getenv(
"REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY", "8Gi"
)
"""Memory for one dask worker by default."""
"""Maximum memory for one Dask worker."""

REANA_KUBERNETES_JOBS_MEMORY_LIMIT = os.getenv("REANA_KUBERNETES_JOBS_MEMORY_LIMIT")
"""Maximum memory limit for user job containers for workflow complexity estimation."""
Expand Down
90 changes: 28 additions & 62 deletions reana_server/rest/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
REANA_KUBERNETES_JOBS_MAX_USER_TIMEOUT_LIMIT,
REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD,
DASK_ENABLED,
REANA_DASK_CLUSTER_MAX_CORES_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
)
from reana_server.decorators import signin_required

Expand Down Expand Up @@ -139,42 +137,28 @@ def info(user, **kwargs): # noqa
value:
type: string
type: object
dask_cluster_default_cores_limit:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_max_cores_limit:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_default_memory_limit:
dask_cluster_max_memory_limit:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_max_memory_limit:
dask_cluster_default_number_of_workers:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_default_single_worker_cores:
dask_cluster_default_single_worker_memory:
properties:
title:
type: string
value:
type: string
type: object
dask_cluster_default_single_worker_memory:
dask_cluster_max_single_worker_memory:
properties:
title:
type: string
Expand Down Expand Up @@ -236,29 +220,21 @@ def info(user, **kwargs): # noqa
"title": "Dask workflows allowed in the cluster",
"value": "False"
},
"dask_cluster_default_cores_limit": {
"title": "Default cores limit for dask clusters",
"value": "4"
},
"dask_cluster_max_cores_limit": {
"title": "Maximum cores limit for dask clusters",
"value": "8"
},
"dask_cluster_default_memory_limit": {
"title": "Default memory limit for dask clusters",
"value": "2Gi"
},
"dask_cluster_max_memory_limit": {
"title": "Maximum memory limit for dask clusters",
"value": "4Gi"
"title": "Maximum memory limit for Dask clusters",
"value": "16Gi"
},
"dask_cluster_default_single_worker_cores": {
"title": "Number of cores for one dask worker by default",
"value": "0.5"
"dask_cluster_default_number_of_workers": {
"title": "Default number of workers for Dask clusters",
"value": "2Gi"
},
"dask_cluster_default_single_worker_memory": {
"title": "Amount of memory for one dask worker by default",
"value": "256Mi"
"title": "Amount of memory for one Dask worker by default",
"value": "2Gi"
},
"dask_cluster_max_single_worker_memory": {
"title": "Maximum amount of memory for one Dask worker",
"value": "8Gi"
},
}
500:
Expand Down Expand Up @@ -318,30 +294,22 @@ def info(user, **kwargs): # noqa
),
)
if DASK_ENABLED:
cluster_information["dask_cluster_default_cores_limit"] = dict(
title="Default cores limit for dask clusters",
value=REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT,
)
cluster_information["dask_cluster_max_cores_limit"] = dict(
title="Maximum cores limit for dask clusters",
value=REANA_DASK_CLUSTER_MAX_CORES_LIMIT,
)
cluster_information["dask_cluster_default_memory_limit"] = dict(
title="Default memory limit for dask clusters",
value=REANA_DASK_CLUSTER_DEFAULT_MEMORY_LIMIT,
cluster_information["dask_cluster_default_number_of_workers"] = dict(
title="Number of workers in Dask clusters by default",
value=REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
)
cluster_information["dask_cluster_max_memory_limit"] = dict(
title="Maximum memory limit for dask clusters",
title="Maximum memory limit for Dask clusters",
value=REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
)
cluster_information["dask_cluster_default_single_worker_cores"] = dict(
title="Number of cores for one dask worker by default",
value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_CORES,
)
cluster_information["dask_cluster_default_single_worker_memory"] = dict(
title="Memory for one dask worker by default",
title="Memory for one Dask worker by default",
value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
)
cluster_information["dask_cluster_max_single_worker_memory"] = dict(
title="Maximum memory for one Dask worker",
value=REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
)

return InfoSchema().dump(cluster_information)

Expand Down Expand Up @@ -388,9 +356,7 @@ class InfoSchema(Schema):
kubernetes_max_memory_limit = fields.Nested(StringInfoValue)
dask_enabled = fields.Nested(StringInfoValue)
if DASK_ENABLED:
dask_cluster_default_cores_limit = fields.Nested(StringInfoValue)
dask_cluster_max_cores_limit = fields.Nested(StringInfoValue)
dask_cluster_default_memory_limit = fields.Nested(StringInfoValue)
dask_cluster_default_number_of_workers = fields.Nested(StringInfoValue)
dask_cluster_max_memory_limit = fields.Nested(StringInfoValue)
dask_cluster_default_single_worker_cores = fields.Nested(StringInfoValue)
dask_cluster_default_single_worker_memory = fields.Nested(StringInfoValue)
dask_cluster_max_single_worker_memory = fields.Nested(StringInfoValue)
32 changes: 23 additions & 9 deletions reana_server/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
WORKSPACE_RETENTION_PERIOD,
DASK_ENABLED,
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
REANA_DASK_CLUSTER_MAX_CORES_LIMIT,
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
)
from reana_server import utils

Expand Down Expand Up @@ -171,18 +173,30 @@ def validate_dask_memory_and_cores_limits(reana_yaml: Dict) -> None:

# Validate Dask memory limit requested by the workflow
if dask_resources:
requested_dask_cluster_memory = dask_resources.get("memory", "0Mi")
single_worker_memory = dask_resources.get(
"single_worker_memory", REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY
)
if kubernetes_memory_to_bytes(
requested_dask_cluster_memory
) > kubernetes_memory_to_bytes(REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT):
single_worker_memory
) > kubernetes_memory_to_bytes(REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY):
raise REANAValidationError(
f'The "memory" provided in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT}).'
f'The "single_worker_memory" provided in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY}).'
)

number_of_workers = int(
dask_resources.get(
"number_of_workers", REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS
)
# Validate Dask cores limit requested by the workflow
requested_dask_cluster_cores = dask_resources.get("cores", 0)
if requested_dask_cluster_cores > REANA_DASK_CLUSTER_MAX_CORES_LIMIT:
)
requested_dask_cluster_memory = (
kubernetes_memory_to_bytes(single_worker_memory) * number_of_workers
)

if requested_dask_cluster_memory > kubernetes_memory_to_bytes(
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT
):
raise REANAValidationError(
f'The "cores" provided in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_CORES_LIMIT}).'
f'The "memory" requested in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT}).\n Decrease the number of workers requested or amount of memory consumed by a single worker.'
)

return None

0 comments on commit 989c87e

Please sign in to comment.