Skip to content

Commit

Permalink
feat: Seldon Metrics Discovery
Browse files Browse the repository at this point in the history
#68

Summary of changes:
- Merged metrics-discovery branch created at Observability Workshop.
- Modified code to integrate Metrics Endpoints Observer.
- Updated libs.

Merge remote-tracking branch 'origin/metrics-discovery' into feat-metrics-discovery
  • Loading branch information
Ivan Chvets committed Dec 7, 2022
2 parents 0977147 + c52f244 commit aefbc95
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 56 deletions.
16 changes: 8 additions & 8 deletions lib/charms/grafana_k8s/v0/grafana_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def __init__(self, *args):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 16
LIBPATCH = 18

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -615,7 +615,7 @@ def _replace_template_fields( # noqa: C901
if datasources or not existing_templates:
panels = dict_content["panels"]

# Go through all of the panels. If they have a datasource set, AND it's one
# Go through all the panels. If they have a datasource set, AND it's one
# that we can convert to ${lokids} or ${prometheusds}, by stripping off the
# ${} templating and comparing the name to the list we built, replace it,
# otherwise, leave it alone.
Expand Down Expand Up @@ -710,7 +710,7 @@ def _inject_labels(content: str, topology: dict, transformer: "CosTool") -> str:
if "panels" not in dict_content.keys():
return json.dumps(dict_content)

# Go through all of the panels and inject topology labels
# Go through all the panels and inject topology labels
# Panels may have more than one 'target' where the expressions live, so that must be
# accounted for. Additionally, `promql-transform` does not necessarily gracefully handle
# expressions with range queries including variables. Exclude these.
Expand Down Expand Up @@ -924,7 +924,7 @@ def __init__(
If you would like to use relation name other than `grafana-dashboard`,
you will need to specify the relation name via the `relation_name`
argument when instantiating the :class:`GrafanaDashboardProvider` object.
However, it is strongly advised to keep the the default relation name,
However, it is strongly advised to keep the default relation name,
so that people deploying your charm will have a consistent experience
with all other charms that provide Grafana dashboards.
Expand Down Expand Up @@ -1053,7 +1053,7 @@ def _update_all_dashboards_from_dir(
# Path.glob uses fnmatch on the backend, which is pretty limited, so use a
# custom function for the filter
def _is_dashboard(p: Path) -> bool:
return p.is_file and p.name.endswith((".json", ".json.tmpl", ".tmpl"))
return p.is_file() and p.name.endswith((".json", ".json.tmpl", ".tmpl"))

for path in filter(_is_dashboard, Path(self._dashboards_path).glob("*")):
# path = Path(path)
Expand Down Expand Up @@ -1105,7 +1105,7 @@ def _reinitialize_dashboard_data(self, inject_dropdowns: bool = True) -> None:
del stored_dashboard_templates[dashboard_id]
self._stored.dashboard_templates = stored_dashboard_templates

# With all of the file-based dashboards cleared out, force a refresh
# With all the file-based dashboards cleared out, force a refresh
# of relation data
if self._charm.unit.is_leader():
for dashboard_relation in self._charm.model.relations[self._relation_name]:
Expand Down Expand Up @@ -1155,7 +1155,7 @@ def _content_to_dashboard_object(self, content: str, inject_dropdowns: bool = Tr
return {
"charm": self._charm.meta.name,
"content": content,
"juju_topology": self._juju_topology,
"juju_topology": self._juju_topology if inject_dropdowns else {},
"inject_dropdowns": inject_dropdowns,
}

Expand Down Expand Up @@ -1752,7 +1752,7 @@ def _maybe_get_builtin_dashboards(self, event: RelationEvent) -> Dict:
if dashboards_path:

def _is_dashboard(p: Path) -> bool:
return p.is_file and p.name.endswith((".json", ".json.tmpl", ".tmpl"))
return p.is_file() and p.name.endswith((".json", ".json.tmpl", ".tmpl"))

for path in filter(_is_dashboard, Path(dashboards_path).glob("*")):
# path = Path(path)
Expand Down
11 changes: 7 additions & 4 deletions lib/charms/observability_libs/v0/juju_topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
LIBID = "bced1658f20f49d28b88f61f83c2d232"

LIBAPI = 0
LIBPATCH = 3
LIBPATCH = 4


class InvalidUUIDError(Exception):
Expand All @@ -94,8 +94,8 @@ def __init__(
model: str,
model_uuid: str,
application: str,
unit: str = None,
charm_name: str = None,
unit: Optional[str] = None,
charm_name: Optional[str] = None,
):
"""Build a JujuTopology object.
Expand Down Expand Up @@ -181,7 +181,10 @@ def from_dict(cls, data: dict):
)

def as_dict(
self, *, remapped_keys: Dict[str, str] = None, excluded_keys: List[str] = None
self,
*,
remapped_keys: Optional[Dict[str, str]] = None,
excluded_keys: Optional[List[str]] = None,
) -> OrderedDict:
"""Format the topology information into an ordered dict.
Expand Down
225 changes: 225 additions & 0 deletions lib/charms/observability_libs/v0/metrics_endpoint_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.

"""# MetricsEndpointDiscovery Library.
This library provides functionality for discovering metrics endpoints exposed
by applications deployed to a Kubernetes cluster.
It comprises:
- A custom event and event source for handling metrics endpoint changes.
- Logic to observe cluster events and emit the events as appropriate.
## Using the Library
### Handling Events
To ensure that your charm can react to changing metrics endpoint events,
use the CharmEvents extension.
```python
import json
from charms.observability_libs.v0.metrics_endpoint_discovery import
MetricsEndpointCharmEvents,
MetricsEndpointObserver
)
class MyCharm(CharmBase):
on = MetricsEndpointChangeCharmEvents()
def __init__(self, *args):
super().__init__(*args)
self._observer = MetricsEndpointObserver(self, {"app.kubernetes.io/name": ["grafana-k8s"]})
self.framework.observe(self.on.metrics_endpoint_change, self._on_endpoints_change)
def _on_endpoints_change(self, event):
self.unit.status = ActiveStatus(json.dumps(event.discovered))
```
"""

import json
import logging
import os
import signal
import subprocess
import sys
from typing import Dict, Iterable

from lightkube import Client
from lightkube.resources.core_v1 import Pod
from ops.charm import CharmBase, CharmEvents
from ops.framework import EventBase, EventSource, Object

logger = logging.getLogger(__name__)

# The unique Charmhub library identifier, never change it
LIBID = "a141d5620152466781ed83aafb948d03"

# Increment this major API version when introducing breaking changes
LIBAPI = 0

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 2

# File path where metrics endpoint change data is written for exchange
# between the discovery process and the materialised event.
PAYLOAD_FILE_PATH = "/tmp/metrics-endpoint-payload.json"

# File path for the spawned discovery process to write logs.
LOG_FILE_PATH = "/var/log/discovery.log"


class MetricsEndpointChangeEvent(EventBase):
"""A custom event for metrics endpoint changes."""

def __init__(self, handle):
super().__init__(handle)

with open(PAYLOAD_FILE_PATH, "r") as f:
self._discovered = json.loads(f.read())

def snapshot(self):
"""Save the event payload data."""
return {"payload": self._discovered}

def restore(self, snapshot):
"""Restore the event payload data."""
self._discovered = {}

if snapshot:
self._discovered = snapshot["payload"]

@property
def discovered(self):
"""Return the payload of detected endpoint changes for this event."""
return self._discovered


class MetricsEndpointChangeCharmEvents(CharmEvents):
"""A CharmEvents extension for metrics endpoint changes.
Includes :class:`MetricsEndpointChangeEvent` in those that can be handled.
"""

metrics_endpoint_change = EventSource(MetricsEndpointChangeEvent)


class MetricsEndpointObserver(Object):
"""Observes changing metrics endpoints in the cluster.
Observed endpoint changes cause :class"`MetricsEndpointChangeEvent` to be emitted.
"""

def __init__(self, charm: CharmBase, labels: Dict[str, Iterable]):
"""Constructor for MetricsEndpointObserver.
Args:
charm: the charm that is instantiating the library.
labels: dictionary of label/value to be observed for changing metrics endpoints.
"""
super().__init__(charm, "metrics-endpoint-observer")

self._charm = charm
self._observer_pid = 0

self._labels = labels

def start_observer(self):
"""Start the metrics endpoint observer running in a new process."""
self.stop_observer()

logging.info("Starting metrics endpoint observer process")

# We need to trick Juju into thinking that we are not running
# in a hook context, as Juju will disallow use of juju-run.
new_env = os.environ.copy()
if "JUJU_CONTEXT_ID" in new_env:
new_env.pop("JUJU_CONTEXT_ID")

pid = subprocess.Popen(
[
"/usr/bin/python3",
"lib/charms/observability_libs/v{}/metrics_endpoint_discovery.py".format(LIBAPI),
json.dumps(self._labels),
"/var/lib/juju/tools/{}/juju-run".format(self.unit_tag),
self._charm.unit.name,
self._charm.charm_dir,
],
stdout=open(LOG_FILE_PATH, "a"),
stderr=subprocess.STDOUT,
env=new_env,
).pid

self._observer_pid = pid
logging.info("Started metrics endopint observer process with PID {}".format(pid))

def stop_observer(self):
"""Stop the running observer process if we have previously started it."""
if not self._observer_pid:
return

try:
os.kill(self._observer_pid, signal.SIGINT)
msg = "Stopped running metrics endpoint observer process with PID {}"
logging.info(msg.format(self._observer_pid))
except OSError:
pass

@property
def unit_tag(self):
"""Juju-style tag identifying the unit being run by this charm."""
unit_num = self._charm.unit.name.split("/")[-1]
return "unit-{}-{}".format(self._charm.app.name, unit_num)


def write_payload(payload):
"""Write the input event data to event payload file."""
with open(PAYLOAD_FILE_PATH, "w") as f:
f.write(json.dumps(payload))


def dispatch(run_cmd, unit, charm_dir):
"""Use the input juju-run command to dispatch a :class:`MetricsEndpointChangeEvent`."""
dispatch_sub_cmd = "JUJU_DISPATCH_PATH=hooks/metrics_endpoint_change {}/dispatch"
subprocess.run([run_cmd, "-u", unit, dispatch_sub_cmd.format(charm_dir)])


def main():
"""Main watch and dispatch loop.
Watch the input k8s service names. When changes are detected, write the
observed data to the payload file, and dispatch the change event.
"""
labels, run_cmd, unit, charm_dir = sys.argv[1:]

client = Client()
labels = json.loads(labels)

for change, entity in client.watch(Pod, namespace="*", labels=labels):
meta = entity.metadata
metrics_path = ""
if entity.metadata.annotations.get("prometheus.io/path", ""):
metrics_path = entity.metadata.annotations.get("prometheus.io/path", "")

target_ports = []
for c in filter(lambda c: c.ports is not None, entity.spec.containers):
for p in filter(lambda p: p.name == "metrics", c.ports):
target_ports.append("*:{}".format(p.containerPort))

payload = {
"change": change,
"namespace": meta.namespace,
"name": meta.name,
"path": metrics_path,
"targets": target_ports or ["*:80"],
}

write_payload(payload)
dispatch(run_cmd, unit, charm_dir)


if __name__ == "__main__":
main()
18 changes: 9 additions & 9 deletions lib/charms/observability_libs/v1/kubernetes_service_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def setUp(self, *unused):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 4
LIBPATCH = 5

ServiceType = Literal["ClusterIP", "LoadBalancer"]

Expand All @@ -158,11 +158,11 @@ def __init__(
self,
charm: CharmBase,
ports: List[ServicePort],
service_name: str = None,
service_name: Optional[str] = None,
service_type: ServiceType = "ClusterIP",
additional_labels: dict = None,
additional_selectors: dict = None,
additional_annotations: dict = None,
additional_labels: Optional[dict] = None,
additional_selectors: Optional[dict] = None,
additional_annotations: Optional[dict] = None,
*,
refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None,
):
Expand Down Expand Up @@ -213,11 +213,11 @@ def __init__(
def _service_object(
self,
ports: List[ServicePort],
service_name: str = None,
service_name: Optional[str] = None,
service_type: ServiceType = "ClusterIP",
additional_labels: dict = None,
additional_selectors: dict = None,
additional_annotations: dict = None,
additional_labels: Optional[dict] = None,
additional_selectors: Optional[dict] = None,
additional_annotations: Optional[dict] = None,
) -> Service:
"""Creates a valid Service representation.
Expand Down
Loading

0 comments on commit aefbc95

Please sign in to comment.