Skip to content

Commit

Permalink
Persist TLS private key across TLS relation created/broken
Browse files Browse the repository at this point in the history
Fixes #56

Depends on #51
  • Loading branch information
carlcsaposs-canonical committed May 18, 2023
1 parent 870b7eb commit 010c57d
Showing 1 changed file with 72 additions and 58 deletions.
130 changes: 72 additions & 58 deletions src/relations/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
class _PeerUnitDatabag:
"""Peer relation unit databag"""

key: str
private_key: str
# CSR stands for certificate signing request
requested_csr: str
active_csr: str
Expand Down Expand Up @@ -60,9 +60,12 @@ def __delattr__(self, name: str) -> None:
self._databag.pop(self._get_key(name), None)

def clear(self) -> None:
"""Delete all items in databag."""
for name in self._attribute_names:
delattr(self, name)
"""Delete all items in databag except for private key."""
del self.requested_csr
del self.active_csr
del self.certificate
del self.ca
del self.chain


@dataclasses.dataclass(kw_only=True)
Expand All @@ -71,21 +74,12 @@ class _Relation:

_charm: "charm.MySQLRouterOperatorCharm"
_interface: tls_certificates.TLSCertificatesRequiresV1

@property
def _peer_relation(self) -> ops.Relation:
"""MySQL Router charm peer relation"""
return self._charm.model.get_relation(_PEER_RELATION_ENDPOINT_NAME)

@property
def peer_unit_databag(self) -> _PeerUnitDatabag:
"""MySQL Router charm peer relation unit databag"""
return _PeerUnitDatabag(self._peer_relation.data[self._charm.unit])
_peer_unit_databag: _PeerUnitDatabag

@property
def certificate_saved(self) -> bool:
"""Whether a TLS certificate is available to use"""
for value in [self.peer_unit_databag.certificate, self.peer_unit_databag.ca]:
for value in [self._peer_unit_databag.certificate, self._peer_unit_databag.ca]:
if not value:
return False
return True
Expand All @@ -94,39 +88,29 @@ def save_certificate(self, event: tls_certificates.CertificateAvailableEvent) ->
"""Save TLS certificate in peer relation unit databag."""
if (
event.certificate_signing_request.strip()
!= self.peer_unit_databag.requested_csr.strip()
!= self._peer_unit_databag.requested_csr.strip()
):
logger.warning("Unknown certificate received. Ignoring.")
return
if (
self.certificate_saved
and event.certificate_signing_request.strip()
== self.peer_unit_databag.active_csr.strip()
== self._peer_unit_databag.active_csr.strip()
):
# Workaround for https://github.com/canonical/tls-certificates-operator/issues/34
logger.debug("TLS certificate already saved.")
return
logger.debug(f"Saving TLS certificate {event=}")
self.peer_unit_databag.certificate = event.certificate
self.peer_unit_databag.ca = event.ca
self.peer_unit_databag.chain = json.dumps(event.chain)
self.peer_unit_databag.active_csr = self.peer_unit_databag.requested_csr
self._peer_unit_databag.certificate = event.certificate
self._peer_unit_databag.ca = event.ca
self._peer_unit_databag.chain = json.dumps(event.chain)
self._peer_unit_databag.active_csr = self._peer_unit_databag.requested_csr
logger.debug(f"Saved TLS certificate {event=}")
self._charm.workload.enable_tls(
key=self.peer_unit_databag.key, certificate=self.peer_unit_databag.certificate
key=self._peer_unit_databag.private_key,
certificate=self._peer_unit_databag.certificate,
)

@staticmethod
def _parse_tls_key(raw_content: str) -> bytes:
"""Parse TLS key from plain text or base64 format."""
if re.match(r"(-+(BEGIN|END) [A-Z ]+-+)", raw_content):
return re.sub(
r"(-+(BEGIN|END) [A-Z ]+-+)",
"\n\\1\n",
raw_content,
).encode("utf-8")
return base64.b64decode(raw_content)

def _generate_csr(self, key: bytes) -> bytes:
"""Generate certificate signing request (CSR)."""
unit_name = self._charm.unit.name.replace("/", "-")
Expand All @@ -146,34 +130,36 @@ def _generate_csr(self, key: bytes) -> bytes:
f"{self._charm.app.name}.{self._charm.model_service_domain}",
],
sans_ip=[
str(self._charm.model.get_binding(self._peer_relation).network.bind_address),
str(self._charm.model.get_binding("juju-info").network.bind_address),
],
)

def request_certificate_creation(self, internal_key: str = None):
def request_certificate_creation(self):
"""Request new TLS certificate from related provider charm."""
logger.debug("Requesting TLS certificate creation")
if internal_key:
key = self._parse_tls_key(internal_key)
if key := self._peer_unit_databag.private_key:
key = key.encode("utf-8")
else:
key = tls_certificates.generate_private_key()
self._peer_unit_databag.private_key = key.decode("utf-8")
csr = self._generate_csr(key)
self._interface.request_certificate_creation(certificate_signing_request=csr)
self.peer_unit_databag.key = key.decode("utf-8")
self.peer_unit_databag.requested_csr = csr.decode("utf-8")
logger.debug(f"Requested TLS certificate creation {self.peer_unit_databag.requested_csr=}")
self._peer_unit_databag.requested_csr = csr.decode("utf-8")
logger.debug(
f"Requested TLS certificate creation {self._peer_unit_databag.requested_csr=}"
)

def request_certificate_renewal(self):
"""Request TLS certificate renewal from related provider charm."""
logger.debug(f"Requesting TLS certificate renewal {self.peer_unit_databag.active_csr=}")
old_csr = self.peer_unit_databag.active_csr.encode("utf-8")
key = self.peer_unit_databag.key.encode("utf-8")
logger.debug(f"Requesting TLS certificate renewal {self._peer_unit_databag.active_csr=}")
old_csr = self._peer_unit_databag.active_csr.encode("utf-8")
key = self._peer_unit_databag.private_key.encode("utf-8")
new_csr = self._generate_csr(key)
self._interface.request_certificate_renewal(
old_certificate_signing_request=old_csr, new_certificate_signing_request=new_csr
)
self.peer_unit_databag.requested_csr = new_csr.decode("utf-8")
logger.debug(f"Requested TLS certificate renewal {self.peer_unit_databag.requested_csr=}")
self._peer_unit_databag.requested_csr = new_csr.decode("utf-8")
logger.debug(f"Requested TLS certificate renewal {self._peer_unit_databag.requested_csr=}")


class RelationEndpoint(ops.Object):
Expand Down Expand Up @@ -204,11 +190,21 @@ def __init__(self, charm_: "charm.MySQLRouterOperatorCharm"):
self._interface.on.certificate_expiring, self._on_certificate_expiring
)

@property
def peer_unit_databag(self) -> _PeerUnitDatabag:
"""MySQL Router charm peer relation unit databag"""
peer_relation = self._charm.model.get_relation(_PEER_RELATION_ENDPOINT_NAME)
return _PeerUnitDatabag(peer_relation.data[self._charm.unit])

@property
def _relation(self) -> typing.Optional[_Relation]:
if not self._charm.model.get_relation(self.NAME):
return
return _Relation(_charm=self._charm, _interface=self._interface)
return _Relation(
_charm=self._charm,
_interface=self._interface,
_peer_unit_databag=self.peer_unit_databag,
)

@property
def certificate_saved(self) -> bool:
Expand All @@ -217,21 +213,39 @@ def certificate_saved(self) -> bool:
return False
return self._relation.certificate_saved

@staticmethod
def _parse_tls_key(raw_content: str) -> str:
"""Parse TLS key from plain text or base64 format."""
if re.match(r"(-+(BEGIN|END) [A-Z ]+-+)", raw_content):
return re.sub(
r"(-+(BEGIN|END) [A-Z ]+-+)",
"\n\\1\n",
raw_content,
)
return base64.b64decode(raw_content).decode("utf-8")

def _on_set_tls_private_key(self, event: ops.ActionEvent) -> None:
"""Handle action to set unit TLS private key."""
logger.debug("Handling set TLS private key action")
if self.peer_unit_databag.private_key:
event.log("Warning: Deleted existing TLS private key")
logger.warning("Deleted existing TLS private key")
self.peer_unit_databag.private_key = self._parse_tls_key(event.params.get("internal-key"))
event.log("Saved TLS private key")
logger.debug("Saved TLS private key")
if self._relation is None:
event.fail("No TLS relation available.")
logger.debug("Unable to set TLS private key: no TLS relation available")
return
try:
self._relation.request_certificate_creation(event.params.get("internal-key"))
except Exception as e:
event.fail(f"Failed to request certificate: {e}")
logger.exception("Failed to set TLS private key via action")
raise
event.log("No TLS relation active. Relate TLS provider to create certificate.")
logger.debug("No TLS relation active. Skipped certificate request")
else:
logger.debug("Handled set TLS private key action")
try:
self._relation.request_certificate_creation()
except Exception as e:
event.fail(f"Failed to request certificate: {e}")
logger.exception(
"Failed to request certificate after TLS private key set via action"
)
raise
logger.debug("Handled set TLS private key action")

def _on_tls_relation_joined(self, _) -> None:
"""Request certificate when TLS relation joined."""
Expand All @@ -240,7 +254,7 @@ def _on_tls_relation_joined(self, _) -> None:
def _on_tls_relation_broken(self, _) -> None:
"""Delete TLS certificate."""
logger.debug("Deleting TLS certificate")
self._relation.peer_unit_databag.clear()
self.peer_unit_databag.clear()
self._charm.workload.disable_tls()
logger.debug("Deleted TLS certificate")

Expand All @@ -250,7 +264,7 @@ def _on_certificate_available(self, event: tls_certificates.CertificateAvailable

def _on_certificate_expiring(self, event: tls_certificates.CertificateExpiringEvent) -> None:
"""Request the new certificate when old certificate is expiring."""
if event.certificate != self._relation.peer_unit_databag.certificate:
if event.certificate != self.peer_unit_databag.certificate:
logger.warning("Unknown certificate expiring")
return

Expand Down

0 comments on commit 010c57d

Please sign in to comment.