diff --git a/MANIFEST.in b/MANIFEST.in index 6057694e..11a72a5b 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -24,6 +24,7 @@ recursive-include docs *.txt recursive-include reana_workflow_controller *.html recursive-include reana_workflow_controller *.py recursive-include reana_workflow_controller/templates *.template +recursive-include reana_workflow_controller/templates *.yaml recursive-include scripts *.py recursive-include tests *.py recursive-include tests *.finished diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py index d9542bdd..57acb82c 100644 --- a/reana_workflow_controller/config.py +++ b/reana_workflow_controller/config.py @@ -11,6 +11,7 @@ import os import json +from distutils.util import strtobool from reana_commons.config import REANA_COMPONENT_PREFIX, SHARED_VOLUME_PATH from reana_db.models import JobStatus, RunStatus @@ -286,6 +287,63 @@ def _parse_interactive_sessions_environments(env_var): IMAGE_PULL_SECRETS = os.getenv("IMAGE_PULL_SECRETS", "").split(",") """Docker image pull secrets which allow the usage of private images.""" +DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true")) +"""Whether Dask is enabled in the cluster or not""" + +REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv( + "REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi" +) +"""Maximum memory limit for Dask clusters.""" + +REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS = int( + os.getenv("REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS", 2) +) +"""Number of workers in Dask cluster by default """ + +REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv( + "REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY", "2Gi" +) +"""Memory for one Dask worker by default.""" + +REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY = os.getenv( + "REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY", "8Gi" +) +"""Maximum memory for one Dask worker.""" + +VOMSPROXY_CONTAINER_IMAGE = os.getenv( + "VOMSPROXY_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-vomsproxy:1.3.0" +) +"""Default docker image of VOMSPROXY sidecar container.""" + +VOMSPROXY_CONTAINER_NAME = "voms-proxy" +"""Name of VOMSPROXY sidecar container.""" + +VOMSPROXY_CERT_CACHE_LOCATION = "/vomsproxy_cache/" +"""Directory of voms-proxy certificate cache. + +This directory is shared between job & VOMSPROXY container.""" + +VOMSPROXY_CERT_CACHE_FILENAME = "x509up_proxy" +"""Name of the voms-proxy certificate cache file.""" + +RUCIO_CONTAINER_IMAGE = os.getenv( + "RUCIO_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-rucio:1.1.1" +) +"""Default docker image of RUCIO sidecar container.""" + +RUCIO_CONTAINER_NAME = "reana-auth-rucio" +"""Name of RUCIO sidecar container.""" + +RUCIO_CACHE_LOCATION = "/rucio_cache/" +"""Directory of Rucio cache. + +This directory is shared between job & Rucio container.""" + +RUCIO_CFG_CACHE_FILENAME = "rucio.cfg" +"""Name of the RUCIO configuration cache file.""" + +RUCIO_CERN_BUNDLE_CACHE_FILENAME = "CERN-bundle.pem" +"""Name of the CERN Bundle cache file.""" ALIVE_STATUSES = [ RunStatus.created, diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index dd4f38b8..d096e9d6 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. -# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023 CERN. +# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN. # # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. @@ -22,6 +22,8 @@ from reana_commons.k8s.api_client import ( current_k8s_batchv1_api_client, current_k8s_corev1_api_client, + current_k8s_custom_objects_api_client, + current_k8s_networking_api_client, ) from reana_commons.k8s.secrets import UserSecretsStore from reana_commons.utils import ( @@ -43,6 +45,9 @@ REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT, ) from reana_workflow_controller.errors import REANAWorkflowControllerError +from reana_workflow_controller.k8s import delete_dask_dashboard_ingress + +from reana_workflow_controller.dask import requires_dask try: from urllib import parse as urlparse @@ -161,6 +166,9 @@ def _update_workflow_status(workflow, status, logs): ) workflow.logs += "Workflow engine logs could not be retrieved.\n" + if requires_dask(workflow): + _delete_dask_cluster(workflow) + if RunStatus.should_cleanup_job(status): try: _delete_workflow_job(workflow) @@ -305,3 +313,25 @@ def _get_workflow_engine_pod_logs(workflow: Workflow) -> str: # There might not be any pod returned by `list_namespaced_pod`, for example # when a workflow fails to be scheduled return "" + + +def _delete_dask_cluster(workflow: Workflow) -> None: + """Delete the Dask cluster resources.""" + current_k8s_custom_objects_api_client.delete_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskclusters", + namespace="default", + name=f"reana-run-dask-{workflow.id_}", + ) + + current_k8s_custom_objects_api_client.delete_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskautoscalers", + namespace="default", + name=f"dask-autoscaler-reana-run-dask-{workflow.id_}", + ) + delete_dask_dashboard_ingress( + f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_ + ) diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py new file mode 100644 index 00000000..96e84433 --- /dev/null +++ b/reana_workflow_controller/dask.py @@ -0,0 +1,497 @@ +# This file is part of REANA. +# Copyright (C) 2024 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +"""Dask resource manager.""" +import logging +import os +import yaml + +from flask import current_app + +from reana_commons.config import ( + K8S_CERN_EOS_AVAILABLE, + K8S_CERN_EOS_MOUNT_CONFIGURATION, + KRB5_STATUS_FILE_LOCATION, + REANA_JOB_HOSTPATH_MOUNTS, + WORKFLOW_RUNTIME_USER_UID, +) +from reana_commons.k8s.api_client import ( + current_k8s_custom_objects_api_client, +) +from reana_commons.k8s.kerberos import get_kerberos_k8s_config +from reana_commons.k8s.secrets import UserSecretsStore +from reana_commons.k8s.volumes import ( + get_workspace_volume, + get_reana_shared_volume, +) + +from reana_workflow_controller.k8s import create_dask_dashboard_ingress + + +class DaskResourceManager: + """Dask resource manager.""" + + def __init__( + self, + cluster_name, + workflow_spec, + workflow_workspace, + user_id, + num_of_workers, + single_worker_memory, + ): + """Instantiate Dask resource manager. + + :param cluster_name: Name of the cluster + :type cluster_name: str + :param workflow_spec: REANA workflow specification + :type workflow_spec: dict + :param workflow_workspace: Workflow workspace path + :type workflow_workspace: str + :param user_id: Id of the user + :type user_id: str + """ + self.cluster_name = cluster_name + self.num_of_workers = num_of_workers + self.single_worker_memory = single_worker_memory + self.autoscaler_name = f"dask-autoscaler-{cluster_name}" + self.workflow_spec = workflow_spec + self.workflow_workspace = workflow_workspace + self.workflow_id = workflow_workspace.split("/")[-1] + self.user_id = user_id + + self.cluster_spec = workflow_spec.get("resources", {}).get("dask", []) + self.cluster_body, self.autoscaler_body = self._load_dask_templates() + self.cluster_image = self.cluster_spec["image"] + self.dask_scheduler_uri = ( + f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786" + ) + + self.secrets_store = UserSecretsStore.fetch(self.user_id) + self.secret_env_vars = self.secrets_store.get_env_secrets_as_k8s_spec() + self.secrets_volume_mount = ( + self.secrets_store.get_secrets_volume_mount_as_k8s_spec() + ) + self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID + + def _load_dask_templates(self): + """Load Dask templates from YAML files.""" + with open( + "reana_workflow_controller/templates/dask_cluster.yaml", "r" + ) as dask_cluster_yaml, open( + "reana_workflow_controller/templates/dask_autoscaler.yaml", "r" + ) as dask_autoscaler_yaml: + dask_cluster_body = yaml.safe_load(dask_cluster_yaml) + dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml) + dask_cluster_body["spec"]["worker"]["spec"]["initContainers"] = [] + dask_cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"] = [] + dask_cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ] = [] + dask_cluster_body["spec"]["worker"]["spec"]["volumes"] = [] + + return dask_cluster_body, dask_autoscaler_body + + def create_dask_resources(self): + """Create necessary Dask resources for the workflow.""" + self._prepare_cluster() + self._create_dask_cluster() + self._create_dask_autoscaler() + create_dask_dashboard_ingress(self.cluster_name, self.workflow_id) + + def _prepare_cluster(self): + """Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers.""" + self._add_image_pull_secrets() + self._add_hostpath_volumes() + self._add_workspace_volume() + self._add_shared_volume() + self._add_eos_volume() + + # Add the name of the cluster, used in scheduler service name + self.cluster_body["metadata"] = {"name": self.cluster_name} + + # Add the name of the dask autoscaler + self.autoscaler_body["metadata"] = {"name": self.autoscaler_name} + + self.cluster_body["spec"]["scheduler"]["service"]["selector"][ + "dask.org/cluster-name" + ] = self.cluster_name + + # Connect autoscaler to the cluster + self.autoscaler_body["spec"]["cluster"] = self.cluster_name + + # Add image to worker and scheduler + self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "image" + ] = self.cluster_image + self.cluster_body["spec"]["scheduler"]["spec"]["containers"][0][ + "image" + ] = self.cluster_image + + # Create the worker command + self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"][ + 0 + ] = f'cd {self.workflow_workspace} && {self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"][0]}' + + # Set resource limits for workers + self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["resources"] = { + "limits": {"memory": f"{self.single_worker_memory}", "cpu": "1"} + } + + # Set max limit on autoscaler + self.autoscaler_body["spec"]["maximum"] = self.num_of_workers + + # Add DASK SCHEDULER URI env variable + self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append( + {"name": "DASK_SCHEDULER_URI", "value": self.dask_scheduler_uri}, + ) + + # Add secrets + self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].extend( + self.secret_env_vars + ) + + self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ].append(self.secrets_volume_mount) + + self.cluster_body["spec"]["worker"]["spec"]["volumes"].append( + self.secrets_store.get_file_secrets_volume_as_k8s_specs() + ) + + # FIXME: Decide how to detect if krb5, rucio and voms_proxy are needed + kerberos = False + rucio = False + voms_proxy = False + + if kerberos: + self._add_krb5_containers() + if voms_proxy: + self._add_voms_proxy_init_container() + if rucio: + self._add_rucio_init_container() + + def _add_image_pull_secrets(self): + """Attach the configured image pull secrets to scheduler and worker containers.""" + image_pull_secrets = [] + for secret_name in current_app.config["IMAGE_PULL_SECRETS"]: + if secret_name: + image_pull_secrets.append({"name": secret_name}) + + self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "imagePullSecrets" + ] = image_pull_secrets + + self.cluster_body["spec"]["scheduler"]["spec"]["containers"][0][ + "imagePullSecrets" + ] = image_pull_secrets + + def _add_workspace_volume(self): + """Add workspace volume to Dask workers.""" + volume_mount, volume = get_workspace_volume(self.workflow_workspace) + self._add_volumes([(volume_mount, volume)]) + + def _add_eos_volume(self): + """Add EOS volume to Dask cluster body.""" + if K8S_CERN_EOS_AVAILABLE: + self._add_volumes( + [ + ( + K8S_CERN_EOS_MOUNT_CONFIGURATION["volumeMounts"], + K8S_CERN_EOS_MOUNT_CONFIGURATION["volume"], + ) + ] + ) + + def _add_shared_volume(self): + """Add shared CephFS volume to Dask workers.""" + shared_volume = get_reana_shared_volume() + + if not any( + v["name"] == shared_volume["name"] + for v in self.cluster_body["spec"]["worker"]["spec"]["volumes"] + ): + self.cluster_body["spec"]["worker"]["spec"]["volumes"].append(shared_volume) + + def _add_hostpath_volumes(self): + """Add hostPath mounts from configuration to the Dask workers.""" + volumes_to_mount = [] + for mount in REANA_JOB_HOSTPATH_MOUNTS: + volume_mount = { + "name": mount["name"], + "mountPath": mount.get("mountPath", mount["hostPath"]), + } + volume = {"name": mount["name"], "hostPath": {"path": mount["hostPath"]}} + volumes_to_mount.append((volume_mount, volume)) + + self._add_volumes(volumes_to_mount) + + def _add_volumes(self, volumes): + """Add provided volumes to Dask cluster body.""" + for volume_mount, volume in volumes: + self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ].append(volume_mount) + self.cluster_body["spec"]["worker"]["spec"]["volumes"].append(volume) + + def _add_krb5_containers(self): + """Add krb5 init and renew containers for Dask workers.""" + krb5_config = get_kerberos_k8s_config( + self.secrets_store, + kubernetes_uid=self.kubernetes_uid, + ) + + self.cluster_body["spec"]["worker"]["spec"]["volumes"].extend( + krb5_config.volumes + ) + self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ].extend(krb5_config.volume_mounts) + # Add the Kerberos token cache file location to the job container + # so every instance of Kerberos picks it up even if it doesn't read + # the configuration file. + self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].extend( + krb5_config.env + ) + # Add Kerberos init container used to generate ticket + self.cluster_body["spec"]["worker"]["spec"]["initContainers"].append( + krb5_config.init_container + ) + + # Add Kerberos renew container to renew ticket periodically for long-running jobs + self.cluster_body["spec"]["worker"]["spec"]["containers"].append( + krb5_config.renew_container + ) + + # Extend the main job command to create a file after it's finished + existing_args = self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "args" + ] + + self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"] = [ + f"trap 'touch {KRB5_STATUS_FILE_LOCATION}' EXIT; " + existing_args[0] + ] + + def _add_voms_proxy_init_container(self): + """Add sidecar container for Dask workers.""" + ticket_cache_volume = {"name": "voms-proxy-cache", "emptyDir": {}} + volume_mounts = [ + { + "name": ticket_cache_volume["name"], + "mountPath": current_app.config["VOMSPROXY_CERT_CACHE_LOCATION"], + } + ] + + voms_proxy_file_path = os.path.join( + current_app.config["VOMSPROXY_CERT_CACHE_LOCATION"], + current_app.config["VOMSPROXY_CERT_CACHE_FILENAME"], + ) + + voms_proxy_vo = os.environ.get("VONAME", "") + voms_proxy_user_file = os.environ.get("VOMSPROXY_FILE", "") + + if voms_proxy_user_file: + # multi-user deployment mode, where we rely on VOMS proxy file supplied by the user + voms_proxy_container = { + "image": current_app.config["VOMSPROXY_CONTAINER_IMAGE"], + "command": ["/bin/bash"], + "args": [ + "-c", + 'if [ ! -f "/etc/reana/secrets/{voms_proxy_user_file}" ]; then \ + echo "[ERROR] VOMSPROXY_FILE {voms_proxy_user_file} does not exist in user secrets."; \ + exit; \ + fi; \ + cp /etc/reana/secrets/{voms_proxy_user_file} {voms_proxy_file_path}; \ + chown {kubernetes_uid} {voms_proxy_file_path}'.format( + voms_proxy_user_file=voms_proxy_user_file, + voms_proxy_file_path=voms_proxy_file_path, + kubernetes_uid=self.kubernetes_uid, + ), + ], + "name": current_app.config["VOMSPROXY_CONTAINER_NAME"], + "imagePullPolicy": "IfNotPresent", + "volumeMounts": [self.secrets_volume_mount] + volume_mounts, + "env": self.secret_env_vars, + } + else: + # single-user deployment mode, where we generate VOMS proxy file in the sidecar from user secrets + voms_proxy_container = { + "image": current_app.config["VOMSPROXY_CONTAINER_IMAGE"], + "command": ["/bin/bash"], + "args": [ + "-c", + 'if [ ! -f "/etc/reana/secrets/userkey.pem" ]; then \ + echo "[ERROR] File userkey.pem does not exist in user secrets."; \ + exit; \ + fi; \ + if [ ! -f "/etc/reana/secrets/usercert.pem" ]; then \ + echo "[ERROR] File usercert.pem does not exist in user secrets."; \ + exit; \ + fi; \ + if [ -z "$VOMSPROXY_PASS" ]; then \ + echo "[ERROR] Environment variable VOMSPROXY_PASS is not set in user secrets."; \ + exit; \ + fi; \ + if [ -z "$VONAME" ]; then \ + echo "[ERROR] Environment variable VONAME is not set in user secrets."; \ + exit; \ + fi; \ + cp /etc/reana/secrets/userkey.pem /tmp/userkey.pem; \ + chmod 400 /tmp/userkey.pem; \ + echo $VOMSPROXY_PASS | base64 -d | voms-proxy-init \ + --voms {voms_proxy_vo} --key /tmp/userkey.pem \ + --cert $(readlink -f /etc/reana/secrets/usercert.pem) \ + --pwstdin --out {voms_proxy_file_path}; \ + chown {kubernetes_uid} {voms_proxy_file_path}'.format( + voms_proxy_vo=voms_proxy_vo.lower(), + voms_proxy_file_path=voms_proxy_file_path, + kubernetes_uid=self.kubernetes_uid, + ), + ], + "name": current_app.config["VOMSPROXY_CONTAINER_NAME"], + "imagePullPolicy": "IfNotPresent", + "volumeMounts": [self.secrets_volume_mount] + volume_mounts, + "env": self.secret_env_vars, + } + + self.cluster_body["spec"]["worker"]["spec"]["volumes"].extend( + [ticket_cache_volume] + ) + self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ].extend(volume_mounts) + + # XrootD will look for a valid grid proxy in the location pointed to + # by the environment variable $X509_USER_PROXY + self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append( + {"name": "X509_USER_PROXY", "value": voms_proxy_file_path} + ) + + self.cluster_body["spec"]["worker"]["spec"]["initContainers"].append( + voms_proxy_container + ) + + def _add_rucio_init_container(self): + """Add sidecar container for Dask workers.""" + ticket_cache_volume = {"name": "rucio-cache", "emptyDir": {}} + volume_mounts = [ + { + "name": ticket_cache_volume["name"], + "mountPath": current_app.config["RUCIO_CACHE_LOCATION"], + } + ] + + rucio_config_file_path = os.path.join( + current_app.config["RUCIO_CACHE_LOCATION"], + current_app.config["RUCIO_CFG_CACHE_FILENAME"], + ) + + cern_bundle_path = os.path.join( + current_app.config["RUCIO_CACHE_LOCATION"], + current_app.config["RUCIO_CERN_BUNDLE_CACHE_FILENAME"], + ) + + rucio_account = os.environ.get("RUCIO_USERNAME", "") + voms_proxy_vo = os.environ.get("VONAME", "") + + # Detect Rucio hosts from VO names + if voms_proxy_vo == "atlas": + rucio_host = "https://voatlasrucio-server-prod.cern.ch" + rucio_auth_host = "https://voatlasrucio-auth-prod.cern.ch" + else: + rucio_host = f"https://{voms_proxy_vo}-rucio.cern.ch" + rucio_auth_host = f"https://{voms_proxy_vo}-rucio-auth.cern.ch" + + # Allow overriding detected Rucio hosts by user-provided environment variables + rucio_host = os.environ.get("RUCIO_RUCIO_HOST", rucio_host) + rucio_auth_host = os.environ.get("RUCIO_AUTH_HOST", rucio_auth_host) + + rucio_config_container = { + "image": current_app.config["RUCIO_CONTAINER_IMAGE"], + "command": ["/bin/bash"], + "args": [ + "-c", + 'if [ -z "$VONAME" ]; then \ + echo "[ERROR] Environment variable VONAME is not set in user secrets."; \ + exit; \ + fi; \ + if [ -z "$RUCIO_USERNAME" ]; then \ + echo "[ERROR] Environment variable RUCIO_USERNAME is not set in user secrets."; \ + exit; \ + fi; \ + export RUCIO_CFG_ACCOUNT={rucio_account} \ + RUCIO_CFG_CLIENT_VO={voms_proxy_vo} \ + RUCIO_CFG_RUCIO_HOST={rucio_host} \ + RUCIO_CFG_AUTH_HOST={rucio_auth_host}; \ + cp /etc/pki/tls/certs/CERN-bundle.pem {cern_bundle_path}; \ + j2 /opt/user/rucio.cfg.j2 > {rucio_config_file_path}'.format( + rucio_host=rucio_host, + rucio_auth_host=rucio_auth_host, + rucio_account=rucio_account, + voms_proxy_vo=voms_proxy_vo, + cern_bundle_path=cern_bundle_path, + rucio_config_file_path=rucio_config_file_path, + ), + ], + "name": current_app.config["RUCIO_CONTAINER_NAME"], + "imagePullPolicy": "IfNotPresent", + "volumeMounts": [self.secrets_volume_mount] + volume_mounts, + "env": self.secret_env_vars, + } + + self.cluster_body["spec"]["worker"]["spec"]["volumes"].extend( + [ticket_cache_volume] + ) + self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ + "volumeMounts" + ].extend(volume_mounts) + + self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append( + {"name": "RUCIO_CONFIG", "value": rucio_config_file_path} + ) + + self.cluster_body["spec"]["worker"]["spec"]["initContainers"].append( + rucio_config_container + ) + + def _create_dask_cluster(self): + """Create Dask cluster resource.""" + try: + current_k8s_custom_objects_api_client.create_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskclusters", + namespace="default", + body=self.cluster_body, + ) + except Exception: + logging.exception( + "An error occurred while trying to create a Dask cluster." + ) + raise + + def _create_dask_autoscaler(self): + """Create Dask autoscaler resource.""" + try: + current_k8s_custom_objects_api_client.create_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskautoscalers", + namespace="default", + body=self.autoscaler_body, + ) + except Exception: + logging.exception( + "An error occurred while trying to create a Dask autoscaler." + ) + raise + + +def requires_dask(workflow): + """Check whether Dask is necessary to run the workflow.""" + return bool( + workflow.reana_specification["workflow"].get("resources", {}).get("dask", False) + ) diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py index 731509ef..5661f2c5 100644 --- a/reana_workflow_controller/k8s.py +++ b/reana_workflow_controller/k8s.py @@ -16,6 +16,7 @@ current_k8s_appsv1_api_client, current_k8s_corev1_api_client, current_k8s_networking_api_client, + current_k8s_custom_objects_api_client, ) from reana_commons.k8s.secrets import UserSecretsStore from reana_commons.k8s.volumes import ( @@ -398,3 +399,81 @@ def delete_k8s_ingress_object(ingress_name, namespace): "Exception when calling ExtensionsV1beta1->" "Api->delete_namespaced_ingress: {}\n".format(k8s_api_exception) ) + + +def create_dask_dashboard_ingress(cluster_name, workflow_id): + """Create K8S Ingress object for Dask dashboard.""" + # Define the middleware spec + middleware_spec = { + "apiVersion": "traefik.io/v1alpha1", + "kind": "Middleware", + "metadata": {"name": f"replacepath-{workflow_id}", "namespace": "default"}, + "spec": { + "replacePathRegex": { + "regex": f"/{workflow_id}/dashboard/*", + "replacement": "/$1", + } + }, + } + + ingress = client.V1Ingress( + api_version="networking.k8s.io/v1", + kind="Ingress", + metadata=client.V1ObjectMeta( + name=f"dask-dashboard-ingress-{cluster_name}", + annotations={ + **REANA_INGRESS_ANNOTATIONS, + "traefik.ingress.kubernetes.io/router.middlewares": f"default-replacepath-{workflow_id}@kubernetescrd", + }, + ), + spec=client.V1IngressSpec( + rules=[ + client.V1IngressRule( + host=REANA_INGRESS_HOST, + http=client.V1HTTPIngressRuleValue( + paths=[ + client.V1HTTPIngressPath( + path=f"/{workflow_id}/dashboard", + path_type="Prefix", + backend=client.V1IngressBackend( + service=client.V1IngressServiceBackend( + name=f"{cluster_name}-scheduler", + port=client.V1ServiceBackendPort(number=8787), + ) + ), + ) + ] + ), + ) + ] + ), + ) + if REANA_INGRESS_CLASS_NAME: + ingress.spec.ingress_class_name = REANA_INGRESS_CLASS_NAME + + # Create middleware for ingress + current_k8s_custom_objects_api_client.create_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace="default", + plural="middlewares", + body=middleware_spec, + ) + # Create the ingress resource + current_k8s_networking_api_client.create_namespaced_ingress( + namespace="default", body=ingress + ) + + +def delete_dask_dashboard_ingress(cluster_name, workflow_id): + """Delete K8S Ingress Object for Dask dashboard.""" + current_k8s_networking_api_client.delete_namespaced_ingress( + name=cluster_name, namespace="default", body=client.V1DeleteOptions() + ) + current_k8s_custom_objects_api_client.delete_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace="default", + plural="middlewares", + name=f"replacepath-{workflow_id}", + ) diff --git a/reana_workflow_controller/templates/dask_autoscaler.yaml b/reana_workflow_controller/templates/dask_autoscaler.yaml new file mode 100644 index 00000000..86792c14 --- /dev/null +++ b/reana_workflow_controller/templates/dask_autoscaler.yaml @@ -0,0 +1,4 @@ +apiVersion: kubernetes.dask.org/v1 +kind: DaskAutoscaler +spec: + minimum: 0 \ No newline at end of file diff --git a/reana_workflow_controller/templates/dask_cluster.yaml b/reana_workflow_controller/templates/dask_cluster.yaml new file mode 100644 index 00000000..12098eb9 --- /dev/null +++ b/reana_workflow_controller/templates/dask_cluster.yaml @@ -0,0 +1,55 @@ +apiVersion: kubernetes.dask.org/v1 +kind: DaskCluster +spec: + worker: + replicas: 0 + spec: + containers: + - name: worker + imagePullPolicy: "IfNotPresent" + command: ["/bin/sh", "-c"] + args: + - exec dask-worker --name $(DASK_WORKER_NAME) --dashboard --dashboard-address 8788 + ports: + - name: http-dashboard + containerPort: 8788 + protocol: TCP + scheduler: + spec: + containers: + - name: scheduler + imagePullPolicy: "IfNotPresent" + args: + - dask-scheduler + ports: + - name: tcp-comm + containerPort: 8786 + protocol: TCP + - name: http-dashboard + containerPort: 8787 + protocol: TCP + readinessProbe: + httpGet: + port: http-dashboard + path: /health + initialDelaySeconds: 5 + periodSeconds: 10 + livenessProbe: + httpGet: + port: http-dashboard + path: /health + initialDelaySeconds: 15 + periodSeconds: 20 + service: + type: ClusterIP + selector: + dask.org/component: scheduler + ports: + - name: tcp-comm + protocol: TCP + port: 8786 + targetPort: "tcp-comm" + - name: http-dashboard + protocol: TCP + port: 8787 + targetPort: "http-dashboard" \ No newline at end of file diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index ec5ed776..ba870374 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -52,6 +52,8 @@ from reana_workflow_controller.errors import REANAInteractiveSessionError +from reana_workflow_controller.dask import DaskResourceManager, requires_dask + from reana_workflow_controller.k8s import ( build_interactive_k8s_objects, delete_k8s_ingress_object, @@ -83,6 +85,8 @@ WORKFLOW_ENGINE_SERIAL_ENV_VARS, WORKFLOW_ENGINE_SNAKEMAKE_ENV_VARS, WORKFLOW_ENGINE_YADAGE_ENV_VARS, + REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS, + REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY, ) @@ -369,13 +373,37 @@ def start_batch_workflow_run( ) try: - # Create PVC needed for CVMFS repos - if self.retrieve_required_cvmfs_repos(): - create_cvmfs_persistent_volume_claim() + # Create the dask cluster and required resources + if requires_dask(self.workflow): + DaskResourceManager( + cluster_name=f"reana-run-dask-{self.workflow.id_}", + workflow_spec=self.workflow.reana_specification["workflow"], + workflow_workspace=self.workflow.workspace_path, + user_id=self.workflow.owner_id, + num_of_workers=self.workflow.reana_specification["workflow"] + .get("resources", {}) + .get("dask", {}) + .get( + "number_of_workers", + REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS, + ), + single_worker_memory=self.workflow.reana_specification["workflow"] + .get("resources", {}) + .get("dask", {}) + .get( + "single_worker_memory", + REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY, + ), + ).create_dask_resources() current_k8s_batchv1_api_client.create_namespaced_job( namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, body=job ) + + # Create PVC needed for CVMFS repos + if self.retrieve_required_cvmfs_repos(): + create_cvmfs_persistent_volume_claim() + except ApiException as e: msg = "Workflow engine/job controller pod " "creation failed {}".format(e) logging.error(msg, exc_info=True) @@ -596,6 +624,7 @@ def _create_job_spec( command=["/bin/bash", "-c"], args=command, ) + workflow_engine_env_vars.extend( [ { @@ -725,6 +754,13 @@ def _create_job_spec( "value": os.getenv("REANA_RUNTIME_JOBS_KUBERNETES_NODE_LABEL"), }, ) + if requires_dask(self.workflow): + job_controller_container.env.append( + { + "name": "DASK_SCHEDULER_URI", + "value": f"reana-run-dask-{self.workflow.id_}-scheduler.default.svc.cluster.local:8786", + }, + ) secrets_volume_mount = user_secrets.get_secrets_volume_mount_as_k8s_spec() job_controller_container.volume_mounts = [workspace_mount, secrets_volume_mount] diff --git a/tests/conftest.py b/tests/conftest.py index 8782ca8a..30a79309 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,11 +10,15 @@ from __future__ import absolute_import, print_function +import logging import os import shutil import uuid +import yaml +from flask import current_app import pytest +from mock import patch from reana_db.models import ( Base, Job, @@ -23,7 +27,10 @@ WorkspaceRetentionAuditLog, WorkspaceRetentionRule, ) +from reana_commons.k8s.secrets import UserSecretsStore, UserSecrets, Secret + from sqlalchemy_utils import create_database, database_exists, drop_database +from sqlalchemy.orm.attributes import flag_modified from reana_workflow_controller.config import ( REANA_INTERACTIVE_SESSIONS_DEFAULT_IMAGES, @@ -31,6 +38,7 @@ REANA_INTERACTIVE_SESSIONS_RECOMMENDED_IMAGES, ) from reana_workflow_controller.factory import create_app +from reana_workflow_controller.dask import DaskResourceManager @pytest.fixture(scope="module") @@ -152,3 +160,64 @@ def interactive_session_environments(monkeypatch): "jupyter", {"docker_image_1", "docker_image_2"}, ) + + +@pytest.fixture() +def sample_serial_workflow_in_db_with_dask(session, sample_serial_workflow_in_db): + """Sample workflow with Dask resource.""" + workflow = sample_serial_workflow_in_db + new_reana_spec = workflow.reana_specification.copy() + new_reana_spec["workflow"]["resources"] = { + "dask": { + "image": "coffeateam/coffea-dask-almalinux8", + "memory": "10M", + } + } + workflow.reana_specification = new_reana_spec + flag_modified(workflow, "reana_specification") + + session.add(workflow) + session.commit() + + yield workflow + + +@pytest.fixture +def mock_user_secrets(monkeypatch): + user_id = uuid.uuid4() + user_secrets = UserSecrets( + user_id=str(user_id), + k8s_secret_name="k8s-secret", + secrets=[Secret(name="third_env", type_="env", value="3")], + ) + monkeypatch.setattr( + UserSecretsStore, + "fetch", + lambda _: user_secrets, + ) + return user_secrets + + +@pytest.fixture +def dask_resource_manager(sample_serial_workflow_in_db_with_dask, mock_user_secrets): + """Fixture to create a DaskResourceManager instance.""" + manager = DaskResourceManager( + cluster_name="test-cluster", + workflow_spec=sample_serial_workflow_in_db_with_dask.reana_specification[ + "workflow" + ], + workflow_workspace="/path/to/workspace", + user_id="user-123", + num_of_workers=2, + single_worker_memory="256Mi", + ) + return manager + + +@pytest.fixture +def mock_k8s_client(): + with patch( + "reana_workflow_controller.dask.current_k8s_custom_objects_api_client" + ) as mock_client: + mock_client.create_namespaced_custom_object.return_value = None + yield mock_client diff --git a/tests/test_dask.py b/tests/test_dask.py new file mode 100644 index 00000000..142e2420 --- /dev/null +++ b/tests/test_dask.py @@ -0,0 +1,475 @@ +# This file is part of REANA. +# Copyright (C) 2024 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +import pytest +from uuid import uuid4 + +from flask import current_app +from mock import patch, MagicMock + + +from reana_workflow_controller.dask import requires_dask + + +@pytest.mark.parametrize( + "workflow_fixture, expected_result", + [ + ("sample_serial_workflow_in_db", False), + ("sample_serial_workflow_in_db_with_dask", True), + ], +) +def test_requires_dask(workflow_fixture, expected_result, request): + """Test requires_dask with 2 workflows one of which uses Dask and other not.""" + workflow = request.getfixturevalue(workflow_fixture) + result = requires_dask(workflow) + assert ( + result == expected_result + ), f"Expected requires_dask to return {expected_result} for {workflow_fixture}" + + +def test_create_dask_cluster(mock_k8s_client, dask_resource_manager): + """Test creation of dask cluster.""" + # Arrange + dask_resource_manager.cluster_body = {"mock": "cluster_body"} + + # Act + dask_resource_manager._create_dask_cluster() + + # Assert + mock_k8s_client.create_namespaced_custom_object.assert_called_once_with( + group="kubernetes.dask.org", + version="v1", + plural="daskclusters", + namespace="default", + body={"mock": "cluster_body"}, + ) + + +def test_create_dask_autoscaler( + mock_k8s_client, dask_resource_manager, mock_user_secrets +): + """Test creation of dask autoscaler.""" + # Arrange + dask_resource_manager.autoscaler_body = {"mock": "autoscaler_body"} + + # Act + dask_resource_manager._create_dask_autoscaler() + + # Assert + mock_k8s_client.create_namespaced_custom_object.assert_called_once_with( + group="kubernetes.dask.org", + version="v1", + plural="daskautoscalers", + namespace="default", + body={"mock": "autoscaler_body"}, + ) + + +def test_add_image_pull_secrets(dask_resource_manager): + """Test _add_image_pull_secrets function.""" + # Arrange + with patch.object(current_app, "config", {"IMAGE_PULL_SECRETS": ["my-secret"]}): + dask_resource_manager.cluster_body = { + "spec": { + "worker": {"spec": {"containers": [{}]}}, + "scheduler": {"spec": {"containers": [{}]}}, + } + } + + # Act + dask_resource_manager._add_image_pull_secrets() + + # Assert + expected_image_pull_secrets = [{"name": "my-secret"}] + assert ( + dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["containers"][ + 0 + ]["imagePullSecrets"] + == expected_image_pull_secrets + ) + assert ( + dask_resource_manager.cluster_body["spec"]["scheduler"]["spec"][ + "containers" + ][0]["imagePullSecrets"] + == expected_image_pull_secrets + ) + + +def test_add_hostpath_volumes_with_mounts( + mock_k8s_client, dask_resource_manager, mock_user_secrets +): + """Test _add_hostpath_volumes function.""" + REANA_JOB_HOSTPATH_MOUNTS = [ + { + "name": "volume1", + "hostPath": "/host/path/volume1", + "mountPath": "/container/path/volume1", + }, + { + "name": "volume2", + "hostPath": "/host/path/volume2", + }, + ] + # Arrange + with patch( + "reana_workflow_controller.dask.REANA_JOB_HOSTPATH_MOUNTS", + REANA_JOB_HOSTPATH_MOUNTS, + ): + dask_resource_manager.cluster_body = { + "spec": { + "worker": { + "spec": {"containers": [{"volumeMounts": []}], "volumes": []} + }, + } + } + + # Act + dask_resource_manager._add_hostpath_volumes() + + # Assert + expected_volume_mounts = [ + {"name": "volume1", "mountPath": "/container/path/volume1"}, + { + "name": "volume2", + "mountPath": "/host/path/volume2", + }, + ] + expected_volumes = [ + {"name": "volume1", "hostPath": {"path": "/host/path/volume1"}}, + {"name": "volume2", "hostPath": {"path": "/host/path/volume2"}}, + ] + assert ( + dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["containers"][ + 0 + ]["volumeMounts"] + == expected_volume_mounts + ) + assert ( + dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["volumes"] + == expected_volumes + ) + + +def test_create_dask_resources(dask_resource_manager): + """Test create_dask_resources method.""" + # Patch internal methods that should be called + with patch.object( + dask_resource_manager, "_prepare_cluster" + ) as mock_prepare_cluster, patch.object( + dask_resource_manager, "_create_dask_cluster" + ) as mock_create_cluster, patch.object( + dask_resource_manager, "_create_dask_autoscaler" + ) as mock_create_autoscaler, patch( + "reana_workflow_controller.dask.create_dask_dashboard_ingress" + ) as mock_create_dashboard_ingress: + + # Act + dask_resource_manager.create_dask_resources() + + # Assert + mock_prepare_cluster.assert_called_once() + mock_create_cluster.assert_called_once() + mock_create_autoscaler.assert_called_once() + mock_create_dashboard_ingress.assert_called_once_with( + dask_resource_manager.cluster_name, dask_resource_manager.workflow_id + ) + + +def test_add_workspace_volume(dask_resource_manager): + """Test _add_workspace_volume method.""" + # Mock the get_workspace_volume function + with patch( + "reana_workflow_controller.dask.get_workspace_volume" + ) as mock_get_workspace_volume, patch.object( + dask_resource_manager, "_add_volumes" + ) as mock_add_volumes: + + mock_volume_mount = {"name": "workspace-volume-mount"} + mock_volume = {"name": "workspace-volume"} + mock_get_workspace_volume.return_value = (mock_volume_mount, mock_volume) + + # Act + dask_resource_manager._add_workspace_volume() + + # Assert + mock_get_workspace_volume.assert_called_once_with( + dask_resource_manager.workflow_workspace + ) + mock_add_volumes.assert_called_once_with([(mock_volume_mount, mock_volume)]) + + +def test_add_eos_volume_when_eos_is_available(dask_resource_manager): + """Test _add_eos_volume method when EOS is available.""" + # Mock the configuration and the method _add_volumes + with patch("reana_workflow_controller.dask.K8S_CERN_EOS_AVAILABLE", True), patch( + "reana_workflow_controller.dask.K8S_CERN_EOS_MOUNT_CONFIGURATION", + { + "volumeMounts": {"name": "eos-volume-mount"}, + "volume": {"name": "eos-volume"}, + }, + ), patch.object(dask_resource_manager, "_add_volumes") as mock_add_volumes: + + # Act + dask_resource_manager._add_eos_volume() + + # Assert + mock_add_volumes.assert_called_once_with( + [({"name": "eos-volume-mount"}, {"name": "eos-volume"})] + ) + + +def test_add_eos_volume_when_eos_is_not_available(dask_resource_manager): + """Test _add_eos_volume method when EOS is not available.""" + with patch( + "reana_workflow_controller.dask.K8S_CERN_EOS_AVAILABLE", False + ), patch.object(dask_resource_manager, "_add_volumes") as mock_add_volumes: + + # Act + dask_resource_manager._add_eos_volume() + + # Assert + mock_add_volumes.assert_not_called() + + +def test_add_shared_volume_not_already_added(dask_resource_manager): + """Test _add_shared_volume when shared volume is not already in the list.""" + with patch( + "reana_workflow_controller.dask.get_reana_shared_volume" + ) as mock_get_shared_volume: + + mock_shared_volume = {"name": "shared-volume"} + mock_get_shared_volume.return_value = mock_shared_volume + + dask_resource_manager.cluster_body = { + "spec": {"worker": {"spec": {"volumes": []}}} + } + + # Act + dask_resource_manager._add_shared_volume() + + # Assert + assert ( + mock_shared_volume + in dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["volumes"] + ) + mock_get_shared_volume.assert_called_once() + + +def test_add_shared_volume_already_added(dask_resource_manager): + """Test _add_shared_volume when shared volume is already in the list.""" + with patch( + "reana_workflow_controller.dask.get_reana_shared_volume" + ) as mock_get_shared_volume: + + mock_shared_volume = {"name": "shared-volume"} + mock_get_shared_volume.return_value = mock_shared_volume + + dask_resource_manager.cluster_body = { + "spec": { + "worker": { + "spec": { + "volumes": [ + mock_shared_volume + ] # Already contains the shared volume + } + } + } + } + + # Act + dask_resource_manager._add_shared_volume() + + # Assert + assert ( + len(dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["volumes"]) + == 1 + ) + assert ( + dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["volumes"][0] + == mock_shared_volume + ) + mock_get_shared_volume.assert_called_once() + + +def test_add_krb5_containers(dask_resource_manager): + """Test _add_krb5_containers method.""" + with patch( + "reana_workflow_controller.dask.get_kerberos_k8s_config" + ) as mock_get_krb5_config, patch( + "reana_workflow_controller.dask.KRB5_STATUS_FILE_LOCATION", "/tmp/krb5_status" + ): + + KRB5_STATUS_FILE_LOCATION = "/tmp/krb5_status" + + mock_krb5_config = MagicMock() + mock_krb5_config.volumes = [{"name": "krb5-volume"}] + mock_krb5_config.volume_mounts = [{"mountPath": "/krb5"}] + mock_krb5_config.env = [{"name": "KRB5CCNAME", "value": "/tmp/krb5cc"}] + mock_krb5_config.init_container = {"name": "krb5-init-container"} + mock_krb5_config.renew_container = {"name": "krb5-renew-container"} + mock_get_krb5_config.return_value = mock_krb5_config + + dask_resource_manager.cluster_body = { + "spec": { + "worker": { + "spec": { + "volumes": [], + "containers": [ + {"volumeMounts": [], "env": [], "args": ["some-command"]} + ], + "initContainers": [], + } + } + } + } + + # Act + dask_resource_manager._add_krb5_containers() + + # Assert + assert {"name": "krb5-volume"} in dask_resource_manager.cluster_body["spec"][ + "worker" + ]["spec"]["volumes"] + + assert {"mountPath": "/krb5"} in dask_resource_manager.cluster_body["spec"][ + "worker" + ]["spec"]["containers"][0]["volumeMounts"] + + assert { + "name": "KRB5CCNAME", + "value": "/tmp/krb5cc", + } in dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["containers"][ + 0 + ][ + "env" + ] + + assert {"name": "krb5-init-container"} in dask_resource_manager.cluster_body[ + "spec" + ]["worker"]["spec"]["initContainers"] + + assert {"name": "krb5-renew-container"} in dask_resource_manager.cluster_body[ + "spec" + ]["worker"]["spec"]["containers"] + + expected_args = [f"trap 'touch {KRB5_STATUS_FILE_LOCATION}' EXIT; some-command"] + assert ( + dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["containers"][ + 0 + ]["args"] + == expected_args + ) + + +def test_prepare_cluster(dask_resource_manager): + """Test _prepare_cluster method.""" + # Mock the private methods that should be called + with patch.object( + dask_resource_manager, "_add_image_pull_secrets" + ) as mock_add_image_pull_secrets, patch.object( + dask_resource_manager, "_add_hostpath_volumes" + ) as mock_add_hostpath_volumes, patch.object( + dask_resource_manager, "_add_workspace_volume" + ) as mock_add_workspace_volume, patch.object( + dask_resource_manager, "_add_shared_volume" + ) as mock_add_shared_volume, patch.object( + dask_resource_manager, "_add_eos_volume" + ) as mock_add_eos_volume, patch.object( + dask_resource_manager.secrets_store, "get_file_secrets_volume_as_k8s_specs" + ) as mock_get_file_secrets_volume: + + mock_get_file_secrets_volume.return_value = {"name": "secrets-volume"} + + dask_resource_manager.cluster_body = { + "spec": { + "worker": { + "spec": { + "containers": [ + {"args": ["worker-command"], "env": [], "volumeMounts": []} + ], + "volumes": [], + } + }, + "scheduler": { + "spec": {"containers": [{}]}, + "service": {"selector": {}}, + }, + } + } + dask_resource_manager.autoscaler_body = {"spec": {}} + + # Act + dask_resource_manager._prepare_cluster() + + # Assert + mock_add_image_pull_secrets.assert_called_once() + mock_add_hostpath_volumes.assert_called_once() + mock_add_workspace_volume.assert_called_once() + mock_add_shared_volume.assert_called_once() + mock_add_eos_volume.assert_called_once() + + assert ( + dask_resource_manager.cluster_body["metadata"]["name"] + == dask_resource_manager.cluster_name + ) + assert ( + dask_resource_manager.autoscaler_body["metadata"]["name"] + == dask_resource_manager.autoscaler_name + ) + + assert { + "name": "DASK_SCHEDULER_URI", + "value": dask_resource_manager.dask_scheduler_uri, + } in dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["containers"][ + 0 + ][ + "env" + ] + + expected_command = ( + f"cd {dask_resource_manager.workflow_workspace} && worker-command" + ) + assert ( + dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["containers"][ + 0 + ]["args"][0] + == expected_command + ) + + assert ( + dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["containers"][ + 0 + ]["image"] + == dask_resource_manager.cluster_image + ) + assert ( + dask_resource_manager.cluster_body["spec"]["scheduler"]["spec"][ + "containers" + ][0]["image"] + == dask_resource_manager.cluster_image + ) + + assert ( + dask_resource_manager.secrets_volume_mount + in dask_resource_manager.cluster_body["spec"]["worker"]["spec"][ + "containers" + ][0]["volumeMounts"] + ) + assert {"name": "secrets-volume"} in dask_resource_manager.cluster_body["spec"][ + "worker" + ]["spec"]["volumes"] + + assert ( + dask_resource_manager.autoscaler_body["spec"]["cluster"] + == dask_resource_manager.cluster_name + ) + + assert ( + dask_resource_manager.cluster_body["spec"]["scheduler"]["service"][ + "selector" + ]["dask.org/cluster-name"] + == dask_resource_manager.cluster_name + )