diff --git a/src/relations/tls.py b/src/relations/tls.py index 99f4a42cd..a869929c9 100644 --- a/src/relations/tls.py +++ b/src/relations/tls.py @@ -25,7 +25,7 @@ class _PeerUnitDatabag: """Peer relation unit databag""" - key: str + private_key: str # CSR stands for certificate signing request requested_csr: str active_csr: str @@ -60,9 +60,12 @@ def __delattr__(self, name: str) -> None: self._databag.pop(self._get_key(name), None) def clear(self) -> None: - """Delete all items in databag.""" - for name in self._attribute_names: - delattr(self, name) + """Delete all items in databag except for private key.""" + del self.requested_csr + del self.active_csr + del self.certificate + del self.ca + del self.chain @dataclasses.dataclass(kw_only=True) @@ -71,21 +74,12 @@ class _Relation: _charm: "charm.MySQLRouterOperatorCharm" _interface: tls_certificates.TLSCertificatesRequiresV1 - - @property - def _peer_relation(self) -> ops.Relation: - """MySQL Router charm peer relation""" - return self._charm.model.get_relation(_PEER_RELATION_ENDPOINT_NAME) - - @property - def peer_unit_databag(self) -> _PeerUnitDatabag: - """MySQL Router charm peer relation unit databag""" - return _PeerUnitDatabag(self._peer_relation.data[self._charm.unit]) + _peer_unit_databag: _PeerUnitDatabag @property def certificate_saved(self) -> bool: """Whether a TLS certificate is available to use""" - for value in [self.peer_unit_databag.certificate, self.peer_unit_databag.ca]: + for value in [self._peer_unit_databag.certificate, self._peer_unit_databag.ca]: if not value: return False return True @@ -94,39 +88,29 @@ def save_certificate(self, event: tls_certificates.CertificateAvailableEvent) -> """Save TLS certificate in peer relation unit databag.""" if ( event.certificate_signing_request.strip() - != self.peer_unit_databag.requested_csr.strip() + != self._peer_unit_databag.requested_csr.strip() ): logger.warning("Unknown certificate received. Ignoring.") return if ( self.certificate_saved and event.certificate_signing_request.strip() - == self.peer_unit_databag.active_csr.strip() + == self._peer_unit_databag.active_csr.strip() ): # Workaround for https://github.com/canonical/tls-certificates-operator/issues/34 logger.debug("TLS certificate already saved.") return logger.debug(f"Saving TLS certificate {event=}") - self.peer_unit_databag.certificate = event.certificate - self.peer_unit_databag.ca = event.ca - self.peer_unit_databag.chain = json.dumps(event.chain) - self.peer_unit_databag.active_csr = self.peer_unit_databag.requested_csr + self._peer_unit_databag.certificate = event.certificate + self._peer_unit_databag.ca = event.ca + self._peer_unit_databag.chain = json.dumps(event.chain) + self._peer_unit_databag.active_csr = self._peer_unit_databag.requested_csr logger.debug(f"Saved TLS certificate {event=}") self._charm.workload.enable_tls( - key=self.peer_unit_databag.key, certificate=self.peer_unit_databag.certificate + key=self._peer_unit_databag.private_key, + certificate=self._peer_unit_databag.certificate, ) - @staticmethod - def _parse_tls_key(raw_content: str) -> bytes: - """Parse TLS key from plain text or base64 format.""" - if re.match(r"(-+(BEGIN|END) [A-Z ]+-+)", raw_content): - return re.sub( - r"(-+(BEGIN|END) [A-Z ]+-+)", - "\n\\1\n", - raw_content, - ).encode("utf-8") - return base64.b64decode(raw_content) - def _generate_csr(self, key: bytes) -> bytes: """Generate certificate signing request (CSR).""" unit_name = self._charm.unit.name.replace("/", "-") @@ -146,34 +130,36 @@ def _generate_csr(self, key: bytes) -> bytes: f"{self._charm.app.name}.{self._charm.model_service_domain}", ], sans_ip=[ - str(self._charm.model.get_binding(self._peer_relation).network.bind_address), + str(self._charm.model.get_binding("juju-info").network.bind_address), ], ) - def request_certificate_creation(self, internal_key: str = None): + def request_certificate_creation(self): """Request new TLS certificate from related provider charm.""" logger.debug("Requesting TLS certificate creation") - if internal_key: - key = self._parse_tls_key(internal_key) + if key := self._peer_unit_databag.private_key: + key = key.encode("utf-8") else: key = tls_certificates.generate_private_key() + self._peer_unit_databag.private_key = key.decode("utf-8") csr = self._generate_csr(key) self._interface.request_certificate_creation(certificate_signing_request=csr) - self.peer_unit_databag.key = key.decode("utf-8") - self.peer_unit_databag.requested_csr = csr.decode("utf-8") - logger.debug(f"Requested TLS certificate creation {self.peer_unit_databag.requested_csr=}") + self._peer_unit_databag.requested_csr = csr.decode("utf-8") + logger.debug( + f"Requested TLS certificate creation {self._peer_unit_databag.requested_csr=}" + ) def request_certificate_renewal(self): """Request TLS certificate renewal from related provider charm.""" - logger.debug(f"Requesting TLS certificate renewal {self.peer_unit_databag.active_csr=}") - old_csr = self.peer_unit_databag.active_csr.encode("utf-8") - key = self.peer_unit_databag.key.encode("utf-8") + logger.debug(f"Requesting TLS certificate renewal {self._peer_unit_databag.active_csr=}") + old_csr = self._peer_unit_databag.active_csr.encode("utf-8") + key = self._peer_unit_databag.private_key.encode("utf-8") new_csr = self._generate_csr(key) self._interface.request_certificate_renewal( old_certificate_signing_request=old_csr, new_certificate_signing_request=new_csr ) - self.peer_unit_databag.requested_csr = new_csr.decode("utf-8") - logger.debug(f"Requested TLS certificate renewal {self.peer_unit_databag.requested_csr=}") + self._peer_unit_databag.requested_csr = new_csr.decode("utf-8") + logger.debug(f"Requested TLS certificate renewal {self._peer_unit_databag.requested_csr=}") class RelationEndpoint(ops.Object): @@ -204,11 +190,21 @@ def __init__(self, charm_: "charm.MySQLRouterOperatorCharm"): self._interface.on.certificate_expiring, self._on_certificate_expiring ) + @property + def peer_unit_databag(self) -> _PeerUnitDatabag: + """MySQL Router charm peer relation unit databag""" + peer_relation = self._charm.model.get_relation(_PEER_RELATION_ENDPOINT_NAME) + return _PeerUnitDatabag(peer_relation.data[self._charm.unit]) + @property def _relation(self) -> typing.Optional[_Relation]: if not self._charm.model.get_relation(self.NAME): return - return _Relation(_charm=self._charm, _interface=self._interface) + return _Relation( + _charm=self._charm, + _interface=self._interface, + _peer_unit_databag=self.peer_unit_databag, + ) @property def certificate_saved(self) -> bool: @@ -217,21 +213,39 @@ def certificate_saved(self) -> bool: return False return self._relation.certificate_saved + @staticmethod + def _parse_tls_key(raw_content: str) -> str: + """Parse TLS key from plain text or base64 format.""" + if re.match(r"(-+(BEGIN|END) [A-Z ]+-+)", raw_content): + return re.sub( + r"(-+(BEGIN|END) [A-Z ]+-+)", + "\n\\1\n", + raw_content, + ) + return base64.b64decode(raw_content).decode("utf-8") + def _on_set_tls_private_key(self, event: ops.ActionEvent) -> None: """Handle action to set unit TLS private key.""" logger.debug("Handling set TLS private key action") + if self.peer_unit_databag.private_key: + event.log("Warning: Deleted existing TLS private key") + logger.warning("Deleted existing TLS private key") + self.peer_unit_databag.private_key = self._parse_tls_key(event.params.get("internal-key")) + event.log("Saved TLS private key") + logger.debug("Saved TLS private key") if self._relation is None: - event.fail("No TLS relation available.") - logger.debug("Unable to set TLS private key: no TLS relation available") - return - try: - self._relation.request_certificate_creation(event.params.get("internal-key")) - except Exception as e: - event.fail(f"Failed to request certificate: {e}") - logger.exception("Failed to set TLS private key via action") - raise + event.log("No TLS relation active. Relate TLS provider to create certificate.") + logger.debug("No TLS relation active. Skipped certificate request") else: - logger.debug("Handled set TLS private key action") + try: + self._relation.request_certificate_creation() + except Exception as e: + event.fail(f"Failed to request certificate: {e}") + logger.exception( + "Failed to request certificate after TLS private key set via action" + ) + raise + logger.debug("Handled set TLS private key action") def _on_tls_relation_joined(self, _) -> None: """Request certificate when TLS relation joined.""" @@ -240,7 +254,7 @@ def _on_tls_relation_joined(self, _) -> None: def _on_tls_relation_broken(self, _) -> None: """Delete TLS certificate.""" logger.debug("Deleting TLS certificate") - self._relation.peer_unit_databag.clear() + self.peer_unit_databag.clear() self._charm.workload.disable_tls() logger.debug("Deleted TLS certificate") @@ -250,7 +264,7 @@ def _on_certificate_available(self, event: tls_certificates.CertificateAvailable def _on_certificate_expiring(self, event: tls_certificates.CertificateExpiringEvent) -> None: """Request the new certificate when old certificate is expiring.""" - if event.certificate != self._relation.peer_unit_databag.certificate: + if event.certificate != self.peer_unit_databag.certificate: logger.warning("Unknown certificate expiring") return