Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Implements a custom K8s client to catch ApiErrors centrally #463

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
from charms.sdcore_upf_k8s.v0.fiveg_n3 import N3Provides
from charms.sdcore_upf_k8s.v0.fiveg_n4 import N4Provides
from jinja2 import Environment, FileSystemLoader
from lightkube.core.client import Client
from lightkube.core.exceptions import ApiError
from lightkube.models.meta_v1 import ObjectMeta
from lightkube.resources.core_v1 import Node, Pod
from ops import (
Expand All @@ -46,7 +44,8 @@

from charm_config import CharmConfig, CharmConfigInvalidError, CNIType, UpfMode
from dpdk import DPDK
from k8s_service import K8sService
from k8s_client import K8sClient
from k8s_service import K8sService, K8sServiceError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -118,6 +117,7 @@ def __init__(self, *args):
pod_name=self._pod_name,
hugepages_volumes=self._volumes_request_from_config(),
)
self.k8s_client = K8sClient()
self.k8s_service = K8sService(
namespace=self._namespace,
service_name=f"{self.app.name}-external",
Expand Down Expand Up @@ -155,15 +155,14 @@ def _on_remove(self, _: RemoveEvent) -> None:
# where the leader status is removed from the leader unit before all
# hooks are finished running. In this case, we will leave behind a
# dirty state in k8s, but it will be cleaned up when the juju model is
# destroyed. It will be re-used if the charm is re-deployed.
# destroyed. It will be reused if the charm is re-deployed.
self._kubernetes_multus.remove()
if self.k8s_service.is_created():
self.k8s_service.delete()

def delete_pod(self):
"""Delete the pod."""
client = Client()
client.delete(Pod, name=self._pod_name, namespace=self._namespace)
self.k8s_client.delete(Pod, name=self._pod_name, namespace=self._namespace)

@property
def _namespace(self) -> str:
Expand Down Expand Up @@ -551,7 +550,10 @@ def _on_config_changed(self, event: EventBase):
if not self._hugepages_are_available():
return
if not self.k8s_service.is_created():
self.k8s_service.create()
try:
self.k8s_service.create()
except K8sServiceError:
return
if not self._kubernetes_multus.multus_is_available():
return
self._kubernetes_multus.configure()
Expand Down Expand Up @@ -1115,15 +1117,7 @@ def _hugepages_are_available(self) -> bool:
"""
if not self._hugepages_is_enabled():
return True
client = Client()
try:
nodes = client.list(Node)
except ApiError as e:
if e.status.reason == "Unauthorized":
logger.debug("kube-apiserver not ready yet")
return False
else:
raise e
nodes = self.k8s_client.list(Node)
if not nodes:
return False
return all(node.status.allocatable.get("hugepages-1Gi", "0") >= "2Gi" for node in nodes) # type: ignore
Expand Down
14 changes: 7 additions & 7 deletions src/dpdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import logging
from typing import Iterable, Optional

from lightkube.core.client import Client
from lightkube.core.exceptions import ApiError
from lightkube.models.core_v1 import Container
from lightkube.resources.apps_v1 import StatefulSet

from k8s_client import K8sClient, K8sClientError

logger = logging.getLogger(__name__)


Expand All @@ -33,7 +33,7 @@ def __init__(
dpdk_access_interface_resource_name: str,
dpdk_core_interface_resource_name: str,
):
self.k8s_client = Client()
self.k8s_client = K8sClient()
self.statefulset_name = statefulset_name
self.namespace = namespace
self.dpdk_resource_requirements = {
Expand Down Expand Up @@ -119,8 +119,8 @@ def _get_statefulset(self, statefulset_name: str, namespace: str) -> Optional[St
"""
try:
return self.k8s_client.get(res=StatefulSet, name=statefulset_name, namespace=namespace)
except ApiError as e:
raise DPDKError(f"Could not get statefulset `{statefulset_name}`: {e.status.message}")
except K8sClientError as e:
raise DPDKError(f"Could not get statefulset `{statefulset_name}`: {e.message}")

@staticmethod
def _get_container(
Expand Down Expand Up @@ -189,7 +189,7 @@ def _replace_statefulset(self, statefulset: StatefulSet) -> None:
try:
self.k8s_client.replace(obj=statefulset)
logger.info("Statefulset %s replaced", statefulset.metadata.name)
except ApiError as e:
except K8sClientError as e:
raise DPDKError(
f"Could not replace statefulset `{statefulset.metadata.name}`: {e.status.message}"
f"Could not replace statefulset `{statefulset.metadata.name}`: {e.message}"
)
87 changes: 87 additions & 0 deletions src/k8s_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/usr/bin/env python3
# Copyright 2025 Canonical Ltd.
# See LICENSE file for licensing details.

"""Module wrapping the default Lightkube Client, to automatically handle 401 Unauthorized.

To use Kubernetes Client from this module in your code:
1. Import K8sClient:
`from k8s_client import K8sClient`
2. Initialize K8sClient:
`kubernetes_client = K8sClient()`
"""

import functools
import logging
import types

from lightkube.core.client import Client
from lightkube.core.exceptions import ApiError

logger = logging.getLogger(__name__)


class K8sClientError(Exception):
"""K8sClientError."""

def __init__(self, message: str):
self.message = message
super().__init__(self.message)


def try_except_all(func):
"""Wrap Lightkube Client calls with try-except block."""

@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except ApiError as e:
if e.status.code == 401:
logger.warning("kube-apiserver not ready yet")
elif e.status.code == 404:
logger.warning("Requested Kubernetes resource not found")
else:
raise K8sClientError(
f"Could not perform requested Kubernetes call due to: {e.status.message}"
)
return None
return wrapper


class MetaClass(type):
"""Metaclass applying a custom wrapper on the base class' functions."""

def __new__(meta, class_name, base_classes, class_dict): # noqa: N804
"""See if any of the base classes were created by with_metaclass() function."""
marker = None
for base in base_classes:
if hasattr(base, '_marker'):
marker = getattr(base, '_marker')
break

if class_name == marker:
return type.__new__(meta, class_name, base_classes, class_dict)

temp_class = type.__new__(meta, 'TempClass', base_classes, class_dict)

new_class_dict = {}
for cls in temp_class.mro():
for attribute_name, attribute_type in cls.__dict__.items():
if isinstance(attribute_type, types.FunctionType):
attribute_type = try_except_all(attribute_type)
new_class_dict[attribute_name] = attribute_type

return type.__new__(meta, class_name, base_classes, new_class_dict)


def with_metaclass(meta, classname, bases):
"""Create a class with the supplied bases and metaclass."""
return type.__new__(meta, classname, bases, {'_marker': classname})


class K8sClient(
with_metaclass(MetaClass, 'WrappedK8sClient', (Client, object))
):
"""Custom K8s client automatically catching 401 Unauthorized error."""
pass
31 changes: 19 additions & 12 deletions src/k8s_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,23 @@
import logging
from typing import Optional

from httpx import HTTPStatusError
from lightkube.core.client import Client
from lightkube.models.core_v1 import ServicePort, ServiceSpec
from lightkube.models.meta_v1 import ObjectMeta
from lightkube.resources.core_v1 import Service

from k8s_client import K8sClient, K8sClientError

logger = logging.getLogger(__name__)


class K8sServiceError(Exception):
"""K8sServiceError."""

def __init__(self, message: str):
self.message = message
super().__init__(self.message)


class K8sService:
"""A class to manage the external UPF service."""

Expand All @@ -24,7 +32,7 @@ def __init__(self, namespace: str, service_name: str, app_name: str, pfcp_port:
self.service_name = service_name
self.app_name = app_name
self.pfcp_port = pfcp_port
self.client = Client()
self.client = K8sClient()

def create(self) -> None:
"""Create the external UPF service."""
Expand All @@ -48,17 +56,16 @@ def create(self) -> None:
type="LoadBalancer",
),
)
self.client.apply(service, field_manager=self.app_name)
logger.info("Created/asserted existence of the external UPF service")
try:
self.client.apply(service, field_manager=self.app_name)
logger.info("Created/asserted existence of the external UPF service")
except K8sClientError as e:
raise K8sServiceError(f"Could not create UPF service due to: {e.message}")

def is_created(self) -> bool:
"""Check if the external UPF service exists."""
try:
self.client.get(Service, name=self.service_name, namespace=self.namespace)
if self.client.get(Service, name=self.service_name, namespace=self.namespace):
return True
except HTTPStatusError as status:
if status.response.status_code == 404:
return False
return False

def delete(self) -> None:
Expand All @@ -70,8 +77,8 @@ def delete(self) -> None:
namespace=self.namespace,
)
logger.info("Deleted external UPF service")
except HTTPStatusError as status:
logger.info("Could not delete %s due to: %s", self.service_name, status)
except K8sClientError as e:
logger.warning("Could not delete %s due to: %s", self.service_name, e.message)

def get_hostname(self) -> Optional[str]:
"""Get the hostname of the external UPF service."""
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
class UPFUnitTestFixtures:
patcher_k8s_client = patch("lightkube.core.client.GenericSyncClient")
patcher_client_list = patch("lightkube.core.client.Client.list")
patcher_k8sclient_list = patch("k8s_client.K8sClient.list")
patcher_k8s_service = patch("charm.K8sService", autospec=K8sService)
patcher_huge_pages_is_patched = patch(
"charm.KubernetesHugePagesPatchCharmLib.is_patched",
Expand All @@ -31,6 +32,7 @@ class UPFUnitTestFixtures:
def setup(self, request):
self.mock_k8s_client = UPFUnitTestFixtures.patcher_k8s_client.start().return_value
self.mock_client_list = UPFUnitTestFixtures.patcher_client_list.start()
self.mock_k8sclient_list = UPFUnitTestFixtures.patcher_k8sclient_list.start()
self.mock_k8s_service = UPFUnitTestFixtures.patcher_k8s_service.start().return_value
self.mock_huge_pages_is_patched = UPFUnitTestFixtures.patcher_huge_pages_is_patched.start()
self.mock_multus_is_available = UPFUnitTestFixtures.patcher_multus_is_available.start()
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_charm_collect_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_given_hugepages_unavailable_when_collect_unit_status_then_status_is_blo
self,
):
self.mock_check_output.return_value = b"Flags: avx2 ssse3 fma cx16 rdrand pdpe1gb"
self.mock_client_list.return_value = []
self.mock_k8sclient_list.return_value = []
state_in = testing.State(
leader=True,
config={
Expand Down
Loading
Loading