Skip to content

Commit

Permalink
feat: deregister_resource fn
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh committed Jan 21, 2024
1 parent 25b6f9b commit 5c7d2dd
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 15 deletions.
12 changes: 10 additions & 2 deletions zetta_utils/cloud_management/resource_allocation/aws_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
from zetta_utils import builder, log
from zetta_utils.message_queues.sqs import SQSQueue
from zetta_utils.message_queues.sqs import utils as sqs_utils
from zetta_utils.run import Resource, register_resource
from zetta_utils.run import (
Resource,
ResourceTypes,
deregister_resource,
register_resource,
)

logger = log.get_logger("zetta_utils")

Expand All @@ -14,7 +19,9 @@ def sqs_queue_ctx_mngr(run_id: str, queue: SQSQueue):
sqs = sqs_utils.get_sqs_client(queue.region_name)
_queue = sqs.create_queue(QueueName=queue.name, Attributes={"SqsManagedSseEnabled": "false"})

register_resource(Resource(run_id, "sqs_queue", queue.name, region=queue.region_name))
_id = register_resource(
Resource(run_id, ResourceTypes.SQS_QUEUE.value, queue.name, region=queue.region_name)
)

logger.info(f"Created SQS queue with URL={_queue['QueueUrl']}")
try:
Expand All @@ -23,3 +30,4 @@ def sqs_queue_ctx_mngr(run_id: str, queue: SQSQueue):
logger.info(f"Deleting SQS queue '{queue.name}'")
logger.debug(f"Deleting SQS queue with URL={_queue['QueueUrl']}")
sqs.delete_queue(QueueUrl=_queue["QueueUrl"])
deregister_resource(_id)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@

from kubernetes import client as k8s_client # type: ignore
from zetta_utils import log
from zetta_utils.run import Resource, ResourceTypes, register_resource
from zetta_utils.run import (
Resource,
ResourceTypes,
deregister_resource,
register_resource,
)

from .common import ClusterInfo, get_cluster_data

Expand Down Expand Up @@ -37,7 +42,7 @@ def configmap_ctx_manager(

logger.info(f"Creating k8s configmap `{configmap.metadata.name}`")
k8s_core_v1_api.create_namespaced_config_map(body=configmap, namespace=namespace)
register_resource(
_id = register_resource(
Resource(
run_id,
ResourceTypes.K8S_CONFIGMAP.value,
Expand All @@ -59,3 +64,4 @@ def configmap_ctx_manager(
name=configmap.metadata.name,
namespace=namespace,
)
deregister_resource(_id)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
from kubernetes import client as k8s_client # type: ignore
from zetta_utils import builder, log
from zetta_utils.mazepa import SemaphoreType
from zetta_utils.run import Resource, ResourceTypes, register_resource
from zetta_utils.run import (
Resource,
ResourceTypes,
deregister_resource,
register_resource,
)

from .common import ClusterInfo, get_cluster_data, get_mazepa_worker_command
from .pod import get_pod_spec
Expand Down Expand Up @@ -160,7 +165,7 @@ def deployment_ctx_mngr(
with secrets_ctx_mngr(run_id, secrets, cluster_info):
logger.info(f"Creating k8s deployment `{deployment.metadata.name}`")
k8s_apps_v1_api.create_namespaced_deployment(body=deployment, namespace=namespace)
register_resource(
_id = register_resource(
Resource(
run_id,
ResourceTypes.K8S_DEPLOYMENT.value,
Expand All @@ -181,3 +186,4 @@ def deployment_ctx_mngr(
k8s_apps_v1_api.delete_namespaced_deployment(
name=deployment.metadata.name, namespace=namespace
)
deregister_resource(_id)
10 changes: 8 additions & 2 deletions zetta_utils/cloud_management/resource_allocation/k8s/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from kubernetes import client as k8s_client # type: ignore
from kubernetes import watch # type: ignore
from zetta_utils import log
from zetta_utils.run import Resource, ResourceTypes, register_resource
from zetta_utils.run import (
Resource,
ResourceTypes,
deregister_resource,
register_resource,
)

from .common import ClusterInfo, get_cluster_data
from .secret import secrets_ctx_mngr
Expand Down Expand Up @@ -216,7 +221,7 @@ def job_ctx_manager(
with secrets_ctx_mngr(run_id, secrets, cluster_info):
logger.info(f"Creating k8s job `{job.metadata.name}`")
batch_v1_api.create_namespaced_job(body=job, namespace=namespace)
register_resource(
_id = register_resource(
Resource(
run_id,
ResourceTypes.K8S_JOB.value,
Expand All @@ -235,3 +240,4 @@ def job_ctx_manager(
namespace=namespace,
propagation_policy="Foreground",
)
deregister_resource(_id)
14 changes: 11 additions & 3 deletions zetta_utils/cloud_management/resource_allocation/k8s/secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@

from kubernetes import client as k8s_client # type: ignore
from zetta_utils import log
from zetta_utils.run import Resource, ResourceTypes, register_resource
from zetta_utils.run import (
Resource,
ResourceTypes,
deregister_resource,
register_resource,
)

from .common import ClusterInfo, get_cluster_data

Expand Down Expand Up @@ -89,16 +94,18 @@ def secrets_ctx_mngr(
configuration, _ = get_cluster_data(cluster_info)
k8s_client.Configuration.set_default(configuration)
k8s_core_v1_api = k8s_client.CoreV1Api()
secrets_resource_ids = []
for secret in secrets:
logger.info(f"Creating k8s secret `{secret.metadata.name}`")
k8s_core_v1_api.create_namespaced_secret(namespace=namespace, body=secret)
register_resource(
_id = register_resource(
Resource(
run_id,
ResourceTypes.K8S_SECRET.value,
secret.metadata.name,
)
)
secrets_resource_ids.append(_id)

try:
yield
Expand All @@ -109,8 +116,9 @@ def secrets_ctx_mngr(

# need to create a new client for the above to take effect
k8s_core_v1_api = k8s_client.CoreV1Api()
for secret in secrets:
for secret, _id in zip(secrets, secrets_resource_ids):
logger.info(f"Deleting k8s secret `{secret.metadata.name}`")
k8s_core_v1_api.delete_namespaced_secret(
name=secret.metadata.name, namespace=namespace
)
deregister_resource(_id)
10 changes: 8 additions & 2 deletions zetta_utils/cloud_management/resource_allocation/k8s/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@

from kubernetes import client as k8s_client # type: ignore
from zetta_utils import log
from zetta_utils.run import Resource, ResourceTypes, register_resource
from zetta_utils.run import (
Resource,
ResourceTypes,
deregister_resource,
register_resource,
)

from .common import ClusterInfo, get_cluster_data

Expand Down Expand Up @@ -41,7 +46,7 @@ def service_ctx_manager(
logger.info(f"Creating k8s service `{service.metadata.name}`")
k8s_core_v1_api.create_namespaced_service(body=service, namespace=namespace)

register_resource(
_id = register_resource(
Resource(
run_id,
ResourceTypes.K8S_SERVICE.value,
Expand All @@ -63,3 +68,4 @@ def service_ctx_manager(
name=service.metadata.name,
namespace=namespace,
)
deregister_resource(_id)
9 changes: 8 additions & 1 deletion zetta_utils/run/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@
from zetta_utils.mazepa import id_generation
from zetta_utils.parsing import json

from .resource import Resource, register_resource, ResourceTypes, ResourceKeys, RESOURCE_DB
from .resource import (
deregister_resource,
Resource,
register_resource,
ResourceTypes,
ResourceKeys,
RESOURCE_DB,
)

logger = log.get_logger("zetta_utils")

Expand Down
16 changes: 15 additions & 1 deletion zetta_utils/run/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ResourceKeys(Enum):
RUN_ID = "run_id"
TYPE = "type"
NAME = "name"
REGION = "region"


@attrs.frozen
Expand All @@ -40,8 +41,21 @@ class Resource:
region: str = ""


def register_resource(resource: Resource) -> None:
def register_resource(resource: Resource) -> str:
_resource = attrs.asdict(resource)
row_key = str(uuid.uuid4())
col_keys = tuple(_resource.keys())
RESOURCE_DB[(row_key, col_keys)] = _resource
return row_key


def deregister_resource(resource_id: str): # pragma: no cover
def _delete_db_entry(entry_id: str, columns: list[str]):
parent_key = client.key("Row", entry_id)
for column in columns:
col_key = client.key("Column", column, parent=parent_key)
client.delete(col_key)

client = RESOURCE_DB.backend.client # type: ignore
columns = [key.value for key in list(ResourceKeys)]
_delete_db_entry(resource_id, columns)

0 comments on commit 5c7d2dd

Please sign in to comment.