From 8b0d3f850d3ab28a9dbd42a790b5bbfaaa98dd0a Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Wed, 25 Oct 2023 17:46:12 +0200 Subject: [PATCH] Add `nodepool` to the CRD --- CHANGES.rst | 3 ++ crate/operator/change_compute.py | 46 +++++++++++++------ crate/operator/constants.py | 5 ++ crate/operator/create.py | 10 ++-- crate/operator/utils/crd.py | 1 + crate/operator/webhooks.py | 3 ++ .../templates/cratedbs-cloud-crate-io.yaml | 6 +++ tests/test_change_compute.py | 11 +++-- tests/test_create.py | 6 +++ 9 files changed, 69 insertions(+), 22 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 11954bed..7714abe6 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -5,6 +5,9 @@ Changelog Unreleased ---------- +* Changed the operator CRD to be able to specify a nodepool and set node affinity and + tolerations accordingly when creating a cluster or changing its compute. + 2.31.0 (2023-09-11) ------------------- diff --git a/crate/operator/change_compute.py b/crate/operator/change_compute.py index a9c3809a..5e36c5a4 100644 --- a/crate/operator/change_compute.py +++ b/crate/operator/change_compute.py @@ -87,19 +87,23 @@ async def handle( # type: ignore def generate_change_compute_payload(old, body): - old_data = old["spec"]["nodes"]["data"][0].get("resources", {}) - new_data = body["spec"]["nodes"]["data"][0].get("resources", {}) + old_data = old["spec"]["nodes"]["data"][0] + new_data = body["spec"]["nodes"]["data"][0] + old_resources = old_data.get("resources", {}) + new_resources = new_data.get("resources", {}) return WebhookChangeComputePayload( - old_cpu_limit=old_data.get("limits", {}).get("cpu"), - old_memory_limit=old_data.get("limits", {}).get("memory"), - old_cpu_request=old_data.get("requests", {}).get("cpu"), - old_memory_request=old_data.get("requests", {}).get("memory"), - old_heap_ratio=old_data.get("heapRatio"), - new_cpu_limit=new_data.get("limits", {}).get("cpu"), - new_memory_limit=new_data.get("limits", {}).get("memory"), - new_cpu_request=new_data.get("requests", {}).get("cpu"), - new_memory_request=new_data.get("requests", {}).get("memory"), - new_heap_ratio=new_data.get("heapRatio"), + old_cpu_limit=old_resources.get("limits", {}).get("cpu"), + old_memory_limit=old_resources.get("limits", {}).get("memory"), + old_cpu_request=old_resources.get("requests", {}).get("cpu"), + old_memory_request=old_resources.get("requests", {}).get("memory"), + old_heap_ratio=old_resources.get("heapRatio"), + old_nodepool=old_data.get("nodepool"), + new_cpu_limit=new_resources.get("limits", {}).get("cpu"), + new_memory_limit=new_resources.get("limits", {}).get("memory"), + new_cpu_request=new_resources.get("requests", {}).get("cpu"), + new_memory_request=new_resources.get("requests", {}).get("memory"), + new_heap_ratio=new_resources.get("heapRatio"), + new_nodepool=new_data.get("nodepool"), ) @@ -165,8 +169,22 @@ def generate_body_patch( "spec": { "template": { "spec": { - "affinity": get_statefulset_affinity(name, logger, node_spec), - "tolerations": get_tolerations(name, logger, node_spec), + "affinity": get_statefulset_affinity( + name, + logger, + { + **node_spec, + "nodepool": compute_change_data.get("new_nodepool"), + }, + ), + "tolerations": get_tolerations( + name, + logger, + { + **node_spec, + "nodepool": compute_change_data.get("new_nodepool"), + }, + ), "containers": [node_spec], } } diff --git a/crate/operator/constants.py b/crate/operator/constants.py index ffec8e82..57443c00 100644 --- a/crate/operator/constants.py +++ b/crate/operator/constants.py @@ -72,3 +72,8 @@ class SnapshotRestoreType(enum.Enum): TABLES = "tables" SECTIONS = "sections" PARTITIONS = "partitions" + + +class Nodepool(str, enum.Enum): + SHARED = "shared" + DEDICATED = "dedicated" diff --git a/crate/operator/create.py b/crate/operator/create.py index eade3f94..92cc8d48 100644 --- a/crate/operator/create.py +++ b/crate/operator/create.py @@ -92,6 +92,7 @@ SHARED_NODE_TOLERATION_KEY, SHARED_NODE_TOLERATION_VALUE, CloudProvider, + Nodepool, Port, ) from crate.operator.utils import crate, quorum @@ -106,7 +107,6 @@ def get_sql_exporter_config( owner_references: Optional[List[V1OwnerReference]], name: str, labels: LabelType ) -> V1ConfigMap: - sql_exporter_config = pkgutil.get_data("crate.operator", "data/sql-exporter.yaml") if sql_exporter_config: @@ -379,7 +379,6 @@ def get_statefulset_crate_command( is_data: bool, crate_version: str, ) -> List[str]: - expected_nodes_setting_name = "gateway.expected_nodes" recover_after_nodes_setting_name = "gateway.recover_after_nodes" expected_nodes_setting_value = total_nodes_count @@ -1034,7 +1033,6 @@ async def recreate_services( meta, logger: logging.Logger, ): - ports_spec = spec.get("ports", {}) http_port = ports_spec.get("http", Port.HTTP.value) postgres_port = ports_spec.get("postgres", Port.POSTGRES.value) @@ -1124,9 +1122,12 @@ def is_shared_resources_cluster(node_spec: Dict[str, Any]) -> bool: cpu_limit = node_spec["resources"].get("limits", {}).get("cpu") memory_request = node_spec["resources"].get("requests", {}).get("memory") memory_limit = node_spec["resources"].get("limits", {}).get("memory") + nodepool = node_spec.get("nodepool") if not (cpu_request or memory_request): return False - return cpu_request != cpu_limit or memory_request != memory_limit + return ( + cpu_request != cpu_limit or memory_request != memory_limit + ) and nodepool == Nodepool.SHARED except KeyError: return False @@ -1251,7 +1252,6 @@ async def handle( # type: ignore logger: logging.Logger, **kwargs: Any, ): - await create_statefulset( owner_references, namespace, diff --git a/crate/operator/utils/crd.py b/crate/operator/utils/crd.py index 0b8a5d51..b744d19e 100644 --- a/crate/operator/utils/crd.py +++ b/crate/operator/utils/crd.py @@ -36,4 +36,5 @@ def has_compute_changed(old_spec, new_spec) -> bool: != new_spec.get("resources", {}).get("limits", {}).get("memory") or old_spec.get("resources", {}).get("requests", {}).get("memory") != new_spec.get("resources", {}).get("requests", {}).get("memory") + or old_spec.get("nodepool") != new_spec.get("nodepool") ) diff --git a/crate/operator/webhooks.py b/crate/operator/webhooks.py index c524dc55..4d046ea7 100644 --- a/crate/operator/webhooks.py +++ b/crate/operator/webhooks.py @@ -153,6 +153,9 @@ class WebhookChangeComputePayload(WebhookSubPayload): old_heap_ratio: float new_heap_ratio: float + old_nodepool: str + new_nodepool: str + class WebhookInfoChangedPayload(WebhookSubPayload): external_ip: str diff --git a/deploy/charts/crate-operator-crds/templates/cratedbs-cloud-crate-io.yaml b/deploy/charts/crate-operator-crds/templates/cratedbs-cloud-crate-io.yaml index a243090f..bab1d279 100644 --- a/deploy/charts/crate-operator-crds/templates/cratedbs-cloud-crate-io.yaml +++ b/deploy/charts/crate-operator-crds/templates/cratedbs-cloud-crate-io.yaml @@ -437,6 +437,9 @@ spec: replicas: description: Number of CrateDB nodes of this type. type: number + nodepool: + description: Type of nodepool where the cluster should run. + type: string resources: properties: cpus: @@ -519,6 +522,9 @@ spec: description: Number of master nodes. Should be an odd number. minimum: 3 type: number + nodepool: + description: Type of nodepool where the cluster should run. + type: string resources: properties: cpus: diff --git a/tests/test_change_compute.py b/tests/test_change_compute.py index 3d795b9d..998cc59e 100644 --- a/tests/test_change_compute.py +++ b/tests/test_change_compute.py @@ -282,12 +282,13 @@ async def test_change_compute_from_request_to_limit( @pytest.mark.parametrize( "old_cpu_limit, old_memory_limit, old_cpu_request, old_memory_request, " - "new_cpu_limit, new_memory_limit, new_cpu_request, new_memory_request", + "new_cpu_limit, new_memory_limit, new_cpu_request, new_memory_request, " + "old_nodepool, new_nodepool", [ # Test no requests - (1, "2Gi", None, None, 3, "5Gi", None, None), + (1, "2Gi", None, None, 3, "5Gi", None, None, "dedicated", "dedicated"), # Test requests set - (1, "2Gi", None, None, 3, "5Gi", 5, "8Gi"), + (1, "2Gi", None, None, 3, "5Gi", 5, "8Gi", "shared", "shared"), ], ) def test_generate_body_patch( @@ -299,6 +300,8 @@ def test_generate_body_patch( new_memory_limit, new_cpu_request, new_memory_request, + old_nodepool, + new_nodepool, faker, ): compute_change_data = WebhookChangeComputePayload( @@ -312,6 +315,8 @@ def test_generate_body_patch( new_memory_request=new_memory_request, old_heap_ratio=0.25, new_heap_ratio=0.25, + old_nodepool=old_nodepool, + new_nodepool=new_nodepool, ) name = faker.domain_word() diff --git a/tests/test_create.py b/tests/test_create.py index 80261351..90fc7e0f 100644 --- a/tests/test_create.py +++ b/tests/test_create.py @@ -178,6 +178,7 @@ def test_dedicated_resources_affinity(self, node_spec, faker): "limits": {"cpu": 0.5, "memory": "8589934592"}, "requests": {"cpu": 0.25, "memory": "8589934592"}, }, + "nodepool": "shared", }, ], ) @@ -279,6 +280,7 @@ def test_dedicated_resources_tolerations(self, node_spec, faker): "limits": {"cpu": 0.5, "memory": "8589934592"}, "requests": {"cpu": 0.25, "memory": "8589934592"}, }, + "nodepool": "shared", }, ], ) @@ -1288,6 +1290,7 @@ def test_sql_exporter_config(): "limits": {"cpu": 0.5, "memory": "8589934592"}, "requests": {"cpu": 0.5, "memory": "8589934592"}, }, + "nodepool": "dedicated", }, False, ), @@ -1296,12 +1299,14 @@ def test_sql_exporter_config(): "resources": { "limits": {"cpu": 0.5, "memory": "8589934592"}, }, + "nodepool": "dedicated", }, False, ), ( { "resources": {"cpus": 0.5, "memory": "8589934592"}, + "nodepool": "dedicated", }, False, ), @@ -1311,6 +1316,7 @@ def test_sql_exporter_config(): "limits": {"cpu": 0.5, "memory": "8589934592"}, "requests": {"cpu": 0.25, "memory": "8589934592"}, }, + "nodepool": "shared", }, True, ),