Add observer that updates endpoints on relation (#58)
* Pass prebuilt charm to tests for local execution

* Enable postgres process for full cluster restart test

* Revert "Pass prebuilt charm to tests for local execution"

This reverts commit cc7f199.

* Add observer that updates endpoints on relation

* Fixing logic

* Bump up ops framework

* Fix the logic to update the unit databag when it's cleared on another unit

* Fix observer start

* Fixes in the test

* Fix part of the unit tests

* Fix remaining unit tests

* Minor adjustments in the code

* Fix machine start

* Add copyright notice

* Add retries when dropping continuous_writes table

* Terminating continuous_writes in test charm

* Version bump for deps

* Revert "Version bump for deps"

This reverts commit 8e70a6f.

* Version bump for libs

* Switch test app charm to peer relation for storing continuous_writes PID

* Code review tweaks

* Fix TLS test

* Comment logs

* Fix get_member_ip

* Add a call to relation endpoints update

* Fix and revert changes

* Change sleep time

* Revert changes

* Minor fixes

* Revert lib changes

* Revert requirements changes

* Uncomment charms removal

* Remove log calls

* Add unit tests to the observer

* Fix incorrect user deletion

* Fix unit tests

---------

Co-authored-by: Dragomir Penev <[email protected]>
marceloneppel and dragomirp authored Feb 12, 2023
1 parent 5b228ee commit f4b372b
Showing 11 changed files with 572 additions and 220 deletions.
61 changes: 39 additions & 22 deletions src/charm.py
@@ -46,6 +46,10 @@
RemoveRaftMemberFailedError,
SwitchoverFailedError,
)
+from cluster_topology_observer import (
+    ClusterTopologyChangeCharmEvents,
+    ClusterTopologyObserver,
+)
from constants import (
PEER,
REPLICATION_PASSWORD_KEY,
@@ -69,11 +73,15 @@
class PostgresqlOperatorCharm(CharmBase):
"""Charmed Operator for the PostgreSQL database."""

+    on = ClusterTopologyChangeCharmEvents()

def __init__(self, *args):
super().__init__(*args)

self._postgresql_service = "postgresql"

+        self._observer = ClusterTopologyObserver(self)
+        self.framework.observe(self.on.cluster_topology_change, self._on_cluster_topology_change)
self.framework.observe(self.on.install, self._on_install)
self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(self.on.leader_elected, self._on_leader_elected)
@@ -96,6 +104,7 @@ def __init__(self, *args):
self.restart_manager = RollingOpsManager(
charm=self, relation="restart", callback=self._restart
)
+        self._observer.start_observer()

@property
def app_peer_data(self) -> Dict:
@@ -227,9 +236,7 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
self.update_config()

if self.primary_endpoint:
-            self.postgresql_client_relation.update_endpoints()
-            self.legacy_db_relation.update_endpoints()
-            self.legacy_db_admin_relation.update_endpoints()
+            self._update_relation_endpoints()
else:
self.unit.status = BlockedStatus("no primary in the cluster")
return
@@ -278,9 +285,7 @@ def _on_pgdata_storage_detaching(self, _) -> None:
# A cluster can have all members as replicas for some time after
# a failed switchover, so wait until the primary is elected.
if self.primary_endpoint:
-            self.postgresql_client_relation.update_endpoints()
-            self.legacy_db_relation.update_endpoints()
-            self.legacy_db_admin_relation.update_endpoints()
+            self._update_relation_endpoints()

def _on_peer_relation_changed(self, event: RelationChangedEvent):
"""Reconfigure cluster members when something changes."""
@@ -319,18 +324,11 @@ def _on_peer_relation_changed(self, event: RelationChangedEvent):
event.defer()
return

-        # If the unit is not the leader, just set an ActiveStatus.
-        if not self.unit.is_leader():
-            self.unit.status = ActiveStatus()
-            return
-
# Only update the connection endpoints if there is a primary.
# A cluster can have all members as replicas for some time after
# a failed switchover, so wait until the primary is elected.
if self.primary_endpoint:
-            self.postgresql_client_relation.update_endpoints()
-            self.legacy_db_relation.update_endpoints()
-            self.legacy_db_admin_relation.update_endpoints()
+            self._update_relation_endpoints()
self.unit.status = ActiveStatus()
else:
self.unit.status = BlockedStatus("no primary in the cluster")
@@ -517,6 +515,12 @@ def _unit_ip(self) -> str:
"""Current unit ip."""
return str(self.model.get_binding(PEER).network.bind_address)

+    def _on_cluster_topology_change(self, _):
+        """Updates endpoints and (optionally) certificates when the cluster topology changes."""
+        logger.info("Cluster topology changed")
+        self._update_relation_endpoints()
+        self._update_certificate()
+
def _on_install(self, event: InstallEvent) -> None:
"""Install prerequisites for the application."""
if not self._is_storage_attached():
@@ -593,9 +597,7 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None:
# A cluster can have all members as replicas for some time after
# a failed switchover, so wait until the primary is elected.
if self.primary_endpoint:
-            self.postgresql_client_relation.update_endpoints()
-            self.legacy_db_relation.update_endpoints()
-            self.legacy_db_admin_relation.update_endpoints()
+            self._update_relation_endpoints()
else:
self.unit.status = BlockedStatus("no primary in the cluster")

@@ -644,6 +646,8 @@ def _on_start(self, event: StartEvent) -> None:
event.defer()
return

+        self.unit_peer_data.update({"ip": self.get_hostname_by_unit(None)})
+
# Only the leader can bootstrap the cluster.
# On replicas, only prepare for starting the instance later.
if not self.unit.is_leader():
@@ -683,6 +687,10 @@ def _start_primary(self, event: StartEvent) -> None:

# Set the flag to enable the replicas to start the Patroni service.
self._peers.data[self.app]["cluster_initialised"] = "True"

+        # Clear unit data if this unit became a replica after a failover/switchover.
+        self._update_relation_endpoints()
+
self.unit.status = ActiveStatus()

def _start_replica(self, event) -> None:
@@ -693,6 +701,9 @@ def _start_replica(self, event) -> None:
event.defer()
return

+        # Clear unit data if this unit is still a replica.
+        self._update_relation_endpoints()
+
# Member already started, so we can set an ActiveStatus.
# This can happen after a reboot.
if self._patroni.member_started:
@@ -767,12 +778,12 @@ def _on_set_password(self, event: ActionEvent) -> None:
event.set_results({f"{username}-password": password})

def _on_update_status(self, _) -> None:
"""Update endpoints of the postgres client relation and update users list."""
self.postgresql_client_relation.update_endpoints()
self.legacy_db_relation.update_endpoints()
self.legacy_db_admin_relation.update_endpoints()
"""Update users list in the database."""
if "cluster_initialised" not in self._peers.data[self.app]:
return

self.postgresql_client_relation.oversee_users()
self._update_certificate()
self._update_relation_endpoints()

# Restart the workload if it's stuck on the starting state after a restart.
if (
@@ -942,6 +953,12 @@ def update_config(self) -> None:
self._peers.data[self.unit].pop("postgresql_restarted", None)
self.on[self.restart_manager.name].acquire_lock.emit()

+    def _update_relation_endpoints(self) -> None:
+        """Updates endpoints and read-only endpoint in all relations."""
+        self.postgresql_client_relation.update_endpoints()
+        self.legacy_db_relation.update_endpoints()
+        self.legacy_db_admin_relation.update_endpoints()
+

if __name__ == "__main__":
main(PostgresqlOperatorCharm)
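The `on = ClusterTopologyChangeCharmEvents()` attribute added above is what makes the custom `cluster_topology_change` event addressable by name on the charm. A minimal sketch of exercising that wiring from a unit test, assuming the standard ops testing Harness (nothing below beyond the charm and event names comes from this commit):

    from ops.testing import Harness

    from charm import PostgresqlOperatorCharm

    # Build a harness around the charm and emit the custom event directly,
    # bypassing the spawned observer process entirely.
    harness = Harness(PostgresqlOperatorCharm)
    harness.begin()
    harness.charm.on.cluster_topology_change.emit()

The handler then calls `_update_relation_endpoints` and `_update_certificate`, so a real test would first set up the peer relation data those methods read.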
20 changes: 14 additions & 6 deletions src/cluster.py
@@ -177,7 +177,7 @@ def get_member_ip(self, member_name: str) -> str:
IP address of the cluster member.
"""
# Request info from cluster endpoint (which returns all members of the cluster).
-        for attempt in Retrying(stop=stop_after_attempt(len(self.peers_ips) + 1)):
+        for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
cluster_status = requests.get(
@@ -199,7 +199,7 @@ def get_primary(self, unit_name_pattern=False) -> str:
primary pod or unit name.
"""
# Request info from cluster endpoint (which returns all members of the cluster).
-        for attempt in Retrying(stop=stop_after_attempt(len(self.peers_ips) + 1)):
+        for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
cluster_status = requests.get(
@@ -221,10 +221,18 @@ def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
When the Patroni process is not running in the current unit it's needed
to use a URL from another cluster member REST API to do some operations.
"""
-        if attempt.retry_state.attempt_number > 1:
-            url = self._patroni_url.replace(
-                self.unit_ip, list(self.peers_ips)[attempt.retry_state.attempt_number - 2]
-            )
+        attempt_number = attempt.retry_state.attempt_number
+        if attempt_number > 1:
+            url = self._patroni_url
+            # Build the URL using http and later using https for each peer.
+            if (attempt_number - 1) <= len(self.peers_ips):
+                url = url.replace("https://", "http://")
+                unit_number = attempt_number - 2
+            else:
+                url = url.replace("http://", "https://")
+                unit_number = attempt_number - 2 - len(self.peers_ips)
+            other_unit_ip = list(self.peers_ips)[unit_number]
+            url = url.replace(self.unit_ip, other_unit_ip)
else:
url = self._patroni_url
return url
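To make the retry schedule concrete, a worked illustration with hypothetical addresses: given `unit_ip = "10.0.0.1"`, `peers_ips = ["10.0.0.2", "10.0.0.3"]`, and `self._patroni_url = "https://10.0.0.1:8008"`, the `2 * len(self.peers_ips) + 1` attempts used by `get_member_ip` and `get_primary` resolve to:

    # attempt 1 -> https://10.0.0.1:8008  (own unit, original scheme)
    # attempt 2 -> http://10.0.0.2:8008   (each peer over http first)
    # attempt 3 -> http://10.0.0.3:8008
    # attempt 4 -> https://10.0.0.2:8008  (then each peer over https)
    # attempt 5 -> https://10.0.0.3:8008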
153 changes: 153 additions & 0 deletions src/cluster_topology_observer.py
@@ -0,0 +1,153 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

"""Cluster topology changes observer."""

import logging
import os
import signal
import subprocess
import sys
from time import sleep

import requests
from ops.charm import CharmBase, CharmEvents
from ops.framework import EventBase, EventSource, Object
from ops.model import ActiveStatus

from constants import API_REQUEST_TIMEOUT, PATRONI_CLUSTER_STATUS_ENDPOINT

logger = logging.getLogger(__name__)

# File path for the spawned cluster topology observer process to write logs.
LOG_FILE_PATH = "/var/log/cluster_topology_observer.log"


class ClusterTopologyChangeEvent(EventBase):
"""A custom event for cluster topology changes."""


class ClusterTopologyChangeCharmEvents(CharmEvents):
"""A CharmEvents extension for cluster topology changes.
Includes :class:`ClusterTopologyChangeEvent` in those that can be handled.
"""

cluster_topology_change = EventSource(ClusterTopologyChangeEvent)


class ClusterTopologyObserver(Object):
"""Observes changing topology in the cluster.
Observed cluster topology changes cause :class"`ClusterTopologyChangeEvent` to be emitted.
"""

def __init__(self, charm: CharmBase):
"""Constructor for ClusterTopologyObserver.
Args:
charm: the charm that is instantiating the library.
"""
super().__init__(charm, "cluster-topology-observer")

self._charm = charm

def start_observer(self):
"""Start the cluster topology observer running in a new process."""
if (
not isinstance(self._charm.unit.status, ActiveStatus)
or self._charm._peers is None
or "observer-pid" in self._charm._peers.data[self._charm.unit]
):
return

logging.info("Starting cluster topology observer process")

# We need to trick Juju into thinking that we are not running
# in a hook context, as Juju will disallow use of juju-run.
new_env = os.environ.copy()
if "JUJU_CONTEXT_ID" in new_env:
new_env.pop("JUJU_CONTEXT_ID")

pid = subprocess.Popen(
[
"/usr/bin/python3",
"src/cluster_topology_observer.py",
self._charm._patroni._patroni_url,
f"{self._charm._patroni.verify}",
"/usr/bin/juju-run",
self._charm.unit.name,
self._charm.charm_dir,
],
stdout=open(LOG_FILE_PATH, "a"),
stderr=subprocess.STDOUT,
env=new_env,
).pid

self._charm._peers.data[self._charm.unit].update({"observer-pid": f"{pid}"})
logging.info("Started cluster topology observer process with PID {}".format(pid))

def stop_observer(self):
"""Stop the running observer process if we have previously started it."""
if (
self._charm._peers is None
or "observer-pid" not in self._charm._peers.data[self._charm.unit]
):
return

observer_pid = int(self._charm._peers.data[self._charm.unit].get("observer-pid"))

try:
os.kill(observer_pid, signal.SIGINT)
msg = "Stopped running cluster topology observer process with PID {}"
logging.info(msg.format(observer_pid))
self._charm._peers.data[self._charm.unit].update({"observer-pid": ""})
except OSError:
pass

@property
def unit_tag(self):
"""Juju-style tag identifying the unit being run by this charm."""
unit_num = self._charm.unit.name.split("/")[-1]
return "unit-{}-{}".format(self._charm.app.name, unit_num)


def dispatch(run_cmd, unit, charm_dir):
"""Use the input juju-run command to dispatch a :class:`ClusterTopologyChangeEvent`."""
dispatch_sub_cmd = "JUJU_DISPATCH_PATH=hooks/cluster_topology_change {}/dispatch"
subprocess.run([run_cmd, "-u", unit, dispatch_sub_cmd.format(charm_dir)])


def main():
"""Main watch and dispatch loop.
Watch the Patroni API cluster info. When changes are detected, dispatch the change event.
"""
patroni_url, verify, run_cmd, unit, charm_dir = sys.argv[1:]

previous_cluster_topology = {}
while True:
cluster_status = requests.get(
f"{patroni_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=verify,
timeout=API_REQUEST_TIMEOUT,
)
current_cluster_topology = {
member["name"]: member["role"] for member in cluster_status.json()["members"]
}

# If it's the first time the cluster topology was retrieved, then store it and use
# it for subsequent checks.
if not previous_cluster_topology:
previous_cluster_topology = current_cluster_topology
# If the cluster topology changed, dispatch a charm event to handle this change.
elif current_cluster_topology != previous_cluster_topology:
previous_cluster_topology = current_cluster_topology
dispatch(run_cmd, unit, charm_dir)

# Wait some time before checking again for a cluster topology change.
sleep(30)


if __name__ == "__main__":
main()
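For orientation, the process spawned by `start_observer` amounts to the following invocation, where every path and address is a hypothetical placeholder for the unit's real values:

    # /usr/bin/python3 src/cluster_topology_observer.py \
    #     https://10.0.0.1:8008 True \
    #     /usr/bin/juju-run postgresql/0 \
    #     /var/lib/juju/agents/unit-postgresql-0/charm

On a detected change, `dispatch` shells out through juju-run, effectively running `JUJU_DISPATCH_PATH=hooks/cluster_topology_change <charm_dir>/dispatch` in the unit's context, which surfaces as the `cluster_topology_change` event handled in src/charm.py.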