From 6d0f7f83414c54088aea76aa800c4c814a0600c8 Mon Sep 17 00:00:00 2001 From: Marco Donadoni Date: Tue, 30 Apr 2024 12:24:51 +0200 Subject: [PATCH] perf(k8s): avoid fetching secrets multiple times (#456) Closes #455 --- reana_commons/k8s/kerberos.py | 19 +- reana_commons/k8s/secrets.py | 407 ++++++++++++++++------------------ tests/k8s/test_kerberos.py | 28 +++ tests/k8s/test_secrets.py | 193 ++++++++++++++++ tests/test_secrets_store.py | 100 --------- 5 files changed, 421 insertions(+), 326 deletions(-) create mode 100644 tests/k8s/test_kerberos.py create mode 100644 tests/k8s/test_secrets.py delete mode 100644 tests/test_secrets_store.py diff --git a/reana_commons/k8s/kerberos.py b/reana_commons/k8s/kerberos.py index f9cf4561..c470dbfd 100644 --- a/reana_commons/k8s/kerberos.py +++ b/reana_commons/k8s/kerberos.py @@ -23,7 +23,7 @@ KRB5_TOKEN_CACHE_LOCATION, ) from reana_commons.errors import REANASecretDoesNotExist -from reana_commons.k8s.secrets import REANAUserSecretsStore +from reana_commons.k8s.secrets import UserSecrets KerberosConfig = namedtuple( @@ -33,13 +33,13 @@ def get_kerberos_k8s_config( - secrets_store: REANAUserSecretsStore, kubernetes_uid: int + user_secrets: UserSecrets, kubernetes_uid: int ) -> KerberosConfig: """Get the k8s specification for the Kerberos init and renew containers. These containers are used to generate and renew the Kerberos tickets. - :param secrets_stores: User's secrets store + :param user_secrets: User's secrets store :param kubernetes_uid: UID of the user who needs Kerberos :returns: - specification of the sidecar container - volumes needed by the sidecar container @@ -48,15 +48,18 @@ def get_kerberos_k8s_config( - specification for init container used to generate Kerberos ticket - specification for renew container used to periodically renew Kerberos ticket """ - secrets_volume_mount = secrets_store.get_secrets_volume_mount_as_k8s_spec() - keytab_file = secrets_store.get_secret_value("CERN_KEYTAB") - cern_user = secrets_store.get_secret_value("CERN_USER") + secrets_volume_mount = user_secrets.get_secrets_volume_mount_as_k8s_spec() + keytab_file_name = user_secrets.get_secret("CERN_KEYTAB") + cern_user = user_secrets.get_secret("CERN_USER") - if not keytab_file: + if not keytab_file_name: raise REANASecretDoesNotExist(missing_secrets_list=["CERN_KEYTAB"]) if not cern_user: raise REANASecretDoesNotExist(missing_secrets_list=["CERN_USER"]) + keytab_file_name = keytab_file_name.value_str + cern_user = cern_user.value_str + ticket_cache_volume = { "name": "krb5-cache", "emptyDir": {}, @@ -95,7 +98,7 @@ def get_kerberos_k8s_config( "command": [ "kinit", "-kt", - f"/etc/reana/secrets/{keytab_file}", + f"/etc/reana/secrets/{keytab_file_name}", f"{cern_user}@CERN.CH", ], "name": KRB5_INIT_CONTAINER_NAME, diff --git a/reana_commons/k8s/secrets.py b/reana_commons/k8s/secrets.py index 8798328d..ecd78c4d 100644 --- a/reana_commons/k8s/secrets.py +++ b/reana_commons/k8s/secrets.py @@ -8,13 +8,15 @@ """REANA Kubernetes secrets.""" import base64 +import binascii import json import logging +from typing import Any, Dict, List, Optional, Sequence, Union +from uuid import UUID from kubernetes import client from kubernetes.client.rest import ApiException from reana_commons.config import ( - REANA_COMPONENT_PREFIX, REANA_RUNTIME_KUBERNETES_NAMESPACE, REANA_USER_SECRET_MOUNT_PATH, ) @@ -25,273 +27,242 @@ log = logging.getLogger(__name__) -class REANAUserSecretsStore(object): - """REANA user secrets store.""" +class Secret: + """User secret. - def __init__(self, user_secret_store_id): - """Initialise the secret store object.""" - self.user_secret_store_id = build_unique_component_name( - "secretsstore", str(user_secret_store_id) - ) + This class accepts either `bytes` or `str` values. + """ + + types = ["env", "file"] - def _initialise_user_secrets_store(self): - """Initialise an empty Kubernetes secret for a given user.""" + @classmethod + def from_base64(cls, name: str, type_: str, value: str): + """Initialise Secret from base64 encoded value.""" try: - empty_k8s_secret = client.V1Secret( - api_version="v1", - metadata=client.V1ObjectMeta( - name=str(self.user_secret_store_id), - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - ), - data={}, - ) - empty_k8s_secret.metadata.annotations = {"secrets_types": "{}"} - current_k8s_corev1_api_client.create_namespaced_secret( - REANA_RUNTIME_KUBERNETES_NAMESPACE, empty_k8s_secret - ) - return empty_k8s_secret - except ApiException: - log.error( - "Something went wrong while creating " - "Kubernetes secret for user {0}.".format( - str(self.user_secret_store_id) - ), - exc_info=True, - ) + decoded = base64.b64decode(value, validate=True) + except binascii.Error: + raise ValueError("Invalid base64 value.") + return cls(name, type_, decoded) - def _update_store(self, k8s_user_secrets): - """Update Kubernetes secret store. + def __init__(self, name: str, type_: str, value: Union[str, bytes]): + """Initialise Secret.""" + if type_ not in self.types: + raise ValueError(f"type_ must be one of: {self.types}") + self.name: str = name + self.type_: str = type_ + self.set_value(value) - :param k8s_user_secrets: A Kubernetes secrets object containing a new - version of the store. - """ - current_k8s_corev1_api_client.replace_namespaced_secret( - str(self.user_secret_store_id), - REANA_RUNTIME_KUBERNETES_NAMESPACE, - k8s_user_secrets, + @property + def value_str(self) -> str: + """Get secret value as string.""" + return self._value_bytes.decode() + + @property + def value_bytes(self) -> bytes: + """Get secret value as bytes.""" + return self._value_bytes + + def set_value(self, value: Union[str, bytes]): + """Set secret value.""" + self._value_bytes = value.encode() if isinstance(value, str) else bytes(value) + + def __eq__(self, other): + """Check if two secrets are equal.""" + if not isinstance(other, Secret): + return False + return ( + self.name == other.name + and self.type_ == other.type_ + and self._value_bytes == other._value_bytes ) - def _get_k8s_user_secrets_store(self): - """Retrieve the Kubernetes secret which contains all user secrets.""" - try: - k8s_user_secrets_store = ( - current_k8s_corev1_api_client.read_namespaced_secret( - str(self.user_secret_store_id), REANA_RUNTIME_KUBERNETES_NAMESPACE - ) - ) - k8s_user_secrets_store.data = k8s_user_secrets_store.data or {} - return k8s_user_secrets_store - except ApiException as api_e: - if api_e.status == 404: - log.info( - "Kubernetes secret for user {0} does not " - "exist, creating...".format(str(self.user_secret_store_id)) - ) - return self._initialise_user_secrets_store() - else: - log.error( - "Something went wrong while retrieving " - "Kubernetes secret for user {0}.".format( - str(self.user_secret_store_id) - ), - exc_info=True, - ) - def _dump_json_annotation_to_k8s_object( - self, k8s_object, annotation_key, annotation_value - ): - """Dump Python object as annotation to Kubernetes object.""" - try: - k8s_object.metadata.annotations[annotation_key] = json.dumps( - annotation_value - ) - except TypeError as e: - log.error( - "Could not add annotations to user secrets:\n" "{}".format(str(e)), - exc_info=True, - ) +class UserSecrets: + """Collections of secrets of a given user.""" - def _load_json_annotation_from_k8s_object(self, k8s_object, annotation_key): - """Load string annotations from Kubernetes object.""" - try: - return json.loads(k8s_object.metadata.annotations[annotation_key]) - except ValueError: - log.error( - "Annotations for user {} secret store could not be" - "loaded as json.".format(annotation_key) - ) - except KeyError: - log.error( - "Annotation key {annotation_key} does not exist for" - " user {user} secret store, so it can not be loaded".format( - annotation_key=annotation_key, user=k8s_object.metadata.name - ) + def __init__(self, user_id: str, k8s_secret_name: str, secrets: List[Secret] = []): + """Initialise UserSecrets.""" + self.user_id = user_id + self.k8s_secret_name = k8s_secret_name + self.secrets = {secret.name: secret for secret in secrets} + + @classmethod + def from_k8s_secret(cls, user_id: str, k8s_secret: client.V1Secret): + """Initialise from k8s secret object.""" + secrets = [] + types = json.loads(k8s_secret.metadata.annotations["secrets_types"]) + for secret_name, secret_value in k8s_secret.data.items(): + secrets.append( + Secret.from_base64(secret_name, types[secret_name], secret_value) ) + return cls( + user_id=user_id, + k8s_secret_name=k8s_secret.metadata.name, + secrets=secrets, + ) - def add_secrets(self, secrets_dict, overwrite=False): - """Add a new secret to the user's Kubernetes secret. + def to_k8s_secret(self) -> client.V1Secret: + """Return user secrets as Kubernetes secret.""" + secrets_types = {secret.name: secret.type_ for secret in self.secrets.values()} + k8s_secret = client.V1Secret( + api_version="v1", + metadata=client.V1ObjectMeta( + name=self.k8s_secret_name, + namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, + annotations={"secrets_types": json.dumps(secrets_types)}, + ), + data={ + secret.name: base64.standard_b64encode(secret.value_bytes).decode() + for secret in self.secrets.values() + }, + ) + return k8s_secret - :param secrets: Dictionary containing new secrets, where keys are - secret names and corresponding values are dictionaries containing - base64 encoded value and a type (which determines how the secret - should be mounted). - :returns: Updated user secret list. - """ - try: - k8s_user_secrets = self._get_k8s_user_secrets_store() - for secret_name in secrets_dict: - if k8s_user_secrets.data.get(secret_name) and not overwrite: - raise REANASecretAlreadyExists( - "Operation cancelled. Secret {} already exists. " - "If you want change it use overwrite".format(secret_name) - ) - secrets_types = self._load_json_annotation_from_k8s_object( - k8s_user_secrets, "secrets_types" - ) - secrets_types[secret_name] = secrets_dict[secret_name]["type"] - self._dump_json_annotation_to_k8s_object( - k8s_user_secrets, "secrets_types", secrets_types + def add_secrets(self, secrets: Sequence[Secret], overwrite: bool = False): + """Add new secrets to the user's secrets.""" + for secret in secrets: + if secret.name in self.secrets and not overwrite: + raise REANASecretAlreadyExists( + "Operation cancelled. Secret {} already exists. " + "If you want change it use overwrite".format(secret.name) ) - k8s_user_secrets.data[secret_name] = secrets_dict[secret_name]["value"] - self._update_store(k8s_user_secrets) - return k8s_user_secrets.data.keys() - except ApiException: - log.error( - "Something went wrong while adding secrets to " - "Kubernetes secret for user {0}.".format( - str(self.user_secret_store_id) - ), - exc_info=True, - ) + self.secrets[secret.name] = secret - def get_secrets(self): - """List all secrets for a given user.""" - secrets_store = self._get_k8s_user_secrets_store() - secrets_with_types = [] - for secret_name in secrets_store.data: - secrets_types = self._load_json_annotation_from_k8s_object( - secrets_store, "secrets_types" - ) - secrets_with_types.append( - {"name": secret_name, "type": secrets_types[secret_name]} - ) + def delete_secrets(self, names: Sequence[str]) -> List[str]: + """Delete one or more of users secrets.""" + missing_secrets = [name for name in names if name not in self.secrets] + if missing_secrets: + raise REANASecretDoesNotExist(missing_secrets) - return secrets_with_types + for secret_name in names: + del self.secrets[secret_name] + return list(names) - def get_env_secrets_as_k8s_spec(self): - """Return a list of specification items for env type secrets for k8s. + def get_secret(self, name: str) -> Optional[Secret]: + """Get secret of given user by name.""" + return self.secrets.get(name) - Return all environment variable secrets as a list of Kubernetes - environment variable specs. + def get_secrets(self) -> List[Secret]: + """List all secrets for a given user.""" + return list(self.secrets.values()) + + def get_env_secrets_as_k8s_spec(self) -> List: + """Get the list of specification items for env-type secrets for k8s. + + Return all environment variable secrets as a list of dicts. Object reference: https://github.com/kubernetes-client/python/ blob/master/kubernetes/docs/V1EnvVar.md. """ - all_secrets = self.get_secrets() env_secrets = [] - for secret in all_secrets: - name = secret["name"] - if secret["type"] == "env": + for secret in self.secrets.values(): + if secret.type_ == "env": env_secrets.append( { - "name": name, + "name": secret.name, "valueFrom": { "secretKeyRef": { - "name": self.user_secret_store_id, - "key": name, + "name": self.k8s_secret_name, + "key": secret.name, } }, } ) return env_secrets - def get_file_secrets_as_k8s_specs(self): - """Return a list of k8s specification items for file-type secrets. - - API Reference: https://kubernetes.io/docs/concepts/configuration/ - secret/#using-secrets-as-files-from-a-pod - """ - all_secrets = self.get_secrets() - file_secrets = [] - for secret in all_secrets: - name = secret["name"] - if secret["type"] == "file": - file_secrets.append( - { - "key": name, - "path": name, - } - ) - return file_secrets + def get_secrets_volume_mount_as_k8s_spec(self) -> Dict[str, Any]: + """Return a volume mount object for the file-type secrets.""" + return { + "name": self.k8s_secret_name, + "mountPath": REANA_USER_SECRET_MOUNT_PATH, + "readOnly": True, + } def get_file_secrets_volume_as_k8s_specs(self): - """Return the k8s specification item for file-type secrets. + """Get the k8s specification of a volume for file-type secrets. - Return the specification for Kubernetes secret store API, + Return the specification of volume adapted from a k8s secret, specifying the secrets that should be mounted as files. Object reference: https://github.com/kubernetes-client/python/ blob/master/kubernetes/docs/V1SecretVolumeSource.md """ - user_id = self.user_secret_store_id + file_secrets = [] + for secret in self.secrets.values(): + if secret.type_ == "file": + file_secrets.append( + { + "key": secret.name, + "path": secret.name, + } + ) return { - "name": user_id, + "name": self.k8s_secret_name, "secret": { - "secretName": user_id, - "items": self.get_file_secrets_as_k8s_specs(), + "secretName": self.k8s_secret_name, + "items": file_secrets, }, } - def get_secrets_volume_mount_as_k8s_spec(self): - """Return a secret volume mount object for secret store id.""" - return { - "name": self.user_secret_store_id, - "mountPath": REANA_USER_SECRET_MOUNT_PATH, - "readOnly": True, - } - def delete_secrets(self, secrets): - """Delete one or more of users secrets. +class UserSecretsStore: + """Utility class to fetch and update user secrets stored in Kubernetes.""" - :param secrets: List of secret names to be deleted form the store. - :returns: List with the names of the deleted secrets. - """ + @staticmethod + def init(user_id: Union[str, UUID]) -> UserSecrets: + """Initialise the secret store of a given user through the k8s API.""" + user_id = str(user_id) + user_secret_store_id = build_unique_component_name("secretsstore", user_id) + empty_secrets = UserSecrets(user_id, user_secret_store_id) try: - k8s_user_secrets = self._get_k8s_user_secrets_store() - deleted = [] - missing_secrets_list = [] - for secret_name in secrets: - try: - secrets_types = self._load_json_annotation_from_k8s_object( - k8s_user_secrets, "secrets_types" - ) - del secrets_types[secret_name] - self._dump_json_annotation_to_k8s_object( - k8s_user_secrets, "secrets_types", secrets_types - ) - del k8s_user_secrets.data[secret_name] - deleted.append(secret_name) - except KeyError: - missing_secrets_list.append(secret_name) - if missing_secrets_list: - raise REANASecretDoesNotExist(missing_secrets_list=missing_secrets_list) - self._update_store(k8s_user_secrets) - return deleted + current_k8s_corev1_api_client.create_namespaced_secret( + REANA_RUNTIME_KUBERNETES_NAMESPACE, empty_secrets.to_k8s_secret() + ) + return empty_secrets except ApiException: log.error( - "Something went wrong while deleting secrets from " - "Kubernetes secret for user {0}.".format( - str(self.user_secret_store_id) - ), + "Something went wrong while creating " + "Kubernetes secret for user {0}.".format(user_secret_store_id), exc_info=True, ) + raise + + @staticmethod + def fetch(user_id: Union[str, UUID]) -> UserSecrets: + """Fetch the secret store of a given user through the k8s API. + + If the secret store does not exist, it will be created. + """ + user_id = str(user_id) + user_secret_store_id = build_unique_component_name("secretsstore", user_id) + try: + k8s_user_secrets_store = ( + current_k8s_corev1_api_client.read_namespaced_secret( + user_secret_store_id, REANA_RUNTIME_KUBERNETES_NAMESPACE + ) + ) + k8s_user_secrets_store.data = k8s_user_secrets_store.data or {} + return UserSecrets.from_k8s_secret(user_id, k8s_user_secrets_store) + except ApiException as api_e: + if api_e.status == 404: + log.info( + "Kubernetes secret for user {0} does not " + "exist, creating...".format(user_secret_store_id) + ) + return UserSecretsStore.init(user_id) + else: + log.error( + "Something went wrong while retrieving " + "Kubernetes secret for user {0}.".format(user_secret_store_id), + exc_info=True, + ) + raise - def get_secret_value(self, name): - """Return secret value if secret with specified name is present.""" - secrets = self.get_secrets() - secret_names = [secret["name"] for secret in secrets] - if name in secret_names: - secrets_store = self._get_k8s_user_secrets_store() - secret_value = base64.standard_b64decode(secrets_store.data[name]).decode() - return secret_value - return None + @staticmethod + def update(secrets: UserSecrets): + """Update the secret store of a given user through the k8s API.""" + current_k8s_corev1_api_client.replace_namespaced_secret( + secrets.k8s_secret_name, + REANA_RUNTIME_KUBERNETES_NAMESPACE, + secrets.to_k8s_secret(), + ) diff --git a/tests/k8s/test_kerberos.py b/tests/k8s/test_kerberos.py new file mode 100644 index 00000000..17d1d6bd --- /dev/null +++ b/tests/k8s/test_kerberos.py @@ -0,0 +1,28 @@ +# This file is part of REANA. +# Copyright (C) 2024 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +from uuid import uuid4 +from reana_commons.k8s.secrets import Secret, UserSecrets +from reana_commons.k8s.kerberos import KerberosConfig, get_kerberos_k8s_config + + +def test_get_kerberos_k8s_config(kerberos_user_secrets): + """Test get_kerberos_k8s_config.""" + secrets = [ + Secret.from_base64(name, type_=s["type"], value=s["value"]) + for name, s in kerberos_user_secrets.items() + ] + user_secrets = UserSecrets(str(uuid4()), "k8s_kerberos_secret", secrets) + conf: KerberosConfig = get_kerberos_k8s_config(user_secrets, 123) + + assert conf.init_container["command"] == [ + "kinit", + "-kt", + "/etc/reana/secrets/.keytab", + "johndoe@CERN.CH", + ] + assert conf.init_container["securityContext"]["runAsUser"] == 123 + assert conf.renew_container["securityContext"]["runAsUser"] == 123 diff --git a/tests/k8s/test_secrets.py b/tests/k8s/test_secrets.py new file mode 100644 index 00000000..54d6ac2c --- /dev/null +++ b/tests/k8s/test_secrets.py @@ -0,0 +1,193 @@ +# -*- coding: utf-8 -*- +# +# This file is part of REANA. +# Copyright (C) 2019, 2020, 2021, 2024 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +import json +from uuid import uuid4 + +import pytest +from kubernetes import client +from kubernetes.client.rest import ApiException +from mock import DEFAULT, Mock, patch + +from reana_commons.errors import REANASecretAlreadyExists, REANASecretDoesNotExist +from reana_commons.k8s.secrets import Secret, UserSecrets, UserSecretsStore + + +def test_secret_encoding(): + """Test the correct encoding of secret values.""" + s = Secret("name", type_="env", value="secret") + assert s.value_bytes == b"secret" + assert s.value_str == "secret" + + s = Secret("name", type_="env", value=b"secret2") + assert s.value_bytes == b"secret2" + assert s.value_str == "secret2" + + s.set_value(b"secret3") + assert s.value_bytes == b"secret3" + assert s.value_str == "secret3" + + s.set_value("secret4") + assert s.value_bytes == b"secret4" + assert s.value_str == "secret4" + + +def test_user_secrets_add(): + """Test adding user secrets.""" + us = UserSecrets(user_id="123", k8s_secret_name="asd") + s = Secret("secret_name", "file", "hello!") + us.add_secrets([s]) + assert us.get_secret("secret_name") == s + + +def test_user_secrets_delete(): + """Test deleting user secrets.""" + s = Secret("secret_name", "file", "hello!") + us = UserSecrets(user_id="123", k8s_secret_name="asd", secrets=[s]) + assert us.get_secret("secret_name") is not None + us.delete_secrets(["secret_name"]) + assert us.get_secret("secret_name") is None + + +def test_user_secrets_to_k8s(): + """Test converting user secrets to k8s secrets.""" + s = Secret("secret_name", "file", b"hello!") + s2 = Secret("secret_name_2", "env", "hello env!") + us = UserSecrets(user_id="123", k8s_secret_name="k8s_secret") + us.add_secrets([s, s2]) + k8s_secret = us.to_k8s_secret() + + assert k8s_secret.metadata.name == "k8s_secret" + secret_types = json.loads(k8s_secret.metadata.annotations["secrets_types"]) + assert secret_types["secret_name"] == "file" + assert secret_types["secret_name_2"] == "env" + assert k8s_secret.data["secret_name"] == "aGVsbG8h" + assert k8s_secret.data["secret_name_2"] == "aGVsbG8gZW52IQ==" + + +def test_user_secrets_from_k8s(): + """Test converting k8s secrets to user secrets.""" + k8s_secret = client.V1Secret( + metadata=client.V1ObjectMeta( + name="k8s_secret", + annotations={ + "secrets_types": json.dumps( + {"secret_name": "file", "secret_name_2": "env"} + ) + }, + ), + data={ + "secret_name": "aGVsbG8h", + "secret_name_2": "aGVsbG8gZW52IQ==", + }, + ) + + us = UserSecrets.from_k8s_secret("123", k8s_secret) + assert us.get_secret("secret_name").name == "secret_name" + assert us.get_secret("secret_name").type_ == "file" + assert us.get_secret("secret_name").value_str == "hello!" + + assert us.get_secret("secret_name_2").name == "secret_name_2" + assert us.get_secret("secret_name_2").type_ == "env" + assert us.get_secret("secret_name_2").value_str == "hello env!" + + assert len(us.secrets) == 2 + assert us.user_id == "123" + assert us.k8s_secret_name == "k8s_secret" + + +def test_user_secrets_full_conversion_from_to_k8s(): + """Test full conversion from and to k8s secrets.""" + s = Secret("secret_name", "file", b"hello!") + s2 = Secret("secret_name_2", "env", "hello env!") + us = UserSecrets(user_id="123", k8s_secret_name="k8s_secret") + us.add_secrets([s, s2]) + + k8s_secret = us.to_k8s_secret() + us_from_k8s = UserSecrets.from_k8s_secret("123", k8s_secret) + + assert us.user_id == us_from_k8s.user_id + assert us.secrets == us_from_k8s.secrets + + +def test_create_secret(): + """Test creation of user secrets.""" + corev1_api_client = Mock() + corev1_api_client.read_namespaced_secret = Mock( + side_effect=ApiException(reason="Secret does not exist.", status=404) + ) + secrets = [Secret(name="secret", type_="env", value="secret")] + with patch( + "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", corev1_api_client + ): + user_secrets = UserSecretsStore.fetch(uuid4()) + user_secrets.add_secrets(secrets) + UserSecretsStore.update(user_secrets) + corev1_api_client.create_namespaced_secret.assert_called_once() + corev1_api_client.replace_namespaced_secret.assert_called_once() + + +def test_create_existing_secrets_fail( + corev1_api_client_with_user_secrets, user_secrets, no_db_user +): + """Test create secrets which already exist without overwrite.""" + secret_name = next(iter(user_secrets.keys())) + secrets = [Secret(name=secret_name, type_="env", value="secret")] + with patch( + "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", + corev1_api_client_with_user_secrets(user_secrets), + ) as api_client: + user_secrets = UserSecretsStore.fetch(no_db_user) + with pytest.raises(REANASecretAlreadyExists): + user_secrets.add_secrets(secrets) + api_client.replace_namespaced_secret.assert_not_called() + + +def test_overwrite_secret( + corev1_api_client_with_user_secrets, user_secrets, no_db_user +): + """Test overwriting secrets.""" + secret_name = next(iter(user_secrets.keys())) + secrets = [Secret(name=secret_name, type_="env", value="secret")] + with patch( + "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", + corev1_api_client_with_user_secrets(user_secrets), + ) as api_client: + user_secrets = UserSecretsStore.fetch(no_db_user.id_) + user_secrets.add_secrets(secrets, overwrite=True) + UserSecretsStore.update(user_secrets) + api_client.replace_namespaced_secret.assert_called() + + +def test_delete_secrets(corev1_api_client_with_user_secrets, user_secrets, no_db_user): + """Test deletion of user secrets.""" + secret_names_list = list(user_secrets.keys()) + with patch( + "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", + corev1_api_client_with_user_secrets(user_secrets), + ): + user_secrets = UserSecretsStore.fetch(no_db_user.id_) + deleted_secrets = set(user_secrets.delete_secrets(secret_names_list)) + assert bool(deleted_secrets.intersection(secret_names_list)) and not bool( + deleted_secrets.difference(secret_names_list) + ) + + +def test_delete_unknown_secret( + corev1_api_client_with_user_secrets, user_secrets, no_db_user +): + """Test delete a non existing secret.""" + with patch( + "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", + corev1_api_client_with_user_secrets(user_secrets), + ) as api_client: + user_secrets = UserSecretsStore.fetch(no_db_user.id_) + secret_name = "unknown-secret" + with pytest.raises(REANASecretDoesNotExist): + user_secrets.delete_secrets([secret_name]) + api_client.replace_namespaced_secret.assert_not_called() diff --git a/tests/test_secrets_store.py b/tests/test_secrets_store.py deleted file mode 100644 index ee6c4e0d..00000000 --- a/tests/test_secrets_store.py +++ /dev/null @@ -1,100 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of REANA. -# Copyright (C) 2019, 2020, 2021 CERN. -# -# REANA is free software; you can redistribute it and/or modify it -# under the terms of the MIT License; see LICENSE file for more details. - -import json - -import pytest -from kubernetes import client -from kubernetes.client.rest import ApiException -from mock import DEFAULT, Mock, patch - -from reana_commons.errors import REANASecretAlreadyExists, REANASecretDoesNotExist -from reana_commons.k8s.secrets import REANAUserSecretsStore - - -def test_create_secret(user_secrets, no_db_user): - """Test creation of user secrets.""" - corev1_api_client = Mock() - corev1_api_client.read_namespaced_secret = Mock( - side_effect=ApiException(reason="Secret does not exist.", status=404) - ) - with patch( - "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", corev1_api_client - ): - secrets_store = REANAUserSecretsStore(no_db_user.id_) - secrets_store.add_secrets(user_secrets) - corev1_api_client.create_namespaced_secret.assert_called_once() - corev1_api_client.replace_namespaced_secret.assert_called_once() - - -def test_create_existing_secrets_fail( - corev1_api_client_with_user_secrets, user_secrets, no_db_user -): - """Test create secrets which already exist without overwrite.""" - with patch( - "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", - corev1_api_client_with_user_secrets(user_secrets), - ) as api_client: - secrets_store = REANAUserSecretsStore(no_db_user) - with pytest.raises(REANASecretAlreadyExists): - secrets_store.add_secrets(user_secrets) - api_client.replace_namespaced_secret.assert_not_called() - - -def test_overwrite_secret( - corev1_api_client_with_user_secrets, user_secrets, no_db_user -): - """Test overwriting secrets.""" - with patch( - "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", - corev1_api_client_with_user_secrets(user_secrets), - ) as api_client: - secrets_store = REANAUserSecretsStore(no_db_user.id_) - secrets_store.add_secrets(user_secrets, overwrite=True) - api_client.replace_namespaced_secret.assert_called() - - -def test_get_secrets(corev1_api_client_with_user_secrets, user_secrets, no_db_user): - """Test listing user secrests.""" - with patch( - "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", - corev1_api_client_with_user_secrets(user_secrets), - ): - secrets_store = REANAUserSecretsStore(no_db_user.id_) - secrets_list = secrets_store.get_secrets() - for secret in secrets_list: - assert user_secrets[secret["name"]]["type"] == secret["type"] - - -def test_delete_secrets(corev1_api_client_with_user_secrets, user_secrets, no_db_user): - """Test deletion of user secrets.""" - with patch( - "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", - corev1_api_client_with_user_secrets(user_secrets), - ): - secrets_store = REANAUserSecretsStore(no_db_user.id_) - secret_names_list = user_secrets.keys() - deleted_secrets = set(secrets_store.delete_secrets(secret_names_list)) - assert bool(deleted_secrets.intersection(secret_names_list)) and not bool( - deleted_secrets.difference(secret_names_list) - ) - - -def test_delete_unknown_secret( - corev1_api_client_with_user_secrets, user_secrets, no_db_user -): - """Test delete a non existing secret.""" - with patch( - "reana_commons.k8s.secrets." "current_k8s_corev1_api_client", - corev1_api_client_with_user_secrets(user_secrets), - ) as api_client: - secrets_store = REANAUserSecretsStore(no_db_user.id_) - secret_name = "unknown-secret" - with pytest.raises(REANASecretDoesNotExist): - secrets_store.delete_secrets([secret_name]) - api_client.replace_namespaced_secret.assert_not_called()