diff --git a/zetta_utils/cloud_management/resource_allocation/aws_sqs.py b/zetta_utils/cloud_management/resource_allocation/aws_sqs.py index 5f3f1a030..b421e5bc8 100644 --- a/zetta_utils/cloud_management/resource_allocation/aws_sqs.py +++ b/zetta_utils/cloud_management/resource_allocation/aws_sqs.py @@ -3,7 +3,12 @@ from zetta_utils import builder, log from zetta_utils.message_queues.sqs import SQSQueue from zetta_utils.message_queues.sqs import utils as sqs_utils -from zetta_utils.run import Resource, register_resource +from zetta_utils.run import ( + Resource, + ResourceTypes, + deregister_resource, + register_resource, +) logger = log.get_logger("zetta_utils") @@ -14,7 +19,9 @@ def sqs_queue_ctx_mngr(run_id: str, queue: SQSQueue): sqs = sqs_utils.get_sqs_client(queue.region_name) _queue = sqs.create_queue(QueueName=queue.name, Attributes={"SqsManagedSseEnabled": "false"}) - register_resource(Resource(run_id, "sqs_queue", queue.name, region=queue.region_name)) + _id = register_resource( + Resource(run_id, ResourceTypes.SQS_QUEUE.value, queue.name, region=queue.region_name) + ) logger.info(f"Created SQS queue with URL={_queue['QueueUrl']}") try: @@ -23,3 +30,4 @@ def sqs_queue_ctx_mngr(run_id: str, queue: SQSQueue): logger.info(f"Deleting SQS queue '{queue.name}'") logger.debug(f"Deleting SQS queue with URL={_queue['QueueUrl']}") sqs.delete_queue(QueueUrl=_queue["QueueUrl"]) + deregister_resource(_id) diff --git a/zetta_utils/cloud_management/resource_allocation/k8s/configmap.py b/zetta_utils/cloud_management/resource_allocation/k8s/configmap.py index 4989a163a..35fc6c40b 100644 --- a/zetta_utils/cloud_management/resource_allocation/k8s/configmap.py +++ b/zetta_utils/cloud_management/resource_allocation/k8s/configmap.py @@ -7,7 +7,12 @@ from kubernetes import client as k8s_client # type: ignore from zetta_utils import log -from zetta_utils.run import Resource, ResourceTypes, register_resource +from zetta_utils.run import ( + Resource, + ResourceTypes, + deregister_resource, + register_resource, +) from .common import ClusterInfo, get_cluster_data @@ -37,7 +42,7 @@ def configmap_ctx_manager( logger.info(f"Creating k8s configmap `{configmap.metadata.name}`") k8s_core_v1_api.create_namespaced_config_map(body=configmap, namespace=namespace) - register_resource( + _id = register_resource( Resource( run_id, ResourceTypes.K8S_CONFIGMAP.value, @@ -59,3 +64,4 @@ def configmap_ctx_manager( name=configmap.metadata.name, namespace=namespace, ) + deregister_resource(_id) diff --git a/zetta_utils/cloud_management/resource_allocation/k8s/deployment.py b/zetta_utils/cloud_management/resource_allocation/k8s/deployment.py index ddd7ad839..923cce326 100644 --- a/zetta_utils/cloud_management/resource_allocation/k8s/deployment.py +++ b/zetta_utils/cloud_management/resource_allocation/k8s/deployment.py @@ -10,7 +10,12 @@ from kubernetes import client as k8s_client # type: ignore from zetta_utils import builder, log from zetta_utils.mazepa import SemaphoreType -from zetta_utils.run import Resource, ResourceTypes, register_resource +from zetta_utils.run import ( + Resource, + ResourceTypes, + deregister_resource, + register_resource, +) from .common import ClusterInfo, get_cluster_data, get_mazepa_worker_command from .pod import get_pod_spec @@ -160,7 +165,7 @@ def deployment_ctx_mngr( with secrets_ctx_mngr(run_id, secrets, cluster_info): logger.info(f"Creating k8s deployment `{deployment.metadata.name}`") k8s_apps_v1_api.create_namespaced_deployment(body=deployment, namespace=namespace) - register_resource( + _id = register_resource( Resource( run_id, ResourceTypes.K8S_DEPLOYMENT.value, @@ -181,3 +186,4 @@ def deployment_ctx_mngr( k8s_apps_v1_api.delete_namespaced_deployment( name=deployment.metadata.name, namespace=namespace ) + deregister_resource(_id) diff --git a/zetta_utils/cloud_management/resource_allocation/k8s/job.py b/zetta_utils/cloud_management/resource_allocation/k8s/job.py index 786ddd48d..4b8ea63d1 100644 --- a/zetta_utils/cloud_management/resource_allocation/k8s/job.py +++ b/zetta_utils/cloud_management/resource_allocation/k8s/job.py @@ -11,7 +11,12 @@ from kubernetes import client as k8s_client # type: ignore from kubernetes import watch # type: ignore from zetta_utils import log -from zetta_utils.run import Resource, ResourceTypes, register_resource +from zetta_utils.run import ( + Resource, + ResourceTypes, + deregister_resource, + register_resource, +) from .common import ClusterInfo, get_cluster_data from .secret import secrets_ctx_mngr @@ -216,7 +221,7 @@ def job_ctx_manager( with secrets_ctx_mngr(run_id, secrets, cluster_info): logger.info(f"Creating k8s job `{job.metadata.name}`") batch_v1_api.create_namespaced_job(body=job, namespace=namespace) - register_resource( + _id = register_resource( Resource( run_id, ResourceTypes.K8S_JOB.value, @@ -235,3 +240,4 @@ def job_ctx_manager( namespace=namespace, propagation_policy="Foreground", ) + deregister_resource(_id) diff --git a/zetta_utils/cloud_management/resource_allocation/k8s/secret.py b/zetta_utils/cloud_management/resource_allocation/k8s/secret.py index 7111e17cf..89ef64fdb 100644 --- a/zetta_utils/cloud_management/resource_allocation/k8s/secret.py +++ b/zetta_utils/cloud_management/resource_allocation/k8s/secret.py @@ -8,7 +8,12 @@ from kubernetes import client as k8s_client # type: ignore from zetta_utils import log -from zetta_utils.run import Resource, ResourceTypes, register_resource +from zetta_utils.run import ( + Resource, + ResourceTypes, + deregister_resource, + register_resource, +) from .common import ClusterInfo, get_cluster_data @@ -89,16 +94,18 @@ def secrets_ctx_mngr( configuration, _ = get_cluster_data(cluster_info) k8s_client.Configuration.set_default(configuration) k8s_core_v1_api = k8s_client.CoreV1Api() + secrets_resource_ids = [] for secret in secrets: logger.info(f"Creating k8s secret `{secret.metadata.name}`") k8s_core_v1_api.create_namespaced_secret(namespace=namespace, body=secret) - register_resource( + _id = register_resource( Resource( run_id, ResourceTypes.K8S_SECRET.value, secret.metadata.name, ) ) + secrets_resource_ids.append(_id) try: yield @@ -109,8 +116,9 @@ def secrets_ctx_mngr( # need to create a new client for the above to take effect k8s_core_v1_api = k8s_client.CoreV1Api() - for secret in secrets: + for secret, _id in zip(secrets, secrets_resource_ids): logger.info(f"Deleting k8s secret `{secret.metadata.name}`") k8s_core_v1_api.delete_namespaced_secret( name=secret.metadata.name, namespace=namespace ) + deregister_resource(_id) diff --git a/zetta_utils/cloud_management/resource_allocation/k8s/service.py b/zetta_utils/cloud_management/resource_allocation/k8s/service.py index 835481e9f..80f135fb7 100644 --- a/zetta_utils/cloud_management/resource_allocation/k8s/service.py +++ b/zetta_utils/cloud_management/resource_allocation/k8s/service.py @@ -7,7 +7,12 @@ from kubernetes import client as k8s_client # type: ignore from zetta_utils import log -from zetta_utils.run import Resource, ResourceTypes, register_resource +from zetta_utils.run import ( + Resource, + ResourceTypes, + deregister_resource, + register_resource, +) from .common import ClusterInfo, get_cluster_data @@ -41,7 +46,7 @@ def service_ctx_manager( logger.info(f"Creating k8s service `{service.metadata.name}`") k8s_core_v1_api.create_namespaced_service(body=service, namespace=namespace) - register_resource( + _id = register_resource( Resource( run_id, ResourceTypes.K8S_SERVICE.value, @@ -63,3 +68,4 @@ def service_ctx_manager( name=service.metadata.name, namespace=namespace, ) + deregister_resource(_id) diff --git a/zetta_utils/run/__init__.py b/zetta_utils/run/__init__.py index 9c580b536..2adfaf1ba 100644 --- a/zetta_utils/run/__init__.py +++ b/zetta_utils/run/__init__.py @@ -16,7 +16,14 @@ from zetta_utils.mazepa import id_generation from zetta_utils.parsing import json -from .resource import Resource, register_resource, ResourceTypes, ResourceKeys, RESOURCE_DB +from .resource import ( + deregister_resource, + Resource, + register_resource, + ResourceTypes, + ResourceKeys, + RESOURCE_DB, +) logger = log.get_logger("zetta_utils") diff --git a/zetta_utils/run/resource.py b/zetta_utils/run/resource.py index 594b28358..169fb4003 100644 --- a/zetta_utils/run/resource.py +++ b/zetta_utils/run/resource.py @@ -30,6 +30,7 @@ class ResourceKeys(Enum): RUN_ID = "run_id" TYPE = "type" NAME = "name" + REGION = "region" @attrs.frozen @@ -40,8 +41,21 @@ class Resource: region: str = "" -def register_resource(resource: Resource) -> None: +def register_resource(resource: Resource) -> str: _resource = attrs.asdict(resource) row_key = str(uuid.uuid4()) col_keys = tuple(_resource.keys()) RESOURCE_DB[(row_key, col_keys)] = _resource + return row_key + + +def deregister_resource(resource_id: str): # pragma: no cover + def _delete_db_entry(entry_id: str, columns: list[str]): + parent_key = client.key("Row", entry_id) + for column in columns: + col_key = client.key("Column", column, parent=parent_key) + client.delete(col_key) + + client = RESOURCE_DB.backend.client # type: ignore + columns = [key.value for key in list(ResourceKeys)] + _delete_db_entry(resource_id, columns)