Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DREIMP-10951: Autoscale VitessCell pods with HPA #3938

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
17 changes: 17 additions & 0 deletions paasta_tools/setup_kubernetes_cr.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
from paasta_tools.utils import load_all_configs
from paasta_tools.utils import load_system_paasta_config
from paasta_tools.vitesscell_tools import load_vitess_cell_instance_configs
from paasta_tools.vitesscell_tools import (
update_related_api_objects as update_vitess_cell_related_api_objects,
)
from paasta_tools.vitesscell_tools import VITESSCELL_KUBERNETES_NAMESPACE
from paasta_tools.vitesscluster_tools import load_vitess_cluster_instance_configs
from paasta_tools.vitesscluster_tools import VITESSCLUSTER_KUBERNETES_NAMESPACE
Expand All @@ -67,6 +70,11 @@
}


INSTANCE_TYPE_TO_RELATED_OBJECTS_UPDATER = {
"vitesscell": update_vitess_cell_related_api_objects,
}


INSTANCE_TYPE_TO_NAMESPACE_LOADER = {
"vitesscluster": VITESSCLUSTER_KUBERNETES_NAMESPACE,
"vitesscell": VITESSCELL_KUBERNETES_NAMESPACE,
Expand Down Expand Up @@ -444,6 +452,15 @@ def reconcile_kubernetes_resource(
)
else:
log.info(f"{desired_resource} is up to date, no action taken")

if crd.file_prefix in INSTANCE_TYPE_TO_RELATED_OBJECTS_UPDATER:
INSTANCE_TYPE_TO_RELATED_OBJECTS_UPDATER[crd.file_prefix](
service=service,
instance=inst,
cluster=cluster,
kube_client=kube_client,
soa_dir=DEFAULT_SOA_DIR,
)
except Exception as e:
log.error(str(e))
succeeded = False
Expand Down
143 changes: 143 additions & 0 deletions paasta_tools/vitesscell_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
from typing import Union

import service_configuration_lib
from kubernetes.client import V1ObjectMeta
from kubernetes.client import V2beta2CrossVersionObjectReference
from kubernetes.client import V2beta2HorizontalPodAutoscaler
from kubernetes.client import V2beta2HorizontalPodAutoscalerSpec

from paasta_tools.kubernetes_tools import KubeClient
from paasta_tools.kubernetes_tools import KubernetesDeploymentConfigDict
from paasta_tools.kubernetes_tools import paasta_prefixed
from paasta_tools.kubernetes_tools import sanitised_cr_name
from paasta_tools.utils import BranchDictV2
from paasta_tools.utils import deep_merge_dictionaries
Expand Down Expand Up @@ -59,6 +65,7 @@ class GatewayConfigDict(TypedDict, total=False):
extraFlags: Dict[str, str]
extraLabels: Dict[str, str]
replicas: int
yelp_selector: str
resources: Dict[str, Any]
annotations: Mapping[str, Any]

Expand Down Expand Up @@ -115,6 +122,7 @@ def get_cell_config(
},
extraLabels=labels,
replicas=replicas,
yelp_selector=",".join([f"{k}={v}" for k, v in labels.items()]),
resources={
"requests": requests,
"limits": requests,
Expand Down Expand Up @@ -175,6 +183,129 @@ def get_global_lock_server(self) -> Dict[str, str]:
"rootPath": TOPO_GLOBAL_ROOT,
}

def get_autoscaling_target(self, name: str) -> V2beta2CrossVersionObjectReference:
return V2beta2CrossVersionObjectReference(
api_version="planetscale.com/v2", kind="VitessCell", name=name
)

def get_autoscaling_metric_spec(
self,
name: str,
cluster: str,
kube_client: KubeClient,
namespace: str,
) -> Optional[V2beta2HorizontalPodAutoscaler]:
Copy link
Member Author

@siadat siadat Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo, the autoscaling for vitess seems different enough that it's probably more maintainable to have a separate script that we use to configure HPAs rather than mix it into the existing code for long-running services

@nemacysts I added this method override to avoid using the existing one in KubernetesDeploymentConfig.

Afaiu, atm the HPA for the deployments created by VitessCell look like a regular HPA that uses CPU metrics... but I guess we'd like to separate autoscaling deployments created by CRs vs otherwise in general and in case they diverge in the future? so I copied the code from KubernetesDeploymentConfig to this class.

# Returns None if an HPA should not be attached based on the config,
# or the config is invalid.

if self.get_desired_state() == "stop":
return None

if not self.is_autoscaling_enabled():
return None

autoscaling_params = self.get_autoscaling_params()
if autoscaling_params["metrics_providers"][0]["decision_policy"] == "bespoke":
return None

min_replicas = self.get_min_instances()
max_replicas = self.get_max_instances()
if min_replicas == 0 or max_replicas == 0:
log.error(
f"Invalid value for min or max_instances on {name}: {min_replicas}, {max_replicas}"
)
return None

metrics = []
for provider in autoscaling_params["metrics_providers"]:
spec = self.get_autoscaling_provider_spec(name, namespace, provider)
if spec is not None:
metrics.append(spec)
scaling_policy = self.get_autoscaling_scaling_policy(
max_replicas,
autoscaling_params,
)

labels = {
paasta_prefixed("service"): self.service,
paasta_prefixed("instance"): self.instance,
paasta_prefixed("pool"): self.get_pool(),
paasta_prefixed("managed"): "true",
}

hpa = V2beta2HorizontalPodAutoscaler(
kind="HorizontalPodAutoscaler",
metadata=V1ObjectMeta(
name=name, namespace=namespace, annotations=dict(), labels=labels
),
spec=V2beta2HorizontalPodAutoscalerSpec(
behavior=scaling_policy,
max_replicas=max_replicas,
min_replicas=min_replicas,
metrics=metrics,
scale_target_ref=self.get_autoscaling_target(name),
),
)

return hpa

def get_min_instances(self) -> Optional[int]:
vtgate_resources = self.config_dict.get("vtgate_resources")
return vtgate_resources.get("min_instances", 1)

def get_max_instances(self) -> Optional[int]:
vtgate_resources = self.config_dict.get("vtgate_resources")
return vtgate_resources.get("max_instances")

def update_related_api_objects(
self,
kube_client: KubeClient,
):
name = sanitised_cr_name(self.service, self.instance)

min_instances = self.get_min_instances()
max_instances = self.get_max_instances()
should_exist = min_instances and max_instances

exists = (
len(
kube_client.autoscaling.list_namespaced_horizontal_pod_autoscaler(
field_selector=f"metadata.name={name}",
namespace=self.get_namespace(),
limit=1,
).items
)
> 0
)

if should_exist:
hpa = self.get_autoscaling_metric_spec(
name=sanitised_cr_name(self.service, self.instance),
cluster=self.get_cluster(),
kube_client=kube_client,
namespace=self.get_namespace(),
)
if not hpa:
return

if exists:
kube_client.autoscaling.replace_namespaced_horizontal_pod_autoscaler(
name=name,
namespace=self.get_namespace(),
body=hpa,
)
else:
log.info(f"Creating HPA for {name} in {self.get_namespace()}")
kube_client.autoscaling.create_namespaced_horizontal_pod_autoscaler(
namespace=self.get_namespace(),
body=hpa,
)
elif exists:
kube_client.autoscaling.delete_namespaced_horizontal_pod_autoscaler(
name=name,
namespace=self.get_namespace(),
)

def get_vitess_cell_config(self) -> VitessCellConfigDict:
cell = self.config_dict.get("cell")
all_cells = self.config_dict.get("cells")
Expand Down Expand Up @@ -278,6 +409,18 @@ def load_vitess_cell_instance_configs(
return vitess_cell_instance_configs


def update_related_api_objects(
service: str,
instance: str,
cluster: str,
kube_client: KubeClient,
soa_dir: str = DEFAULT_SOA_DIR,
) -> None:
load_vitess_cell_instance_config(
service, instance, cluster, soa_dir=soa_dir
).update_related_api_objects(kube_client)


# TODO: read this from CRD in service configs
def cr_id(service: str, instance: str) -> Mapping[str, str]:
return dict(
Expand Down
2 changes: 2 additions & 0 deletions paasta_tools/vitesscluster_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class RequestsDict(TypedDict, total=False):

class ResourceConfigDict(TypedDict, total=False):
replicas: int
min_instances: Optional[int]
max_instances: Optional[int]
requests: Dict[str, RequestsDict]
limits: Dict[str, RequestsDict]

Expand Down