refactor(dask): remove hard-coded values for dask components (reanahu…
Alputer committed Nov 21, 2024
1 parent 0a59ccb commit e549351
Showing 6 changed files with 49 additions and 29 deletions.
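
The diff below replaces every hard-coded Dask component name with a single helper, get_dask_component_name, imported from reana_commons.utils. That helper is not part of this diff; the following is a minimal sketch of what it could look like, with the name templates inferred from the literals being removed here (the actual implementation in reana-commons may differ).

# Hypothetical sketch of the reana-commons naming helper this commit relies on.
# The templates are inferred from the hard-coded strings removed in this diff;
# the real get_dask_component_name() in reana_commons.utils may differ.
DASK_COMPONENT_NAME_TEMPLATES = {
    "cluster": "reana-run-dask-{workflow_id}",
    "autoscaler": "dask-autoscaler-reana-run-dask-{workflow_id}",
    "dashboard_ingress": "dask-dashboard-ingress-reana-run-dask-{workflow_id}",
    "dashboard_ingress_middleware": "replacepath-{workflow_id}",
    "dashboard_service": "reana-run-dask-{workflow_id}-scheduler",
    "db_service": "dask-service-{workflow_id}",
}


def get_dask_component_name(workflow_id, component):
    """Return the canonical name of a Dask component for a given workflow."""
    return DASK_COMPONENT_NAME_TEMPLATES[component].format(workflow_id=workflow_id)

Centralising the templates means every caller derives identical names from the workflow id alone, which is what allows the call sites below to drop the separate cluster_name argument.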
11 changes: 5 additions & 6 deletions reana_workflow_controller/consumer.py
@@ -31,6 +31,7 @@
calculate_hash_of_dir,
calculate_job_input_hash,
build_unique_component_name,
get_dask_component_name,
)
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, RunStatus, Service
@@ -323,7 +324,7 @@ def _delete_dask_cluster(workflow: Workflow) -> None:
version="v1",
plural="daskclusters",
namespace="default",
name=f"reana-run-dask-{workflow.id_}",
name=get_dask_component_name(workflow.id_, "cluster"),
)

if DASK_AUTOSCALER_ENABLED:
@@ -332,16 +333,14 @@ def _delete_dask_cluster(workflow: Workflow) -> None:
version="v1",
plural="daskautoscalers",
namespace="default",
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
name=get_dask_component_name(workflow.id_, "autoscaler"),
)

delete_dask_dashboard_ingress(
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)
delete_dask_dashboard_ingress(workflow.id_)

dask_service = (
Session.query(Service)
.filter_by(name=f"dask-service-{workflow.id_}")
.filter_by(name=get_dask_component_name(workflow.id_, "db_service"))
.one_or_none()
)
workflow.services.remove(dask_service)
20 changes: 12 additions & 8 deletions reana_workflow_controller/dask.py
@@ -28,6 +28,7 @@
get_reana_shared_volume,
)
from reana_commons.job_utils import kubernetes_memory_to_bytes
from reana_commons.utils import get_dask_component_name

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.k8s import create_dask_dashboard_ingress
@@ -38,7 +39,7 @@ class DaskResourceManager:

def __init__(
self,
cluster_name,
workflow_id,
workflow_spec,
workflow_workspace,
user_id,
@@ -56,10 +57,9 @@ def __init__(
:param user_id: Id of the user
:type user_id: str
"""
self.cluster_name = cluster_name
self.cluster_name = get_dask_component_name(workflow_id, "cluster")
self.num_of_workers = num_of_workers
self.single_worker_memory = single_worker_memory
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.workflow_spec = workflow_spec
self.workflow_workspace = workflow_workspace
self.workflow_id = workflow_workspace.split("/")[-1]
@@ -68,9 +68,7 @@ def __init__(
self.cluster_spec = workflow_spec.get("resources", {}).get("dask", [])
self.cluster_body = self._load_dask_cluster_template()
self.cluster_image = self.cluster_spec["image"]
self.dask_scheduler_uri = (
f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786"
)
self.dask_scheduler_uri = get_dask_scheduler_uri(workflow_id)

self.secrets_store = UserSecretsStore.fetch(self.user_id)
self.secret_env_vars = self.secrets_store.get_env_secrets_as_k8s_spec()
@@ -80,7 +78,7 @@ def __init__(
self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID

if DASK_AUTOSCALER_ENABLED:
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.autoscaler_name = get_dask_component_name(workflow_id, "autoscaler")
self.autoscaler_body = self._load_dask_autoscaler_template()

def _load_dask_cluster_template(self):
@@ -116,7 +114,7 @@ def create_dask_resources(self):
self._prepare_autoscaler()
self._create_dask_autoscaler()

create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)
create_dask_dashboard_ingress(self.workflow_id)

def _prepare_cluster(self):
"""Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers."""
@@ -129,6 +127,8 @@ def _prepare_cluster(self):
# Add the name of the cluster, used in scheduler service name
self.cluster_body["metadata"] = {"name": self.cluster_name}

# self.cluster_body["spec"]["worker"]["spec"]["metadata"] = {"name": "amcik"}

self.cluster_body["spec"]["scheduler"]["service"]["selector"][
"dask.org/cluster-name"
] = self.cluster_name
@@ -517,3 +517,7 @@ def requires_dask(workflow):
return bool(
workflow.reana_specification["workflow"].get("resources", {}).get("dask", False)
)


def get_dask_scheduler_uri(workflow_id):
    """Return the in-cluster URI of the Dask scheduler service for a given workflow."""
    return f"reana-dask-{workflow_id}-scheduler.default.svc.cluster.local:8786"
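
With the refactoring above, the manager is constructed from the workflow id rather than a pre-built cluster name. A hypothetical call sketch mirroring the controller's usage (the worker count and memory values are illustrative only; the real values come from the workflow's Dask resource specification):

# Hypothetical usage sketch of the refactored constructor.
DaskResourceManager(
    workflow_id=workflow.id_,
    workflow_spec=workflow.reana_specification["workflow"],
    workflow_workspace=workflow.workspace_path,
    user_id=workflow.owner_id,
    num_of_workers=2,            # illustrative value
    single_worker_memory="2Gi",  # illustrative value
).create_dask_resources()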
26 changes: 18 additions & 8 deletions reana_workflow_controller/k8s.py
@@ -23,6 +23,7 @@
get_k8s_cvmfs_volumes,
get_workspace_volume,
)
from reana_commons.utils import get_dask_component_name

from reana_workflow_controller.config import ( # isort:skip
JUPYTER_INTERACTIVE_SESSION_DEFAULT_PORT,
@@ -401,13 +402,18 @@ def delete_k8s_ingress_object(ingress_name, namespace):
)


def create_dask_dashboard_ingress(cluster_name, workflow_id):
def create_dask_dashboard_ingress(workflow_id):
"""Create K8S Ingress object for Dask dashboard."""
# Define the middleware spec
middleware_spec = {
"apiVersion": "traefik.io/v1alpha1",
"kind": "Middleware",
"metadata": {"name": f"replacepath-{workflow_id}", "namespace": "default"},
"metadata": {
"name": get_dask_component_name(
workflow_id, "dashboard_ingress_middleware"
),
"namespace": "default",
},
"spec": {
"replacePathRegex": {
"regex": f"/{workflow_id}/dashboard/*",
@@ -420,10 +426,10 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
api_version="networking.k8s.io/v1",
kind="Ingress",
metadata=client.V1ObjectMeta(
name=f"dask-dashboard-ingress-{cluster_name}",
name=get_dask_component_name(workflow_id, "dashboard_ingress"),
annotations={
**REANA_INGRESS_ANNOTATIONS,
"traefik.ingress.kubernetes.io/router.middlewares": f"default-replacepath-{workflow_id}@kubernetescrd",
"traefik.ingress.kubernetes.io/router.middlewares": f"default-{get_dask_component_name(workflow_id, "dashboard_ingress_middleware")}@kubernetescrd",
},
),
spec=client.V1IngressSpec(
@@ -437,7 +443,9 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
path_type="Prefix",
backend=client.V1IngressBackend(
service=client.V1IngressServiceBackend(
name=f"{cluster_name}-scheduler",
name=get_dask_component_name(
workflow_id, "dashboard_service"
),
port=client.V1ServiceBackendPort(number=8787),
)
),
@@ -465,17 +473,19 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
)


def delete_dask_dashboard_ingress(cluster_name, workflow_id):
def delete_dask_dashboard_ingress(workflow_id):
"""Delete K8S Ingress Object for Dask dashboard."""
current_k8s_networking_api_client.delete_namespaced_ingress(
name=cluster_name, namespace="default", body=client.V1DeleteOptions()
name=get_dask_component_name(workflow_id, "dashboard_ingress"),
namespace="default",
body=client.V1DeleteOptions(),
)
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="traefik.io",
version="v1alpha1",
namespace="default",
plural="middlewares",
name=f"replacepath-{workflow_id}",
name=get_dask_component_name(workflow_id, "dashboard_ingress_middleware"),
)
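
The ingress and middleware above expose the Dask dashboard under a workflow-scoped path and rewrite that prefix before the request reaches the scheduler's dashboard port (8787). A hedged usage sketch; the hostname and workflow id below are illustrative placeholders, not values from this diff:

# Hypothetical check that the dashboard is reachable once the ingress exists.
import requests

REANA_HOSTNAME = "reana.example.org"  # assumed deployment hostname
workflow_id = "11111111-2222-3333-4444-555555555555"  # hypothetical workflow id

response = requests.get(
    f"https://{REANA_HOSTNAME}/{workflow_id}/dashboard/status", timeout=10
)
response.raise_for_status()  # succeeds once the scheduler pod and ingress are up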


6 changes: 3 additions & 3 deletions reana_workflow_controller/rest/workflows.py
@@ -22,7 +22,7 @@
from webargs import fields, validate
from webargs.flaskparser import use_args, use_kwargs
from reana_commons.config import WORKFLOW_TIME_FORMAT
from reana_commons.utils import build_unique_component_name
from reana_commons.utils import build_unique_component_name, get_dask_component_name
from reana_db.database import Session
from reana_db.models import (
RunStatus,
@@ -412,7 +412,7 @@ def get_workflows(args, paginate=None): # noqa
dask_service = workflow.services.first()
if dask_service and dask_service.status == RunStatus.created:
pod_status = check_pod_by_prefix(
pod_name_prefix=f"reana-run-dask-{workflow.id_}"
pod_name_prefix=get_dask_component_name(workflow.id_, "cluster")
)
if pod_status == "Running":
dask_service.status = RunStatus.running
@@ -625,7 +625,7 @@ def create_workflow(): # noqa
)
if requires_dask(workflow):
dask_service = Service(
name=f"dask-service-{workflow_uuid}",
name=get_dask_component_name(workflow.id_, "db_service"),
uri=f"{REANA_HOSTNAME}{workflow_uuid}/dashboard/status",
type_=ServiceType.dask,
status=RunStatus.created,
4 changes: 3 additions & 1 deletion reana_workflow_controller/templates/dask_cluster.yaml
@@ -9,11 +9,13 @@ spec:
imagePullPolicy: "IfNotPresent"
command: ["/bin/sh", "-c"]
args:
- exec dask-worker --name $(DASK_WORKER_NAME) --dashboard --dashboard-address 8788
- exec dask-worker --dashboard --dashboard-address 8788
ports:
- name: http-dashboard
containerPort: 8788
protocol: TCP
metadata:
generateName: dask-hello-world-
scheduler:
spec:
containers:
11 changes: 8 additions & 3 deletions reana_workflow_controller/workflow_run_manager.py
@@ -52,7 +52,11 @@

from reana_workflow_controller.errors import REANAInteractiveSessionError

from reana_workflow_controller.dask import DaskResourceManager, requires_dask
from reana_workflow_controller.dask import (
DaskResourceManager,
requires_dask,
get_dask_scheduler_uri,
)

from reana_workflow_controller.k8s import (
build_interactive_k8s_objects,
@@ -374,9 +378,10 @@ def start_batch_workflow_run(

try:
# Create the dask cluster and required resources

if requires_dask(self.workflow):
DaskResourceManager(
cluster_name=f"reana-run-dask-{self.workflow.id_}",
workflow_id=self.workflow.id_,
workflow_spec=self.workflow.reana_specification["workflow"],
workflow_workspace=self.workflow.workspace_path,
user_id=self.workflow.owner_id,
@@ -758,7 +763,7 @@ def _create_job_spec(
job_controller_container.env.append(
{
"name": "DASK_SCHEDULER_URI",
"value": f"reana-run-dask-{self.workflow.id_}-scheduler.default.svc.cluster.local:8786",
"value": get_dask_scheduler_uri(self.workflow.id_),
},
)
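
The job controller container receives the scheduler address through this environment variable. A minimal sketch of how a workflow job could connect to the REANA-managed Dask cluster, assuming dask.distributed is available in the job image:

# Minimal sketch, assuming dask.distributed is installed in the job image and
# that DASK_SCHEDULER_URI is injected as shown above.
import os

from dask.distributed import Client

scheduler_uri = os.environ["DASK_SCHEDULER_URI"]
client = Client(scheduler_uri)  # connect to the workflow's Dask scheduler

# Submit a trivial task to verify the connection.
future = client.submit(sum, [1, 2, 3])
print(future.result())  # -> 6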
