Skip to content

Commit

Permalink
feat(sessions): expose user secrets in interactive sessions (reanahub…
Browse files Browse the repository at this point in the history
  • Loading branch information
mdonadoni committed Jun 26, 2024
1 parent a7c9c85 commit 784efee
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 75 deletions.
81 changes: 41 additions & 40 deletions reana_workflow_controller/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
current_k8s_corev1_api_client,
current_k8s_networking_api_client,
)
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.k8s.volumes import (
get_k8s_cvmfs_volumes,
get_workspace_volume,
Expand Down Expand Up @@ -77,6 +78,19 @@ def __init__(
name=deployment_name,
labels={"reana_workflow_mode": "session"},
)
self._session_container = client.V1Container(
name=self.deployment_name, image=self.image, env=[], volume_mounts=[]
)
self._pod_spec = client.V1PodSpec(
containers=[self._session_container],
volumes=[],
node_selector=REANA_RUNTIME_SESSIONS_KUBERNETES_NODE_LABEL,
# Disable service discovery with env variables, so that the environment is
# not polluted with variables like `REANA_SERVER_SERVICE_HOST`
enable_service_links=False,
automount_service_account_token=False,
)

self.kubernetes_objects = {
"ingress": self._build_ingress(),
"service": self._build_service(metadata),
Expand Down Expand Up @@ -149,15 +163,6 @@ def _build_deployment(self, metadata):
:param metadata: Common Kubernetes metadata for the interactive
deployment.
"""
container = client.V1Container(name=self.deployment_name, image=self.image)
pod_spec = client.V1PodSpec(
containers=[container],
node_selector=REANA_RUNTIME_SESSIONS_KUBERNETES_NODE_LABEL,
# Disable service discovery with env variables, so that the environment is
# not polluted with variables like `REANA_SERVER_SERVICE_HOST`
enable_service_links=False,
automount_service_account_token=False,
)
labels = {
"app": self.deployment_name,
"reana_workflow_mode": "session",
Expand All @@ -166,7 +171,7 @@ def _build_deployment(self, metadata):
}
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels=labels),
spec=pod_spec,
spec=self._pod_spec,
)
spec = client.V1DeploymentSpec(
selector=client.V1LabelSelector(match_labels=labels),
Expand All @@ -184,36 +189,26 @@ def _build_deployment(self, metadata):

def add_command(self, command):
"""Add a command to the deployment."""
self.kubernetes_objects["deployment"].spec.template.spec.containers[
0
].command = command
self._session_container.command = command

def add_command_arguments(self, args):
"""Add command line arguments in addition to the command."""
self.kubernetes_objects["deployment"].spec.template.spec.containers[
0
].args = args
self._session_container.args = args

def add_reana_shared_storage(self):
"""Add the REANA shared file system volume mount to the deployment."""
volume_mount, volume = get_workspace_volume(self.workspace)
self.kubernetes_objects["deployment"].spec.template.spec.containers[
0
].volume_mounts = [volume_mount]
self.kubernetes_objects["deployment"].spec.template.spec.volumes = [volume]
self._session_container.volume_mounts.append(volume_mount)
self._pod_spec.volumes.append(volume)

def add_cvmfs_repo_mounts(self, cvmfs_repos):
"""Add mounts for the provided CVMFS repositories to the deployment.
:param cvmfs_mounts: List of CVMFS repos to make available.
"""
cvmfs_volume_mounts, cvmfs_volumes = get_k8s_cvmfs_volumes(cvmfs_repos)
self.kubernetes_objects["deployment"].spec.template.spec.volumes.extend(
cvmfs_volumes
)
self.kubernetes_objects["deployment"].spec.template.spec.containers[
0
].volume_mounts.extend(cvmfs_volume_mounts)
self._pod_spec.volumes.extend(cvmfs_volumes)
self._session_container.volume_mounts.extend(cvmfs_volume_mounts)

def add_environment_variable(self, name, value):
"""Add an environment variable.
Expand All @@ -222,24 +217,25 @@ def add_environment_variable(self, name, value):
:param value: Environment variable value.
"""
env_var = client.V1EnvVar(name, str(value))
if isinstance(
self.kubernetes_objects["deployment"].spec.template.spec.containers[0].env,
list,
):
self.kubernetes_objects["deployment"].spec.template.spec.containers[
0
].env.append(env_var)
else:
self.kubernetes_objects["deployment"].spec.template.spec.containers[
0
].env = [env_var]
self._session_container.env.append(env_var)

def add_run_with_root_permissions(self):
"""Run interactive session with root."""
security_context = client.V1SecurityContext(run_as_user=0)
self.kubernetes_objects["deployment"].spec.template.spec.containers[
0
].security_context = security_context
self._session_container.security_context = security_context

def add_user_secrets(self):
"""Mount the "file" secrets and set the "env" secrets in the container."""
secrets_store = REANAUserSecretsStore(self.owner_id)

# mount file secrets
secrets_volume = secrets_store.get_file_secrets_volume_as_k8s_specs()
secrets_volume_mount = secrets_store.get_secrets_volume_mount_as_k8s_spec()
self._pod_spec.volumes.append(secrets_volume)
self._session_container.volume_mounts.append(secrets_volume_mount)

# set environment secrets
self._session_container.env += secrets_store.get_env_secrets_as_k8s_spec()

def get_deployment_objects(self):
"""Return the alrady built Kubernetes objects."""
Expand All @@ -255,6 +251,7 @@ def build_interactive_jupyter_deployment_k8s_objects(
owner_id=None,
workflow_id=None,
image=None,
expose_secrets=True,
):
"""Build the Kubernetes specification for a Jupyter NB interactive session.
Expand All @@ -276,6 +273,8 @@ def build_interactive_jupyter_deployment_k8s_objects(
session belongs to.
:param image: Jupyter Notebook image to use, i.e.
``jupyter/tensorflow-notebook`` to enable ``tensorflow``.
:param expose_secrets: If true, mount the "file" secrets and set the
"env" secrets in jupyter's pod.
"""
image = image or JUPYTER_INTERACTIVE_SESSION_DEFAULT_IMAGE
cvmfs_repos = cvmfs_repos or []
Expand All @@ -297,6 +296,8 @@ def build_interactive_jupyter_deployment_k8s_objects(
deployment_builder.add_reana_shared_storage()
if cvmfs_repos:
deployment_builder.add_cvmfs_repo_mounts(cvmfs_repos)
if expose_secrets:
deployment_builder.add_user_secrets()
deployment_builder.add_environment_variable("NB_GID", 0)
# Changes umask so all files generated by the Jupyter Notebook can be
# modified by the root group users.
Expand Down
51 changes: 20 additions & 31 deletions reana_workflow_controller/rest/workflows_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@


from flask import Blueprint, jsonify, request
from webargs import fields
from webargs.flaskparser import use_kwargs

from reana_db.utils import _get_workflow_with_uuid_or_name
from reana_db.models import WorkflowSession, InteractiveSessionType, RunStatus

Expand All @@ -20,10 +23,14 @@


@blueprint.route(
"/workflows/<workflow_id_or_name>/open/" "<interactive_session_type>",
"/workflows/<workflow_id_or_name>/open/<interactive_session_type>",
methods=["POST"],
)
def open_interactive_session(workflow_id_or_name, interactive_session_type): # noqa
@use_kwargs({"user": fields.Str(required=True)}, location="query")
@use_kwargs({"image": fields.Str()}, location="json")
def open_interactive_session(
workflow_id_or_name, interactive_session_type, user, **kwargs
): # noqa
r"""Start an interactive session inside the workflow workspace.
---
Expand Down Expand Up @@ -109,45 +116,27 @@ def open_interactive_session(workflow_id_or_name, interactive_session_type): #
"""
try:
if interactive_session_type not in InteractiveSessionType.__members__:
return (
jsonify(
{
"message": "Interactive session type {0} not found, try "
"with one of: {1}".format(
interactive_session_type,
[e.name for e in InteractiveSessionType],
)
}
),
404,
error_msg = (
f"Interactive session type {interactive_session_type} not found, "
f"try with one of: {[e.name for e in InteractiveSessionType]}"
)
interactive_session_configuration = request.json if request.is_json else {}
user_uuid = request.args["user"]
workflow = None
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid)
return jsonify({"message": error_msg}), 404

workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid=user)

if workflow.sessions.first() is not None:
return (
jsonify({"message": "Interactive session is already open"}),
404,
)
return jsonify({"message": "Interactive session is already open"}), 404

if workflow.status == RunStatus.deleted:
return (
jsonify(
{
"message": "Interactive session can't be opened from a deleted workflow"
}
),
404,
)
error_msg = "Interactive session can't be opened from a deleted workflow"
return jsonify({"message": error_msg}), 404

kwrm = KubernetesWorkflowRunManager(workflow)
access_path = kwrm.start_interactive_session(
interactive_session_type,
image=interactive_session_configuration.get("image", None),
image=kwargs.get("image"),
)
return jsonify({"path": "{}".format(access_path)}), 200
return jsonify({"path": str(access_path)}), 200

except (KeyError, ValueError) as e:
status_code = 400 if workflow else 404
Expand Down
1 change: 1 addition & 0 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ def start_interactive_session(self, interactive_session_type, **kwargs):
:return: Relative path to access the interactive session.
"""
action_completed = True
kubernetes_objects = None
try:
if interactive_session_type not in InteractiveSessionType.__members__:
raise REANAInteractiveSessionError(
Expand Down
47 changes: 47 additions & 0 deletions tests/test_k8s.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# This file is part of REANA.
# Copyright (C) 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

from unittest.mock import Mock, patch
from reana_workflow_controller.k8s import InteractiveDeploymentK8sBuilder
from reana_commons.k8s.secrets import REANAUserSecretsStore


def test_interactive_deployment_k8s_builder_user_secrets(monkeypatch):
"""Expose user secrets in interactive sessions"""
monkeypatch.setattr(
REANAUserSecretsStore,
"get_file_secrets_volume_as_k8s_specs",
lambda _: {"name": "secrets-volume"},
)
monkeypatch.setattr(
REANAUserSecretsStore,
"get_secrets_volume_mount_as_k8s_spec",
lambda _: {"name": "secrets-volume-mount"},
)
monkeypatch.setattr(
REANAUserSecretsStore,
"get_env_secrets_as_k8s_spec",
lambda _: [{"name": "third_env", "value": "3"}],
)

builder = InteractiveDeploymentK8sBuilder(
"name", "workflow_id", "owner_id", "workspace", "docker_image", "port", "path"
)

builder.add_command_arguments(["args"])
builder.add_reana_shared_storage()
builder.add_user_secrets()
builder.add_environment_variable("first_env", "1")
builder.add_environment_variable("second_env", "2")
builder.add_run_with_root_permissions()
objs = builder.get_deployment_objects()

deployment = objs["deployment"]
pod = deployment.spec.template.spec
assert len(pod.containers) == 1
assert {"name": "secrets-volume"} in pod.volumes
assert {"name": "secrets-volume-mount"} in pod.containers[0].volume_mounts
assert {"name": "third_env", "value": "3"} in pod.containers[0].env
2 changes: 2 additions & 0 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,7 @@ def test_create_interactive_session(app, default_user, sample_serial_workflow_in
current_k8s_corev1_api_client=mock.DEFAULT,
current_k8s_networking_api_client=mock.DEFAULT,
current_k8s_appsv1_api_client=mock.DEFAULT,
REANAUserSecretsStore=mock.DEFAULT,
):
res = client.post(
url_for(
Expand Down Expand Up @@ -1530,6 +1531,7 @@ def test_create_interactive_session_custom_image(
current_k8s_corev1_api_client=mock.DEFAULT,
current_k8s_networking_api_client=mock.DEFAULT,
current_k8s_appsv1_api_client=mock.DEFAULT,
REANAUserSecretsStore=mock.DEFAULT,
) as mocks:
client.post(
url_for(
Expand Down
16 changes: 12 additions & 4 deletions tests/test_workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def test_start_interactive_session(sample_serial_workflow_in_db):
) as mocks:
kwrm = KubernetesWorkflowRunManager(sample_serial_workflow_in_db)
if len(InteractiveSessionType):
kwrm.start_interactive_session(InteractiveSessionType(0).name)
kwrm.start_interactive_session(
InteractiveSessionType(0).name, expose_secrets=False
)
mocks[
"current_k8s_appsv1_api_client"
].create_namespaced_deployment.assert_called_once()
Expand Down Expand Up @@ -66,7 +68,9 @@ def test_start_interactive_workflow_k8s_failure(sample_serial_workflow_in_db):
):
kwrm = KubernetesWorkflowRunManager(sample_serial_workflow_in_db)
if len(InteractiveSessionType):
kwrm.start_interactive_session(InteractiveSessionType(0).name)
kwrm.start_interactive_session(
InteractiveSessionType(0).name, expose_secrets=False
)


def test_atomic_creation_of_interactive_session(sample_serial_workflow_in_db):
Expand All @@ -92,7 +96,9 @@ def test_atomic_creation_of_interactive_session(sample_serial_workflow_in_db):
try:
kwrm = KubernetesWorkflowRunManager(sample_serial_workflow_in_db)
if len(InteractiveSessionType):
kwrm.start_interactive_session(InteractiveSessionType(0).name)
kwrm.start_interactive_session(
InteractiveSessionType(0).name, expose_secrets=False
)
except REANAInteractiveSessionError:
mocks[
"current_k8s_corev1_api_client"
Expand Down Expand Up @@ -137,7 +143,9 @@ def test_interactive_session_closure(sample_serial_workflow_in_db, session):
):
kwrm = KubernetesWorkflowRunManager(workflow)
if len(InteractiveSessionType):
kwrm.start_interactive_session(InteractiveSessionType(0).name)
kwrm.start_interactive_session(
InteractiveSessionType(0).name, expose_secrets=False
)

int_session = InteractiveSession.query.filter_by(
owner_id=workflow.owner_id,
Expand Down

0 comments on commit 784efee

Please sign in to comment.