Add nodepool to the CRD
tomach committed Nov 8, 2023
1 parent 84ad1a7 commit d53aeb1
Showing 9 changed files with 66 additions and 31 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
@@ -8,6 +8,9 @@ Unreleased
* Updated the ``CrateVersion`` nightly parsing to accept the new datetime format
of ``yyyy-mm-dd-hh-mm`` while still being compatible with the old ``yyyymmdd`` format.

* Changed the operator CRD so that a nodepool can be specified; node affinity and
  tolerations are set accordingly when creating a cluster or changing its compute.

2.31.0 (2023-09-11)
-------------------

46 changes: 32 additions & 14 deletions crate/operator/change_compute.py
@@ -87,19 +87,23 @@ async def handle( # type: ignore


def generate_change_compute_payload(old, body):
-    old_data = old["spec"]["nodes"]["data"][0].get("resources", {})
-    new_data = body["spec"]["nodes"]["data"][0].get("resources", {})
+    old_data = old["spec"]["nodes"]["data"][0]
+    new_data = body["spec"]["nodes"]["data"][0]
+    old_resources = old_data.get("resources", {})
+    new_resources = new_data.get("resources", {})
     return WebhookChangeComputePayload(
-        old_cpu_limit=old_data.get("limits", {}).get("cpu"),
-        old_memory_limit=old_data.get("limits", {}).get("memory"),
-        old_cpu_request=old_data.get("requests", {}).get("cpu"),
-        old_memory_request=old_data.get("requests", {}).get("memory"),
-        old_heap_ratio=old_data.get("heapRatio"),
-        new_cpu_limit=new_data.get("limits", {}).get("cpu"),
-        new_memory_limit=new_data.get("limits", {}).get("memory"),
-        new_cpu_request=new_data.get("requests", {}).get("cpu"),
-        new_memory_request=new_data.get("requests", {}).get("memory"),
-        new_heap_ratio=new_data.get("heapRatio"),
+        old_cpu_limit=old_resources.get("limits", {}).get("cpu"),
+        old_memory_limit=old_resources.get("limits", {}).get("memory"),
+        old_cpu_request=old_resources.get("requests", {}).get("cpu"),
+        old_memory_request=old_resources.get("requests", {}).get("memory"),
+        old_heap_ratio=old_resources.get("heapRatio"),
+        old_nodepool=old_data.get("nodepool"),
+        new_cpu_limit=new_resources.get("limits", {}).get("cpu"),
+        new_memory_limit=new_resources.get("limits", {}).get("memory"),
+        new_cpu_request=new_resources.get("requests", {}).get("cpu"),
+        new_memory_request=new_resources.get("requests", {}).get("memory"),
+        new_heap_ratio=new_resources.get("heapRatio"),
+        new_nodepool=new_data.get("nodepool"),
     )
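
For reference, a minimal usage sketch of the reworked function above, assuming specs shaped like the CRD examples in this commit (all values below are invented for illustration, and the returned WebhookChangeComputePayload is treated as a plain mapping):

old = {"spec": {"nodes": {"data": [{
    "nodepool": "shared",
    "resources": {"limits": {"cpu": 1, "memory": "2Gi"}, "heapRatio": 0.25},
}]}}}
body = {"spec": {"nodes": {"data": [{
    "nodepool": "dedicated",
    "resources": {"limits": {"cpu": 3, "memory": "5Gi"}, "heapRatio": 0.25},
}]}}}

payload = generate_change_compute_payload(old, body)
# The pool change travels with the resource change:
# payload["old_nodepool"] == "shared" and payload["new_nodepool"] == "dedicated"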


@@ -165,8 +169,22 @@ def generate_body_patch(
         "spec": {
             "template": {
                 "spec": {
-                    "affinity": get_statefulset_affinity(name, logger, node_spec),
-                    "tolerations": get_tolerations(name, logger, node_spec),
+                    "affinity": get_statefulset_affinity(
+                        name,
+                        logger,
+                        {
+                            **node_spec,
+                            "nodepool": compute_change_data.get("new_nodepool"),
+                        },
+                    ),
+                    "tolerations": get_tolerations(
+                        name,
+                        logger,
+                        {
+                            **node_spec,
+                            "nodepool": compute_change_data.get("new_nodepool"),
+                        },
+                    ),
                     "containers": [node_spec],
                 }
             }
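
The affinity and tolerations arguments above rely on a plain dict merge: the nodepool requested in the compute-change payload is layered over the existing node spec before it is passed to get_statefulset_affinity and get_tolerations. A small sketch of that merge, with invented values:

node_spec = {"name": "hot", "nodepool": "shared"}   # hypothetical spec fragment
merged = {**node_spec, "nodepool": "dedicated"}     # same pattern as in the patch above
# merged == {"name": "hot", "nodepool": "dedicated"}: every other key survives,
# only the nodepool is overridden.
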
5 changes: 5 additions & 0 deletions crate/operator/constants.py
@@ -72,3 +72,8 @@ class SnapshotRestoreType(enum.Enum):
TABLES = "tables"
SECTIONS = "sections"
PARTITIONS = "partitions"


class Nodepool(str, enum.Enum):
SHARED = "shared"
DEDICATED = "dedicated"
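
Because Nodepool mixes in str, its members compare equal to the raw strings stored in the CRD spec, which is what the nodepool checks introduced elsewhere in this commit rely on. A quick sketch:

assert Nodepool.SHARED == "shared"
assert Nodepool("dedicated") is Nodepool.DEDICATED
assert {"nodepool": "shared"}.get("nodepool") == Nodepool.SHARED
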
16 changes: 2 additions & 14 deletions crate/operator/create.py
@@ -92,6 +92,7 @@
SHARED_NODE_TOLERATION_KEY,
SHARED_NODE_TOLERATION_VALUE,
CloudProvider,
Nodepool,
Port,
)
from crate.operator.utils import crate, quorum
@@ -106,7 +107,6 @@
def get_sql_exporter_config(
owner_references: Optional[List[V1OwnerReference]], name: str, labels: LabelType
) -> V1ConfigMap:

sql_exporter_config = pkgutil.get_data("crate.operator", "data/sql-exporter.yaml")

if sql_exporter_config:
@@ -379,7 +379,6 @@ def get_statefulset_crate_command(
is_data: bool,
crate_version: str,
) -> List[str]:

expected_nodes_setting_name = "gateway.expected_nodes"
recover_after_nodes_setting_name = "gateway.recover_after_nodes"
expected_nodes_setting_value = total_nodes_count
@@ -1034,7 +1033,6 @@ async def recreate_services(
meta,
logger: logging.Logger,
):

ports_spec = spec.get("ports", {})
http_port = ports_spec.get("http", Port.HTTP.value)
postgres_port = ports_spec.get("postgres", Port.POSTGRES.value)
@@ -1119,16 +1117,7 @@ async def create_system_user(


def is_shared_resources_cluster(node_spec: Dict[str, Any]) -> bool:
-    try:
-        cpu_request = node_spec["resources"].get("requests", {}).get("cpu")
-        cpu_limit = node_spec["resources"].get("limits", {}).get("cpu")
-        memory_request = node_spec["resources"].get("requests", {}).get("memory")
-        memory_limit = node_spec["resources"].get("limits", {}).get("memory")
-        if not (cpu_request or memory_request):
-            return False
-        return cpu_request != cpu_limit or memory_request != memory_limit
-    except KeyError:
-        return False
+    return node_spec.get("nodepool") == Nodepool.SHARED


def get_cluster_resource_requests(
@@ -1251,7 +1240,6 @@ async def handle( # type: ignore
logger: logging.Logger,
**kwargs: Any,
):

await create_statefulset(
owner_references,
namespace,
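
With the rewrite of is_shared_resources_cluster above, the nodepool flag becomes the single source of truth for "shared" clusters; the old heuristic of comparing resource requests against limits is gone. An illustrative comparison (spec fragments invented):

is_shared_resources_cluster({"nodepool": "shared"})     # True
is_shared_resources_cluster({"nodepool": "dedicated"})  # False
# Previously this spec counted as shared because requests != limits; without a
# nodepool key it is now treated as dedicated:
is_shared_resources_cluster(
    {"resources": {"requests": {"cpu": 0.25}, "limits": {"cpu": 0.5}}}
)  # False
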
1 change: 1 addition & 0 deletions crate/operator/utils/crd.py
@@ -36,4 +36,5 @@ def has_compute_changed(old_spec, new_spec) -> bool:
!= new_spec.get("resources", {}).get("limits", {}).get("memory")
or old_spec.get("resources", {}).get("requests", {}).get("memory")
!= new_spec.get("resources", {}).get("requests", {}).get("memory")
or old_spec.get("nodepool") != new_spec.get("nodepool")
)
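
A compute change is now also reported when only the nodepool differs, even if every resource value stays the same. A sketch with invented spec fragments:

old_spec = {"nodepool": "shared", "resources": {"limits": {"cpu": 1, "memory": "2Gi"}}}
new_spec = {"nodepool": "dedicated", "resources": {"limits": {"cpu": 1, "memory": "2Gi"}}}
assert has_compute_changed(old_spec, new_spec)        # differs only in nodepool
assert not has_compute_changed(old_spec, old_spec)    # identical specs, no change
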
3 changes: 3 additions & 0 deletions crate/operator/webhooks.py
@@ -153,6 +153,9 @@ class WebhookChangeComputePayload(WebhookSubPayload):
old_heap_ratio: float
new_heap_ratio: float

old_nodepool: str
new_nodepool: str


class WebhookInfoChangedPayload(WebhookSubPayload):
external_ip: str
6 changes: 6 additions & 0 deletions (file name not shown)
@@ -437,6 +437,9 @@ spec:
replicas:
description: Number of CrateDB nodes of this type.
type: number
nodepool:
description: Type of nodepool where the cluster should run.
type: string
resources:
properties:
cpus:
@@ -519,6 +522,9 @@ spec:
description: Number of master nodes. Should be an odd number.
minimum: 3
type: number
nodepool:
description: Type of nodepool where the cluster should run.
type: string
resources:
properties:
cpus:
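
Taken together, the two schema additions above allow both the data node groups and the master nodes of a cluster to be pinned to a nodepool. A sketch of the relevant spec fragment, written as the Python dict the operator works with (field values invented; unrelated fields omitted):

cluster_spec = {
    "nodes": {
        "master": {"replicas": 3, "nodepool": "dedicated"},
        "data": [{
            "name": "hot",
            "replicas": 2,
            "nodepool": "shared",  # "shared" or "dedicated", matching the Nodepool enum
            # resources, disk, etc. omitted
        }],
    },
}
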
11 changes: 8 additions & 3 deletions tests/test_change_compute.py
@@ -282,12 +282,13 @@ async def test_change_compute_from_request_to_limit(

 @pytest.mark.parametrize(
     "old_cpu_limit, old_memory_limit, old_cpu_request, old_memory_request, "
-    "new_cpu_limit, new_memory_limit, new_cpu_request, new_memory_request",
+    "new_cpu_limit, new_memory_limit, new_cpu_request, new_memory_request, "
+    "old_nodepool, new_nodepool",
     [
         # Test no requests
-        (1, "2Gi", None, None, 3, "5Gi", None, None),
+        (1, "2Gi", None, None, 3, "5Gi", None, None, "dedicated", "dedicated"),
         # Test requests set
-        (1, "2Gi", None, None, 3, "5Gi", 5, "8Gi"),
+        (1, "2Gi", None, None, 3, "5Gi", 5, "8Gi", "shared", "shared"),
     ],
 )
def test_generate_body_patch(
@@ -299,6 +300,8 @@ def test_generate_body_patch(
new_memory_limit,
new_cpu_request,
new_memory_request,
old_nodepool,
new_nodepool,
faker,
):
compute_change_data = WebhookChangeComputePayload(
@@ -312,6 +315,8 @@
new_memory_request=new_memory_request,
old_heap_ratio=0.25,
new_heap_ratio=0.25,
old_nodepool=old_nodepool,
new_nodepool=new_nodepool,
)

name = faker.domain_word()
6 changes: 6 additions & 0 deletions tests/test_create.py
@@ -178,6 +178,7 @@ def test_dedicated_resources_affinity(self, node_spec, faker):
"limits": {"cpu": 0.5, "memory": "8589934592"},
"requests": {"cpu": 0.25, "memory": "8589934592"},
},
"nodepool": "shared",
},
],
)
@@ -279,6 +280,7 @@ def test_dedicated_resources_tolerations(self, node_spec, faker):
"limits": {"cpu": 0.5, "memory": "8589934592"},
"requests": {"cpu": 0.25, "memory": "8589934592"},
},
"nodepool": "shared",
},
],
)
@@ -1288,6 +1290,7 @@ def test_sql_exporter_config():
"limits": {"cpu": 0.5, "memory": "8589934592"},
"requests": {"cpu": 0.5, "memory": "8589934592"},
},
"nodepool": "dedicated",
},
False,
),
@@ -1296,12 +1299,14 @@ def test_sql_exporter_config():
"resources": {
"limits": {"cpu": 0.5, "memory": "8589934592"},
},
"nodepool": "dedicated",
},
False,
),
(
{
"resources": {"cpus": 0.5, "memory": "8589934592"},
"nodepool": "dedicated",
},
False,
),
@@ -1311,6 +1316,7 @@ def test_sql_exporter_config():
"limits": {"cpu": 0.5, "memory": "8589934592"},
"requests": {"cpu": 0.25, "memory": "8589934592"},
},
"nodepool": "shared",
},
True,
),
