diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c822150c3..5d2013233 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -88,9 +88,7 @@ jobs: - name: Run integration tests run: | # set sysctl values in case the cloudinit-userdata not applied - sudo sysctl -w vm.max_map_count=262144 - sudo sysctl -w vm.swappiness=0 - sudo sysctl -w net.ipv4.tcp_retries2=5 + sudo sysctl -w vm.max_map_count=262144 vm.swappiness=0 net.ipv4.tcp_retries2=5 tox run -e ${{ matrix.tox-environments }} -- -m '${{ steps.select-tests.outputs.mark_expression }}' env: diff --git a/.gitignore b/.gitignore index d3561e623..f1b819d2b 100644 --- a/.gitignore +++ b/.gitignore @@ -9,5 +9,6 @@ __pycache__/ .vscode *.tar.gz +*.tar.xz cloudinit-userdata.yaml /.pytest_cache/ diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c1cfa3ab5..92d5b865d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -63,14 +63,12 @@ cloudinit-userdata: | - [ 'sysctl', '-w', 'fs.file-max=1048576' ] EOF ``` + or in a single machine: ``` -sudo sysctl -w vm.max_map_count=262144 -sudo sysctl -w vm.swappiness=0 -sudo sysctl -w net.ipv4.tcp_retries2=5 +sudo sysctl -w vm.max_map_count=262144 vm.swappiness=0 net.ipv4.tcp_retries2=5 ``` - Then create a new model and set the previously generated file in it. ```bash # Create a model @@ -101,7 +99,7 @@ juju deploy -n 1 ./opensearch_ubuntu-22.04-amd64.charm --series jammy --show-log juju relate tls-certificates-operator opensearch ``` -**Note:** The TLS settings shown here are for self-signed-certificates, which are not recommended for production clusters. The TLS Certificates Operator offers a variety of configurations. Read more on the TLS Certificates Operator [here](https://charmhub.io/tls-certificates-operator). +**Note:** The TLS settings shown here are for self-signed-certificates, which are not recommended for production clusters. The TLS Certificates Operator offers a variety of configurations. Read more on the TLS Certificates Operator [here](https://charmhub.io/tls-certificates-operator). ## Canonical Contributor Agreement diff --git a/README.md b/README.md index 0c7fa3b26..cdbb4ee52 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Bootstrap a [lxd controller](https://juju.is/docs/olm/lxd#heading--create-a-cont juju add-model opensearch ``` -Configure the system settings required by [OpenSearch](https://opensearch.org/docs/2.3/opensearch/install/important-settings/), +Configure the system settings required by [OpenSearch](https://opensearch.org/docs/2.3/opensearch/install/important-settings/), we'll do that by creating and setting a [`cloudinit-userdata.yaml` file](https://juju.is/docs/olm/juju-model-config) on the model. ``` cat < cloudinit-userdata.yaml @@ -34,9 +34,7 @@ juju model-config ./cloudinit-userdata.yaml ``` or in a single machine: ``` -sudo sysctl -w vm.max_map_count=262144 -sudo sysctl -w vm.swappiness=0 -sudo sysctl -w net.ipv4.tcp_retries2=5 +sudo sysctl -w vm.max_map_count=262144 vm.swappiness=0 net.ipv4.tcp_retries2=5 ``` ### Basic Usage @@ -55,10 +53,10 @@ Supported [relations](https://juju.is/docs/olm/relations): The Charmed OpenSearch Operator also supports TLS encryption on the HTTP and Transport layers. TLS is enabled by default: ```shell -# Deploy the TLS Certificates Operator. +# Deploy the TLS Certificates Operator. juju deploy tls-certificates-operator --channel=edge # Add the necessary configurations for TLS. -juju config tls-certificates-operator generate-self-signed-certificates="true" ca-common-name="Test CA" +juju config tls-certificates-operator generate-self-signed-certificates="true" ca-common-name="Test CA" # Enable TLS via relation. juju relate opensearch tls-certificates-operator # Disable TLS by removing relation. diff --git a/lib/charms/data_platform_libs/v0/data_interfaces.py b/lib/charms/data_platform_libs/v0/data_interfaces.py index b3da5aa4c..8863f8d80 100644 --- a/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -303,7 +303,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 7 +LIBPATCH = 8 PYDEPS = ["ops>=2.0.0"] @@ -682,7 +682,7 @@ def replset(self) -> Optional[str]: def uris(self) -> Optional[str]: """Returns the connection URIs. - MongoDB, Redis, OpenSearch. + MongoDB, Redis. """ return self.relation.data[self.relation.app].get("uris") @@ -992,7 +992,7 @@ class KafkaRequiresEvent(RelationEvent): @property def bootstrap_server(self) -> Optional[str]: - """Returns a a comma-seperated list of broker uris.""" + """Returns a a comma-separated list of broker uris.""" return self.relation.data[self.relation.app].get("endpoints") @property @@ -1072,7 +1072,7 @@ def set_zookeeper_uris(self, relation_id: int, zookeeper_uris: str) -> None: Args: relation_id: the identifier for a particular relation. - zookeeper_uris: comma-seperated list of ZooKeeper server uris. + zookeeper_uris: comma-separated list of ZooKeeper server uris. """ self._update_relation_data(relation_id, {"zookeeper-uris": zookeeper_uris}) @@ -1119,7 +1119,7 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: # “endpoints_changed“ event if “topic_created“ is triggered. return - # Emit an endpoints (bootstap-server) changed event if the Kakfa endpoints + # Emit an endpoints (bootstrap-server) changed event if the Kakfa endpoints # added or changed this info in the relation databag. if "endpoints" in diff.added or "endpoints" in diff.changed: # Emit the default event (the one without an alias). @@ -1128,3 +1128,164 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: event.relation, app=event.app, unit=event.unit ) # here check if this is the right design return + + +# Opensearch related events + + +class OpenSearchProvidesEvent(RelationEvent): + """Base class for OpenSearch events.""" + + @property + def index(self) -> Optional[str]: + """Returns the index that was requested.""" + return self.relation.data[self.relation.app].get("index") + + +class IndexRequestedEvent(OpenSearchProvidesEvent, ExtraRoleEvent): + """Event emitted when a new index is requested for use on this relation.""" + + +class OpenSearchProvidesEvents(CharmEvents): + """OpenSearch events. + + This class defines the events that OpenSearch can emit. + """ + + index_requested = EventSource(IndexRequestedEvent) + + +class OpenSearchRequiresEvent(DatabaseRequiresEvent): + """Base class for OpenSearch requirer events.""" + + +class IndexCreatedEvent(AuthenticationEvent, OpenSearchRequiresEvent): + """Event emitted when a new index is created for use on this relation.""" + + +class OpenSearchRequiresEvents(CharmEvents): + """OpenSearch events. + + This class defines the events that the opensearch requirer can emit. + """ + + index_created = EventSource(IndexCreatedEvent) + endpoints_changed = EventSource(DatabaseEndpointsChangedEvent) + authentication_updated = EventSource(AuthenticationEvent) + + +# OpenSearch Provides and Requires Objects + + +class OpenSearchProvides(DataProvides): + """Provider-side of the OpenSearch relation.""" + + on = OpenSearchProvidesEvents() + + def __init__(self, charm: CharmBase, relation_name: str) -> None: + super().__init__(charm, relation_name) + + def _on_relation_changed(self, event: RelationChangedEvent) -> None: + """Event emitted when the relation has changed.""" + # Only the leader should handle this event. + if not self.local_unit.is_leader(): + return + + # Check which data has changed to emit customs events. + diff = self._diff(event) + + # Emit an index requested event if the setup key (index name and optional extra user roles) + # have been added to the relation databag by the application. + if "index" in diff.added: + self.on.index_requested.emit(event.relation, app=event.app, unit=event.unit) + + def set_index(self, relation_id: int, index: str) -> None: + """Set the index in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + index: the index as it is _created_ on the provider charm. This needn't match the + requested index, and can be used to present a different index name if, for example, + the requested index is invalid. + """ + self._update_relation_data(relation_id, {"index": index}) + + def set_endpoints(self, relation_id: int, endpoints: str) -> None: + """Set the endpoints in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + endpoints: the endpoint addresses for opensearch nodes. + """ + self._update_relation_data(relation_id, {"endpoints": endpoints}) + + def set_version(self, relation_id: int, version: str) -> None: + """Set the database version in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + version: database version. + """ + self._update_relation_data(relation_id, {"version": version}) + + +class OpenSearchRequires(DataRequires): + """Requires-side of the OpenSearch relation.""" + + on = OpenSearchRequiresEvents() + + def __init__( + self, charm, relation_name: str, index: str, extra_user_roles: Optional[str] = None + ): + """Manager of OpenSearch client relations.""" + super().__init__(charm, relation_name, extra_user_roles) + self.charm = charm + self.index = index + + def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None: + """Event emitted when the application joins the OpenSearch relation.""" + # Sets both index and extra user roles in the relation if the roles are provided. + # Otherwise, sets only the index. + data = {"index": self.index} + if self.extra_user_roles: + data["extra-user-roles"] = self.extra_user_roles + + self._update_relation_data(event.relation.id, data) + + def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: + """Event emitted when the OpenSearch relation has changed. + + This event triggers individual custom events depending on the changing relation. + """ + # Check which data has changed to emit customs events. + diff = self._diff(event) + + # Check if the index is created + # (the OpenSearch charm shares the credentials). + if "username" in diff.added and "password" in diff.added: + # Emit the default event (the one without an alias). + logger.info("index created at: %s", datetime.now()) + self.on.index_created.emit(event.relation, app=event.app, unit=event.unit) + + # To avoid unnecessary application restarts do not trigger + # “endpoints_changed“ or "authentication_updated" event if “index_created“ is + # triggered. + return + + # Check if authentication has updated, emit event if so + updates = {"password", "tls", "tls-ca"} + if len(set(diff._asdict().keys()) - updates) < len(diff): + logger.info("authentication updated at: %s", datetime.now()) + self.on.authentication_updated.emit(event.relation, app=event.app, unit=event.unit) + + return + + # Emit a endpoints changed event if the OpenSearch application added or changed this info + # in the relation databag. + if "endpoints" in diff.added or "endpoints" in diff.changed: + # Emit the default event (the one without an alias). + logger.info("endpoints changed on %s", datetime.now()) + self.on.endpoints_changed.emit( + event.relation, app=event.app, unit=event.unit + ) # here check if this is the right design + return diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py index 9510eac2b..5e426f372 100644 --- a/lib/charms/opensearch/v0/opensearch_base_charm.py +++ b/lib/charms/opensearch/v0/opensearch_base_charm.py @@ -332,7 +332,7 @@ def _on_opensearch_data_storage_detaching(self, _: StorageDetachingEvent): def _on_update_status(self, event: UpdateStatusEvent): """On update status event. - We want to periodically check for 3 things: + We want to periodically check for the following: 1- Do we have users that need to be deleted, and if so we need to delete them. 2- The system requirements are still met 3- every 6 hours check if certs are expiring soon (in 7 days), @@ -355,6 +355,9 @@ def _on_update_status(self, event: UpdateStatusEvent): self.opensearch_exclusions.cleanup() self._apply_cluster_health() + for relation in self.model.relations.get(ClientRelationName, []): + self.opensearch_provider.update_endpoints(relation) + self.user_manager.remove_users_and_roles() # If relation broken - leave @@ -472,8 +475,11 @@ def _is_tls_fully_configured(self) -> bool: def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 """Start OpenSearch, with a generated or passed conf, if all resources configured.""" if self.opensearch.is_started(): - self._post_start_init() - self.status.clear(WaitingToStart) + try: + self._post_start_init() + self.status.clear(WaitingToStart) + except OpenSearchHttpError: + event.defer() return if not self._can_service_start(): @@ -842,8 +848,8 @@ def _check_certs_expiration(self, event: UpdateStatusEvent) -> None: certs = self.secrets.get_unit_certificates() # keep certificates that are expiring in less than 24h - for cert_type, cert in certs.items(): - hours = cert_expiration_remaining_hours(cert) + for cert_type in list(certs.keys()): + hours = cert_expiration_remaining_hours(certs[cert_type]) if hours > 24 * 7: del certs[cert_type] diff --git a/lib/charms/opensearch/v0/opensearch_relation_provider.py b/lib/charms/opensearch/v0/opensearch_relation_provider.py index 5c852289c..d61ccdb7d 100644 --- a/lib/charms/opensearch/v0/opensearch_relation_provider.py +++ b/lib/charms/opensearch/v0/opensearch_relation_provider.py @@ -14,30 +14,43 @@ https://opensearch.org/docs/latest/security/access-control/index/. TODO add databag reference information -TODO add tls """ import json import logging -from typing import Dict, List +from typing import Dict, List, Optional, Set from charms.data_platform_libs.v0.data_interfaces import ( - DatabaseProvides, - DatabaseRequestedEvent, + IndexRequestedEvent, + OpenSearchProvides, ) from charms.opensearch.v0.constants_charm import ( ClientRelationBadRoleRequestMessage, ClientRelationName, - PeerRelationName, ) +from charms.opensearch.v0.constants_tls import CertType from charms.opensearch.v0.helper_databag import Scope -from charms.opensearch.v0.helper_networking import units_ips from charms.opensearch.v0.helper_security import generate_hashed_password from charms.opensearch.v0.opensearch_users import OpenSearchUserMgmtError -from ops.charm import CharmBase, RelationBrokenEvent, RelationDepartedEvent +from ops.charm import ( + CharmBase, + RelationBrokenEvent, + RelationChangedEvent, + RelationDepartedEvent, +) from ops.framework import Object from ops.model import BlockedStatus, Relation +# The unique Charmhub library identifier, never change it +LIBID = "c0f1d8f94bdd41a781fe2871e1922480" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + logger = logging.getLogger(__name__) # The unique Charmhub library identifier, never change it @@ -75,10 +88,11 @@ def __init__(self, charm: CharmBase) -> None: self.user_manager = self.charm.user_manager self.relation_name = ClientRelationName - self.database_provides = DatabaseProvides(self.charm, relation_name=self.relation_name) + self.opensearch_provides = OpenSearchProvides(self.charm, relation_name=self.relation_name) + self.relations = self.opensearch_provides.relations self.framework.observe( - self.database_provides.on.database_requested, self._on_database_requested + self.opensearch_provides.on.index_requested, self._on_index_requested ) self.framework.observe( charm.on[self.relation_name].relation_departed, self._on_relation_departed @@ -96,8 +110,8 @@ def _depart_flag(self, relation: Relation): def _unit_departing(self, relation): return self.charm.peers_data.get(Scope.UNIT, self._depart_flag(relation)) - def _on_database_requested(self, event: DatabaseRequestedEvent) -> None: - """Handle client database-requested event. + def _on_index_requested(self, event: IndexRequestedEvent) -> None: + """Handle client index-requested event. The read-only-endpoints field of DatabaseProvides is unused in this relation because this concept is irrelevant to OpenSearch. In this relation, the application charm should have @@ -128,10 +142,12 @@ def _on_database_requested(self, event: DatabaseRequestedEvent) -> None: return rel_id = event.relation.id + # Share the credentials and updated connection info with the client application. - self.database_provides.set_credentials(rel_id, username, pwd) + self.opensearch_provides.set_credentials(rel_id, username, pwd) self.update_endpoints(event.relation) - self.database_provides.set_version(rel_id, self.opensearch.version) + self.opensearch_provides.set_version(rel_id, self.opensearch.version) + self.update_certs(rel_id) def create_opensearch_users( self, username: str, hashed_pwd: str, access_control: Dict[str, List[str]] @@ -176,9 +192,33 @@ def create_opensearch_users( logger.error(err) raise + def update_certs(self, relation_id, ca_chain=None): + """Update TLS certs passed into this relation. + + If ca_chain is not provided, it'll get the app-admin CA generated by the TLS charm. + """ + if not self.unit.is_leader(): + # We're updating app databags in this function, so it can't be called on follower + # units. + return + if not ca_chain: + try: + ca_chain = self.charm.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val).get( + "chain" + ) + except AttributeError: + # cert doesn't exist - presumably we don't yet have a TLS relation. + return + self.opensearch_provides.set_tls_ca(relation_id, "\n".join(ca_chain[::-1])) + self.opensearch_provides.set_tls(relation_id, "True") + + def _on_relation_changed(self, event: RelationChangedEvent) -> None: + self.update_endpoints(event.relation) + def _on_relation_departed(self, event: RelationDepartedEvent) -> None: """Check if this relation is being removed, and update the peer databag accordingly.""" - if event.departing_unit == self.charm.unit: + departing = event.departing_unit == self.charm.unit + if departing: self.charm.peers_data.put(Scope.UNIT, self._depart_flag(event.relation), True) def _on_relation_broken(self, event: RelationBrokenEvent) -> None: @@ -192,8 +232,19 @@ def _on_relation_broken(self, event: RelationBrokenEvent) -> None: self.user_manager.remove_users_and_roles(event.relation.id) - def update_endpoints(self, relation): + def update_endpoints(self, relation: Relation, omit_endpoints: Optional[Set[str]] = None): """Updates endpoints in the databag for the given relation.""" + # we can only set endpoints if we're the leader + if not self.unit.is_leader(): + return + + if not omit_endpoints: + omit_endpoints = set() + port = self.opensearch.port - endpoints = [f"{ip}:{port}" for ip in units_ips(self.charm, PeerRelationName).values()] - self.database_provides.set_endpoints(relation.id, ",".join(endpoints)) + ips = set([node.ip for node in self.charm._get_nodes(use_localhost=True)]) + endpoints = ",".join([f"{ip}:{port}" for ip in ips - omit_endpoints]) + databag_endpoints = relation.data[relation.app].get("endpoints") + + if endpoints and endpoints != databag_endpoints: + self.opensearch_provides.set_endpoints(relation.id, endpoints) diff --git a/lib/charms/opensearch/v0/opensearch_tls.py b/lib/charms/opensearch/v0/opensearch_tls.py index b543d3201..4df8c7be7 100644 --- a/lib/charms/opensearch/v0/opensearch_tls.py +++ b/lib/charms/opensearch/v0/opensearch_tls.py @@ -99,7 +99,10 @@ def _on_tls_relation_broken(self, event: RelationBrokenEvent) -> None: self.charm.on_tls_relation_broken(event) def _on_certificate_available(self, event: CertificateAvailableEvent) -> None: - """Enable TLS when TLS certificate available.""" + """Enable TLS when TLS certificate available. + + CertificateAvailableEvents fire whenever a new certificate is created by the TLS charm. + """ try: scope, cert_type, secrets = self._find_secret(event.certificate_signing_request, "csr") logger.debug(f"{scope.val}.{cert_type.val} TLS certificate available.") @@ -121,6 +124,9 @@ def _on_certificate_available(self, event: CertificateAvailableEvent) -> None: merge=True, ) + for relation in self.charm.opensearch_provider.relations: + self.charm.opensearch_provider.update_certs(relation.id, event.chain) + try: self.charm.on_tls_conf_set(event, scope, cert_type, renewal) except OpenSearchError as e: diff --git a/lib/charms/opensearch/v0/opensearch_users.py b/lib/charms/opensearch/v0/opensearch_users.py index 93a5deea5..c961212ff 100644 --- a/lib/charms/opensearch/v0/opensearch_users.py +++ b/lib/charms/opensearch/v0/opensearch_users.py @@ -49,7 +49,10 @@ def get_roles(self) -> Dict[str, any]: Raises: OpenSearchUserMgmtError: If the request fails. """ - return self.opensearch.request("GET", f"{ROLE_ENDPOINT}/") + try: + return self.opensearch.request("GET", f"{ROLE_ENDPOINT}/") + except OpenSearchHttpError: + raise OpenSearchUserMgmtError() def create_role( self, @@ -69,6 +72,9 @@ def create_role( Raises: OpenSearchUserMgmtError: If the role creation request fails. + + Returns: + HTTP response to opensearch API request. """ resp = self.opensearch.request( "PUT", @@ -87,13 +93,27 @@ def remove_role(self, role_name: str) -> Dict[str, any]: Raises: OpenSearchUserMgmtError: If the request fails, or if role_name is empty + + Returns: + HTTP response to opensearch API request. """ if not role_name: raise OpenSearchUserMgmtError( "role name empty - sending a DELETE request to endpoint root isn't permitted" ) - resp = self.opensearch.request("DELETE", f"{ROLE_ENDPOINT}/{role_name}") + try: + resp = self.opensearch.request("DELETE", f"{ROLE_ENDPOINT}/{role_name}") + except OpenSearchHttpError as e: + if e.status_code == 404: + return { + "status": "OK", + "response": "role does not exist, and therefore has not been removed", + } + else: + raise + + logger.debug(resp) if resp.get("status") != "OK": raise OpenSearchUserMgmtError(f"removing role {role_name} failed") return resp @@ -104,7 +124,10 @@ def get_users(self) -> Dict[str, any]: Raises: OpenSearchUserMgmtError: If the request fails. """ - return self.opensearch.request("GET", f"{USER_ENDPOINT}/") + try: + return self.opensearch.request("GET", f"{USER_ENDPOINT}/") + except OpenSearchHttpError: + raise OpenSearchUserMgmtError() def create_user( self, user_name: str, roles: Optional[List[str]], hashed_pwd: str @@ -118,6 +141,9 @@ def create_user( Raises: OpenSearchUserMgmtError: If the request fails. + + Returns: + HTTP response to opensearch API request. """ payload = {"hash": hashed_pwd} if roles: @@ -140,14 +166,27 @@ def remove_user(self, user_name: str) -> Dict[str, any]: Raises: OpenSearchUserMgmtError: If the request fails, or if user_name is empty + + Returns: + HTTP response to opensearch API request. """ if not user_name: raise OpenSearchUserMgmtError( "user name empty - sending a DELETE request to endpoint root isn't permitted" ) - resp = self.opensearch.request("DELETE", f"{USER_ENDPOINT}/{user_name}/") - # TODO update to handle if the user doesn't exist + try: + resp = self.opensearch.request("DELETE", f"{USER_ENDPOINT}/{user_name}") + except OpenSearchHttpError as e: + if e.status_code == 404: + return { + "status": "OK", + "response": "user does not exist, and therefore has not been removed", + } + else: + raise + + logger.debug(resp) if resp.get("status") != "OK": raise OpenSearchUserMgmtError(f"removing user {user_name} failed") return resp @@ -161,6 +200,9 @@ def patch_user(self, user_name: str, patches: List[Dict[str, any]]) -> Dict[str, Raises: OpenSearchUserMgmtError: If the request fails. + + Returns: + HTTP response to opensearch API request. """ resp = self.opensearch.request( "PATCH", diff --git a/lib/charms/rolling_ops/v0/rollingops.py b/lib/charms/rolling_ops/v0/rollingops.py index 4dddcd3fe..3e6c2fae6 100644 --- a/lib/charms/rolling_ops/v0/rollingops.py +++ b/lib/charms/rolling_ops/v0/rollingops.py @@ -13,19 +13,24 @@ # limitations under the License. """This library enables "rolling" operations across units of a charmed Application. + For example, a charm author might use this library to implement a "rolling restart", in which all units in an application restart their workload, but no two units execute the restart at the same time. + To implement the rolling restart, a charm author would do the following: + 1. Add a peer relation called 'restart' to a charm's `metadata.yaml`: ```yaml peers: restart: interface: rolling_op ``` + Import this library into src/charm.py, and initialize a RollingOpsManager in the Charm's `__init__`. The Charm should also define a callback routine, which will be executed when a unit holds the distributed lock: + src/charm.py ```python # ... @@ -41,29 +46,35 @@ def __init__(...) def _restart(self, event): systemd.service_restart('foo') ``` + To kick off the rolling restart, emit this library's AcquireLock event. The simplest way to do so would be with an action, though it might make sense to acquire the lock in response to another event. + ```python def _on_trigger_restart(self, event): self.charm.on[self.restart_manager.name].acquire_lock.emit() ``` + In order to trigger the restart, a human operator would execute the following command on the CLI: + ``` juju run-action some-charm/0 some-charm/1 <... some-charm/n> restart ``` + Note that all units that plan to restart must receive the action and emit the aquire event. Any units that do not run their acquire handler will be left out of the rolling restart. (An operator might take advantage of this fact to recover from a failed rolling operation without restarting workloads that were able to successfully restart -- simply omit the successful units from a subsequent run-action call.) + """ import logging from enum import Enum -from typing import AnyStr, Callable, Optional +from typing import AnyStr, Callable, Optional, Union -from ops.charm import ActionEvent, CharmBase, RelationChangedEvent +from ops.charm import ActionEvent, CharmBase, LeaderElectedEvent, RelationChangedEvent from ops.framework import EventBase, Object from ops.model import ActiveStatus, MaintenanceStatus, WaitingStatus @@ -77,7 +88,7 @@ def _on_trigger_restart(self, event): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 3 +LIBPATCH = 4 class LockNoRelationError(Exception): @@ -88,7 +99,9 @@ class LockNoRelationError(Exception): class LockState(Enum): """Possible states for our Distributed lock. + Note that there are two states set on the unit, and two on the application. + """ ACQUIRE = "acquire" @@ -99,31 +112,40 @@ class LockState(Enum): class Lock: """A class that keeps track of a single asynchronous lock. + Warning: a Lock has permission to update relation data, which means that there are side effects to invoking the .acquire, .release and .grant methods. Running any one of them will trigger a RelationChanged event, once per transition from one internal status to another. + This class tracks state across the cloud by implementing a peer relation interface. There are two parts to the interface: + 1) The data on a unit's peer relation (defined in metadata.yaml.) Each unit can update this data. The only meaningful values are "acquire", and "release", which represent a request to acquire the lock, and a request to release the lock, respectively. + 2) The application data in the relation. This tracks whether the lock has been "granted", Or has been released (and reverted to idle). There are two valid states: "granted" or None. If a lock is in the "granted" state, a unit should emit a RunWithLocks event and then release the lock. + If a lock is in "None", this means that a unit has not yet requested the lock, or that the request has been completed. + In more detail, here is the relation structure: + relation.data: : status: 'acquire|release' : : 'granted|None' + Note that this class makes no attempts to timestamp the locks and thus handle multiple requests in a row. If a unit re-requests a lock before being granted the lock, the lock will simply stay in the "acquire" state. If a unit wishes to clear its lock, it simply needs to call lock.release(). + """ def __init__(self, manager, unit=None): @@ -138,10 +160,13 @@ def __init__(self, manager, unit=None): @property def _state(self) -> LockState: """Return an appropriate state. + Note that the state exists in the unit's relation data, and the application relation data, so we have to be careful about what our states mean. + Unit state can only be in "acquire", "release", "None" (None means unset) Application state can only be in "granted" or "None" (None means unset or released) + """ unit_state = LockState(self.relation.data[self.unit].get("state", LockState.IDLE.value)) app_state = LockState( @@ -161,6 +186,7 @@ def _state(self) -> LockState: @_state.setter def _state(self, state: LockState): """Set the given state. + Since we update the relation data, this may fire off a RelationChanged event. """ if state == LockState.ACQUIRE: @@ -239,9 +265,11 @@ def __init__(self, handle, callback_override: Optional[str] = None): self.callback_override = callback_override or "" def snapshot(self): + """Returns a snapshot of the callback override.""" return {"callback_override": self.callback_override} def restore(self, snapshot): + """Sets the callback override.""" self.callback_override = snapshot["callback_override"] @@ -256,6 +284,7 @@ class RollingOpsManager(Object): def __init__(self, charm: CharmBase, relation: AnyStr, callback: Callable): """Register our custom events. + params: charm: the charm we are attaching this to. relation: an identifier, by convention based on the name of the relation in the @@ -276,23 +305,32 @@ def __init__(self, charm: CharmBase, relation: AnyStr, callback: Callable): charm.on.define_event("{}_acquire_lock".format(self.name), AcquireLock) charm.on.define_event("{}_process_locks".format(self.name), ProcessLocks) - # Watch those events (plus the built in relation event). - self.framework.observe(charm.on[self.name].relation_changed, self._on_relation_changed) + # Watch those events self.framework.observe(charm.on[self.name].acquire_lock, self._on_acquire_lock) self.framework.observe(charm.on[self.name].run_with_lock, self._on_run_with_lock) self.framework.observe(charm.on[self.name].process_locks, self._on_process_locks) + # Observe events where we need to update locks + self.framework.observe(charm.on[self.name].relation_changed, self._update_locks) + self.framework.observe(charm.on.leader_elected, self._update_locks) + def _callback(self: CharmBase, event: EventBase) -> None: """Placeholder for the function that actually runs our event. + Usually overridden in the init. """ raise NotImplementedError - def _on_relation_changed(self: CharmBase, event: RelationChangedEvent): - """Process relation changed. + def _update_locks(self: CharmBase, _): + """Update Locks. + First, determine whether this unit has been granted a lock. If so, emit a RunWithLock event. + Then, if we are the leader, fire off a process locks event. + + If a leader is removed before the chain of events finishes, the new leader never receives + a ProcessLocks event. To get around this, call this function on LeaderElected. """ lock = Lock(self) @@ -307,7 +345,9 @@ def _on_relation_changed(self: CharmBase, event: RelationChangedEvent): def _on_process_locks(self: CharmBase, event: ProcessLocks): """Process locks. + Runs only on the leader. Updates the status of all locks. + """ if not self.model.unit.is_leader(): return diff --git a/src/opensearch.py b/src/opensearch.py index 66dfb2d36..c5b1dedab 100644 --- a/src/opensearch.py +++ b/src/opensearch.py @@ -159,7 +159,7 @@ def _start_service(self): "--clear-groups --reuid ubuntu --regid ubuntu -- sudo systemctl start opensearch.service", ) except OpenSearchCmdError: - raise OpenSearchStartError() + raise OpenSearchStartError @override def _stop_service(self): diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 7d383d124..1d442fea0 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -183,8 +183,6 @@ async def http_request( chain.write(admin_secrets["ca-chain"]) chain.seek(0) - session.auth = ("admin", user_password or admin_secrets["password"]) - request_kwargs = { "method": method, "url": endpoint, @@ -195,6 +193,8 @@ async def http_request( elif isinstance(payload, dict): request_kwargs["data"] = json.dumps(payload) + session.auth = ("admin", user_password or admin_secrets["password"]) + request_kwargs["verify"] = chain.name if verify else False resp = session.request(**request_kwargs) @@ -244,3 +244,29 @@ async def check_cluster_formation_successful( registered_nodes = [node_desc["name"] for node_desc in response["nodes"].values()] return set(unit_names) == set(registered_nodes) + + +async def scale_application( + ops_test: OpsTest, application_name: str, count: int, timeout=1000 +) -> None: + """Scale a given application to a specific unit count. + + Args: + ops_test: The ops test framework instance + application_name: The name of the application + count: The desired number of units to scale to + timeout: Time to wait for application to become stable + """ + application = ops_test.model.applications[application_name] + change = count - len(application.units) + if change > 0: + await application.add_units(change) + elif change < 0: + units = [unit.name for unit in application.units[0:-change]] + await application.destroy_units(*units) + else: + return + + await ops_test.model.wait_for_idle( + apps=[application_name], status="active", timeout=timeout, wait_for_exact_units=count + ) diff --git a/tests/integration/relations/opensearch_provider/application-charm/actions.yaml b/tests/integration/relations/opensearch_provider/application-charm/actions.yaml index 95562f3e2..5b1572f9c 100644 --- a/tests/integration/relations/opensearch_provider/application-charm/actions.yaml +++ b/tests/integration/relations/opensearch_provider/application-charm/actions.yaml @@ -12,7 +12,6 @@ run-request: type: string payload: description: fully escaped payload to be sent in bulk - type: string required: - relation-id diff --git a/tests/integration/relations/opensearch_provider/application-charm/lib/charms/data_platform_libs/v0/data_interfaces.py b/tests/integration/relations/opensearch_provider/application-charm/lib/charms/data_platform_libs/v0/data_interfaces.py index d375f8e01..b592cd70b 100644 --- a/tests/integration/relations/opensearch_provider/application-charm/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/tests/integration/relations/opensearch_provider/application-charm/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -280,7 +280,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): import json import logging -from abc import ABC, ABCMeta, abstractmethod +from abc import ABC, abstractmethod from collections import namedtuple from datetime import datetime from typing import List, Optional @@ -292,7 +292,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): RelationEvent, RelationJoinedEvent, ) -from ops.framework import EventSource, Object, _Metaclass +from ops.framework import EventSource, Object from ops.model import Relation # The unique Charmhub library identifier, never change it @@ -303,10 +303,11 @@ def _on_topic_requested(self, event: TopicRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 1 +LIBPATCH = 8 -logger = logging.getLogger(__name__) +PYDEPS = ["ops>=2.0.0"] +logger = logging.getLogger(__name__) Diff = namedtuple("Diff", "added changed deleted") Diff.__doc__ = """ @@ -349,16 +350,10 @@ def diff(event: RelationChangedEvent, bucket: str) -> Diff: return Diff(added, changed, deleted) -class _AbstractMetaclass(ABCMeta, _Metaclass): - """Meta class.""" - - pass - - # Base DataProvides and DataRequires -class DataProvides(Object, ABC, metaclass=_AbstractMetaclass): +class DataProvides(Object, ABC): """Base provides-side of the data products relation.""" def __init__(self, charm: CharmBase, relation_name: str) -> None: @@ -464,7 +459,7 @@ def set_tls_ca(self, relation_id: int, tls_ca: str) -> None: self._update_relation_data(relation_id, {"tls_ca": tls_ca}) -class DataRequires(Object, ABC, metaclass=_AbstractMetaclass): +class DataRequires(Object, ABC): """Requires-side of the relation.""" def __init__( @@ -501,6 +496,7 @@ def fetch_relation_data(self) -> dict: This function can be used to retrieve data from a relation in the charm code when outside an event callback. + Function cannot be used in `*-relation-broken` events and will raise an exception. Returns: a dict of the values stored in the relation data bag @@ -543,7 +539,61 @@ def _diff(self, event: RelationChangedEvent) -> Diff: @property def relations(self) -> List[Relation]: """The list of Relation instances associated with this relation_name.""" - return list(self.charm.model.relations[self.relation_name]) + return [ + relation + for relation in self.charm.model.relations[self.relation_name] + if self._is_relation_active(relation) + ] + + @staticmethod + def _is_relation_active(relation: Relation): + try: + _ = repr(relation.data) + return True + except RuntimeError: + return False + + @staticmethod + def _is_resource_created_for_relation(relation: Relation): + return ( + "username" in relation.data[relation.app] and "password" in relation.data[relation.app] + ) + + def is_resource_created(self, relation_id: Optional[int] = None) -> bool: + """Check if the resource has been created. + + This function can be used to check if the Provider answered with data in the charm code + when outside an event callback. + + Args: + relation_id (int, optional): When provided the check is done only for the relation id + provided, otherwise the check is done for all relations + + Returns: + True or False + + Raises: + IndexError: If relation_id is provided but that relation does not exist + """ + if relation_id is not None: + try: + relation = [relation for relation in self.relations if relation.id == relation_id][ + 0 + ] + return self._is_resource_created_for_relation(relation) + except IndexError: + raise IndexError(f"relation id {relation_id} cannot be accessed") + else: + return ( + all( + [ + self._is_resource_created_for_relation(relation) + for relation in self.relations + ] + ) + if self.relations + else False + ) # General events @@ -632,7 +682,7 @@ def replset(self) -> Optional[str]: def uris(self) -> Optional[str]: """Returns the connection URIs. - MongoDB, Redis, OpenSearch. + MongoDB, Redis. """ return self.relation.data[self.relation.app].get("uris") @@ -942,7 +992,7 @@ class KafkaRequiresEvent(RelationEvent): @property def bootstrap_server(self) -> Optional[str]: - """Returns a a comma-seperated list of broker uris.""" + """Returns a a comma-separated list of broker uris.""" return self.relation.data[self.relation.app].get("endpoints") @property @@ -1022,7 +1072,7 @@ def set_zookeeper_uris(self, relation_id: int, zookeeper_uris: str) -> None: Args: relation_id: the identifier for a particular relation. - zookeeper_uris: comma-seperated list of ZooKeeper server uris. + zookeeper_uris: comma-separated list of ZooKeeper server uris. """ self._update_relation_data(relation_id, {"zookeeper-uris": zookeeper_uris}) @@ -1069,7 +1119,7 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: # “endpoints_changed“ event if “topic_created“ is triggered. return - # Emit an endpoints (bootstap-server) changed event if the Kakfa endpoints + # Emit an endpoints (bootstrap-server) changed event if the Kakfa endpoints # added or changed this info in the relation databag. if "endpoints" in diff.added or "endpoints" in diff.changed: # Emit the default event (the one without an alias). @@ -1078,3 +1128,164 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: event.relation, app=event.app, unit=event.unit ) # here check if this is the right design return + + +# Opensearch related events + + +class OpenSearchProvidesEvent(RelationEvent): + """Base class for OpenSearch events.""" + + @property + def index(self) -> Optional[str]: + """Returns the index that was requested.""" + return self.relation.data[self.relation.app].get("index") + + +class IndexRequestedEvent(OpenSearchProvidesEvent, ExtraRoleEvent): + """Event emitted when a new index is requested for use on this relation.""" + + +class OpenSearchProvidesEvents(CharmEvents): + """OpenSearch events. + + This class defines the events that OpenSearch can emit. + """ + + index_requested = EventSource(IndexRequestedEvent) + + +class OpenSearchRequiresEvent(DatabaseRequiresEvent): + """Base class for OpenSearch requirer events.""" + + +class IndexCreatedEvent(AuthenticationEvent, OpenSearchRequiresEvent): + """Event emitted when a new index is created for use on this relation.""" + + +class OpenSearchRequiresEvents(CharmEvents): + """OpenSearch events. + + This class defines the events that the opensearch requirer can emit. + """ + + index_created = EventSource(IndexCreatedEvent) + endpoints_changed = EventSource(DatabaseEndpointsChangedEvent) + authentication_updated = EventSource(AuthenticationEvent) + + +# OpenSearch Provides and Requires Objects + + +class OpenSearchProvides(DataProvides): + """Provider-side of the OpenSearch relation.""" + + on = OpenSearchProvidesEvents() + + def __init__(self, charm: CharmBase, relation_name: str) -> None: + super().__init__(charm, relation_name) + + def _on_relation_changed(self, event: RelationChangedEvent) -> None: + """Event emitted when the relation has changed.""" + # Only the leader should handle this event. + if not self.local_unit.is_leader(): + return + + # Check which data has changed to emit customs events. + diff = self._diff(event) + + # Emit an index requested event if the setup key (index name and optional extra user roles) + # have been added to the relation databag by the application. + if "index" in diff.added: + self.on.index_requested.emit(event.relation, app=event.app, unit=event.unit) + + def set_index(self, relation_id: int, index: str) -> None: + """Set the index in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + index: the index as it is _created_ on the provider charm. This needn't match the + requested index, and can be used to present a different index name if, for example, + the requested index is invalid. + """ + self._update_relation_data(relation_id, {"index": index}) + + def set_endpoints(self, relation_id: int, endpoints: str) -> None: + """Set the endpoints in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + endpoints: the endpoint addresses for opensearch nodes. + """ + self._update_relation_data(relation_id, {"endpoints": endpoints}) + + def set_version(self, relation_id: int, version: str) -> None: + """Set the database version in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + version: database version. + """ + self._update_relation_data(relation_id, {"version": version}) + + +class OpenSearchRequires(DataRequires): + """Requires-side of the OpenSearch relation.""" + + on = OpenSearchRequiresEvents() + + def __init__( + self, charm, relation_name: str, index: str, extra_user_roles: Optional[str] = None + ): + """Manager of OpenSearch client relations.""" + super().__init__(charm, relation_name, extra_user_roles) + self.charm = charm + self.index = index + + def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None: + """Event emitted when the application joins the OpenSearch relation.""" + # Sets both index and extra user roles in the relation if the roles are provided. + # Otherwise, sets only the index. + data = {"index": self.index} + if self.extra_user_roles: + data["extra-user-roles"] = self.extra_user_roles + + self._update_relation_data(event.relation.id, data) + + def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: + """Event emitted when the OpenSearch relation has changed. + + This event triggers individual custom events depending on the changing relation. + """ + # Check which data has changed to emit customs events. + diff = self._diff(event) + + # Check if the index is created + # (the OpenSearch charm shares the credentials). + if "username" in diff.added and "password" in diff.added: + # Emit the default event (the one without an alias). + logger.info("index created at: %s", datetime.now()) + self.on.index_created.emit(event.relation, app=event.app, unit=event.unit) + + # To avoid unnecessary application restarts do not trigger + # “endpoints_changed“ or "authentication_updated" event if “index_created“ is + # triggered. + return + + # Check if authentication has updated, emit event if so + updates = {"password", "tls", "tls-ca"} + if len(set(diff._asdict().keys()) - updates) < len(diff): + logger.info("authentication updated at: %s", datetime.now()) + self.on.authentication_updated.emit(event.relation, app=event.app, unit=event.unit) + + return + + # Emit a endpoints changed event if the OpenSearch application added or changed this info + # in the relation databag. + if "endpoints" in diff.added or "endpoints" in diff.changed: + # Emit the default event (the one without an alias). + logger.info("endpoints changed on %s", datetime.now()) + self.on.endpoints_changed.emit( + event.relation, app=event.app, unit=event.unit + ) # here check if this is the right design + return diff --git a/tests/integration/relations/opensearch_provider/application-charm/metadata.yaml b/tests/integration/relations/opensearch_provider/application-charm/metadata.yaml index a4ffb7e3f..b2133b289 100644 --- a/tests/integration/relations/opensearch_provider/application-charm/metadata.yaml +++ b/tests/integration/relations/opensearch_provider/application-charm/metadata.yaml @@ -9,7 +9,7 @@ series: - jammy requires: - first-database: + first-index: interface: opensearch_client - second-database: + second-index: interface: opensearch_client diff --git a/tests/integration/relations/opensearch_provider/application-charm/requirements.txt b/tests/integration/relations/opensearch_provider/application-charm/requirements.txt index b5ff18750..635654aab 100644 --- a/tests/integration/relations/opensearch_provider/application-charm/requirements.txt +++ b/tests/integration/relations/opensearch_provider/application-charm/requirements.txt @@ -1 +1 @@ -ops==1.5.4 \ No newline at end of file +ops==2.1.1 diff --git a/tests/integration/relations/opensearch_provider/application-charm/src/charm.py b/tests/integration/relations/opensearch_provider/application-charm/src/charm.py index 97c1ec473..f48ec5cf7 100755 --- a/tests/integration/relations/opensearch_provider/application-charm/src/charm.py +++ b/tests/integration/relations/opensearch_provider/application-charm/src/charm.py @@ -10,9 +10,8 @@ import requests from charms.data_platform_libs.v0.data_interfaces import ( - DatabaseCreatedEvent, - DatabaseEndpointsChangedEvent, - DatabaseRequires, + AuthenticationEvent, + OpenSearchRequires, ) from ops.charm import ActionEvent, CharmBase from ops.main import main @@ -21,6 +20,9 @@ logger = logging.getLogger(__name__) +CERT_PATH = "/tmp/test_cert.ca" + + class ApplicationCharm(CharmBase): """Application charm that connects to database charms. @@ -35,37 +37,34 @@ def __init__(self, *args): # Events related to the first database that is requested # (these events are defined in the database requires charm library). - database_name = f'{self.app.name.replace("-", "_")}_first_database' + index_name = f'{self.app.name.replace("-", "_")}_first_opensearch' + # TODO change this to "admin" permissive_roles = json.dumps({"roles": ["all_access"]}) - self.first_database = DatabaseRequires( - self, "first-database", database_name, permissive_roles + self.first_opensearch = OpenSearchRequires( + self, "first-index", index_name, permissive_roles ) + self.framework.observe( - self.first_database.on.database_created, self._on_first_database_created + self.first_opensearch.on.index_created, self._on_authentication_updated ) self.framework.observe( - self.first_database.on.endpoints_changed, self._on_first_database_endpoints_changed + self.first_opensearch.on.authentication_updated, self._on_authentication_updated ) # Events related to the second database that is requested # (these events are defined in the database requires charm library). - database_name = f'{self.app.name.replace("-", "_")}_second_database' - # TODO change this to use new permissions - roles = { - "roles": ["readall"], - "permissions": ["TODO find some permissions", ""], - "action_groups": ["TODO find some action groups", ""], - } - complex_roles = json.dumps(roles) - self.second_database = DatabaseRequires( - self, "second-database", database_name, complex_roles - ) - self.framework.observe( - self.second_database.on.database_created, self._on_second_database_created + index_name = f'{self.app.name.replace("-", "_")}_second_opensearch' + # TODO change this to be "default" + complex_roles = json.dumps( + { + "roles": ["readall"], + "permissions": ["TODO find some permissions", ""], + "action_groups": ["TODO find some action groups", ""], + } ) - self.framework.observe( - self.second_database.on.endpoints_changed, self._on_second_database_endpoints_changed + self.second_opensearch = OpenSearchRequires( + self, "second-index", index_name, complex_roles ) self.framework.observe(self.on.run_request_action, self._on_run_request_action) @@ -80,15 +79,17 @@ def _on_update_status(self, _) -> None: def connection_check(self) -> bool: """Simple connection check to see if backend exists and we can connect to it.""" - relations = self.model.relations.get("first-database", []) + self.model.relations.get( - "second-database", [] + relations = self.model.relations.get("first-index", []) + self.model.relations.get( + "second-index", [] ) if not relations: return False + connected = True for relation in relations: if not self.smoke_check(relation.id): - return False - return True + connected = False + logger.error(f"relation {relation} didn't connect") + return connected def smoke_check(self, relation_id) -> bool: try: @@ -98,30 +99,29 @@ def smoke_check(self, relation_id) -> bool: logger.error(e) return False - # First database events observers. - def _on_first_database_created(self, event: DatabaseCreatedEvent) -> None: - """Event triggered when a database was created for this application.""" - logging.info(f"first database credentials: {event.username} {event.password}") - - def _on_first_database_endpoints_changed(self, event: DatabaseEndpointsChangedEvent) -> None: - """Event triggered when the opensearch endpoints change.""" - logger.info(f"first database endpoints have been changed to: {event.endpoints}") - - # Second database events observers. - def _on_second_database_created(self, event: DatabaseCreatedEvent) -> None: - """Event triggered when a database was created for this application.""" - logger.info(f"second database credentials: {event.username} {event.password}") + def _on_authentication_updated(self, event: AuthenticationEvent): + logger.error(event) + if event.tls != "True": + return - def _on_second_database_endpoints_changed(self, event: DatabaseEndpointsChangedEvent) -> None: - """Event triggered when the opensearch endpoints change.""" - logger.info(f"second database endpoints have been changed to: {event.endpoints}") + tls_ca = event.tls_ca + if not tls_ca: + tls_ca = self.first_opensearch.fetch_relation_data()[event.relation.id].get( + "tls_ca", None + ) + if not tls_ca: + event.defer() # We're waiting until we get a CA. + logger.error(f"writing cert to {CERT_PATH}.") + with open(CERT_PATH, "w") as f: + f.write(tls_ca) # ============== # Action hooks # ============== + def _on_run_request_action(self, event: ActionEvent): logger.info(event.params) - relation = self.first_database + relation = self.first_opensearch relation_id = event.params["relation-id"] databag = relation.fetch_relation_data()[relation_id] method = event.params["method"] @@ -158,16 +158,15 @@ def relation_request( payload: Optional[Dict[str, any]] = None, ) -> Union[Dict[str, any], List[any]]: """Make an HTTP request to a specific relation.""" - databag = self.first_database.fetch_relation_data()[relation_id] - logging.error(databag) + databag = self.first_opensearch.fetch_relation_data()[relation_id] username = databag.get("username") password = databag.get("password") - endpoints = databag.get("endpoints", "").split(",") + hosts = databag.get("endpoints", "").split(",") - if None in [username, password] or len(endpoints) == 0: + if None in [username, password] or not hosts: raise OpenSearchHttpError - host, port = endpoints[0].split(":") + host, port = hosts[0].split(":") return self.request( method, @@ -192,7 +191,6 @@ def request( """Make an HTTP request. TODO swap this over to a more normal opensearch client - Args: method: matching the known http methods. endpoint: relative to the base uri. @@ -211,7 +209,7 @@ def request( full_url = f"https://{host}:{port}/{endpoint}" request_kwargs = { - "verify": False, # TODO this should be a path to a cert once this relation has TLS. + "verify": CERT_PATH, "method": method.upper(), "url": full_url, "headers": {"Content-Type": "application/json", "Accept": "application/json"}, diff --git a/tests/integration/relations/opensearch_provider/helpers.py b/tests/integration/relations/opensearch_provider/helpers.py index 2cf63d66e..b9cb2ddc9 100644 --- a/tests/integration/relations/opensearch_provider/helpers.py +++ b/tests/integration/relations/opensearch_provider/helpers.py @@ -22,7 +22,7 @@ async def get_application_relation_data( Args: ops_test: The ops test framework instance - application_name: The name of the application + unit_name: The name of the unit relation_name: name of the relation to get connection data from key: key of data to be retrieved relation_id: id of the relation to get connection data from diff --git a/tests/integration/relations/opensearch_provider/test_opensearch_provider.py b/tests/integration/relations/opensearch_provider/test_opensearch_provider.py index 9fbdd442e..4f430ea58 100644 --- a/tests/integration/relations/opensearch_provider/test_opensearch_provider.py +++ b/tests/integration/relations/opensearch_provider/test_opensearch_provider.py @@ -15,9 +15,9 @@ from tests.integration.helpers import ( MODEL_CONFIG, SERIES, - UNIT_IDS, get_leader_unit_ip, http_request, + scale_application, ) from tests.integration.relations.opensearch_provider.helpers import ( get_application_relation_data, @@ -31,16 +31,14 @@ SECONDARY_CLIENT_APP_NAME = "secondary-application" TLS_CERTIFICATES_APP_NAME = "tls-certificates-operator" ALL_APPS = [OPENSEARCH_APP_NAME, TLS_CERTIFICATES_APP_NAME, CLIENT_APP_NAME] -FIRST_DATABASE_RELATION_NAME = "first-database" +FIRST_RELATION_NAME = "first-index" -NUM_UNITS = len(UNIT_IDS) +NUM_UNITS = 3 @pytest.mark.abort_on_fail -async def test_database_relation_with_charm_libraries( - ops_test: OpsTest, application_charm, opensearch_charm -): - """Test basic functionality of database relation interface.""" +async def test_create_relation(ops_test: OpsTest, application_charm, opensearch_charm): + """Test basic functionality of relation interface.""" # Deploy both charms (multiple units for each application to test that later they correctly # set data in the relation application databag using only the leader unit). await ops_test.model.set_config(MODEL_CONFIG) @@ -63,7 +61,7 @@ async def test_database_relation_with_charm_libraries( global client_relation client_relation = await ops_test.model.add_relation( - f"{OPENSEARCH_APP_NAME}:{ClientRelationName}", f"{CLIENT_APP_NAME}:first-database" + f"{OPENSEARCH_APP_NAME}:{ClientRelationName}", f"{CLIENT_APP_NAME}:first-index" ) wait_for_relation_joined_between(ops_test, OPENSEARCH_APP_NAME, CLIENT_APP_NAME) @@ -71,13 +69,13 @@ async def test_database_relation_with_charm_libraries( # This test shouldn't take so long await ops_test.model.wait_for_idle(apps=ALL_APPS, timeout=1200, status="active") - # await ops_test.model.block_until( - # lambda: ops_test.model.applications[OPENSEARCH_APP_NAME].status == "active", timeout=1000 - # ) +async def test_index_usage(ops_test: OpsTest): + """Check we can update and delete things. -async def test_database_usage(ops_test: OpsTest): - """Check we can update and delete things.""" + The client application authenticates using the cert provided in the index; if this is + invalid for any reason, the test will fail, so this test implicitly verifies that TLS works. + """ await run_request( ops_test, unit_name=ops_test.model.applications[CLIENT_APP_NAME].units[0].name, @@ -106,7 +104,7 @@ async def test_database_usage(ops_test: OpsTest): ) -async def test_database_bulk_usage(ops_test: OpsTest): +async def test_bulk_index_usage(ops_test: OpsTest): """Check we can update and delete things using bulk api.""" bulk_payload = """{ "index" : { "_index": "albums", "_id" : "2" } } {"artist": "Herbie Hancock", "genre": ["Jazz"], "title": "Head Hunters"} @@ -146,8 +144,8 @@ async def test_database_bulk_usage(ops_test: OpsTest): assert set(artists) == {"Herbie Hancock", "Lydian Collective", "Vulfpeck"} -async def test_database_version(ops_test: OpsTest): - """Check version is accurate.""" +async def test_version(ops_test: OpsTest): + """Check version reported in the databag is consistent with the version on the charm.""" run_version_request = await run_request( ops_test, unit_name=ops_test.model.applications[CLIENT_APP_NAME].units[0].name, @@ -155,10 +153,8 @@ async def test_database_version(ops_test: OpsTest): endpoint="/", relation_id=client_relation.id, ) - # Get the version of the database and compare with the information that - # was retrieved directly from the database. version = await get_application_relation_data( - ops_test, f"{CLIENT_APP_NAME}/0", FIRST_DATABASE_RELATION_NAME, "version" + ops_test, f"{CLIENT_APP_NAME}/0", FIRST_RELATION_NAME, "version" ) logging.info(run_version_request) logging.info(version) @@ -174,36 +170,90 @@ async def test_multiple_relations(ops_test: OpsTest, application_charm): application_name=SECONDARY_CLIENT_APP_NAME, ) + # Relate the new application and wait for them to exchange connection data. + await ops_test.model.add_relation( + f"{SECONDARY_CLIENT_APP_NAME}:{FIRST_RELATION_NAME}", OPENSEARCH_APP_NAME + ) + wait_for_relation_joined_between(ops_test, OPENSEARCH_APP_NAME, SECONDARY_CLIENT_APP_NAME) + async with ops_test.fast_forward(): - await asyncio.gather( - ops_test.model.wait_for_idle(status="active", apps=ALL_APPS), - ops_test.model.wait_for_idle(status="blocked", apps=[SECONDARY_CLIENT_APP_NAME]), + await ops_test.model.wait_for_idle( + status="active", apps=[SECONDARY_CLIENT_APP_NAME] + ALL_APPS, timeout=(60 * 20) ) - # Relate the new application with the database - # and wait for them exchanging some connection data. - await ops_test.model.add_relation( - f"{SECONDARY_CLIENT_APP_NAME}:{FIRST_DATABASE_RELATION_NAME}", OPENSEARCH_APP_NAME + +async def test_scaling(ops_test: OpsTest): + """Test that scaling correctly updates endpoints in databag. + + scale_application also contains a wait_for_idle check, including checking for active status. + """ + + async def rel_endpoints(app_name) -> str: + return await get_application_relation_data( + ops_test, f"{app_name}/0", FIRST_RELATION_NAME, "endpoints" + ) + + async def get_num_of_endpoints(app_name: str) -> int: + return len((await rel_endpoints(app_name)).split(",")) + + def get_num_of_units() -> int: + return len(ops_test.model.applications[OPENSEARCH_APP_NAME].units) + + # Test things are already working fine + assert await get_num_of_endpoints(CLIENT_APP_NAME) == get_num_of_units(), await rel_endpoints( + CLIENT_APP_NAME ) - wait_for_relation_joined_between(ops_test, OPENSEARCH_APP_NAME, SECONDARY_CLIENT_APP_NAME) + assert ( + await get_num_of_endpoints(SECONDARY_CLIENT_APP_NAME) == get_num_of_units() + ), await rel_endpoints(SECONDARY_CLIENT_APP_NAME) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + status="active", apps=[SECONDARY_CLIENT_APP_NAME] + ALL_APPS + ) + # Test scale down + # FIXME scale down seems to set endpoints to the removed unit, not the remaining units. + await scale_application(ops_test, OPENSEARCH_APP_NAME, get_num_of_units() - 1) async with ops_test.fast_forward(): await ops_test.model.wait_for_idle( status="active", apps=[SECONDARY_CLIENT_APP_NAME] + ALL_APPS ) + assert await get_num_of_endpoints(CLIENT_APP_NAME) == get_num_of_units(), await rel_endpoints( + CLIENT_APP_NAME + ) + assert ( + await get_num_of_endpoints(SECONDARY_CLIENT_APP_NAME) == get_num_of_units() + ), await rel_endpoints(SECONDARY_CLIENT_APP_NAME) + + # test scale back up again + await scale_application(ops_test, OPENSEARCH_APP_NAME, get_num_of_units() + 1) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + status="active", apps=[SECONDARY_CLIENT_APP_NAME] + ALL_APPS + ) + assert await get_num_of_endpoints(CLIENT_APP_NAME) == get_num_of_units(), await rel_endpoints( + CLIENT_APP_NAME + ) + assert ( + await get_num_of_endpoints(SECONDARY_CLIENT_APP_NAME) == get_num_of_units() + ), await rel_endpoints(SECONDARY_CLIENT_APP_NAME) async def test_relation_broken(ops_test: OpsTest): """Test that the user is removed when the relation is broken.""" # Retrieve the relation user. relation_user = await get_application_relation_data( - ops_test, f"{CLIENT_APP_NAME}/0", FIRST_DATABASE_RELATION_NAME, "username" + ops_test, f"{CLIENT_APP_NAME}/0", FIRST_RELATION_NAME, "username" ) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + status="active", apps=[SECONDARY_CLIENT_APP_NAME] + ALL_APPS + ) # Break the relation. await ops_test.model.applications[OPENSEARCH_APP_NAME].remove_relation( f"{OPENSEARCH_APP_NAME}:{ClientRelationName}", - f"{CLIENT_APP_NAME}:{FIRST_DATABASE_RELATION_NAME}", + f"{CLIENT_APP_NAME}:{FIRST_RELATION_NAME}", ) async with ops_test.fast_forward(): @@ -228,3 +278,34 @@ async def test_relation_broken(ops_test: OpsTest): logger.error(relation_user) logger.error(users) assert relation_user not in users.keys() + + +async def test_data_persists_on_relation_rejoin(ops_test: OpsTest): + """Verify that if we recreate a relation, we can access the same index.""" + client_relation = await ops_test.model.add_relation( + f"{OPENSEARCH_APP_NAME}:{ClientRelationName}", f"{CLIENT_APP_NAME}:first-index" + ) + wait_for_relation_joined_between(ops_test, OPENSEARCH_APP_NAME, CLIENT_APP_NAME) + + async with ops_test.fast_forward(): + # This test shouldn't take so long + await ops_test.model.wait_for_idle( + apps=[SECONDARY_CLIENT_APP_NAME] + ALL_APPS, timeout=1200, status="active" + ) + + read_index_endpoint = "/albums/_search?q=Jazz" + run_bulk_read_index = await run_request( + ops_test, + unit_name=ops_test.model.applications[CLIENT_APP_NAME].units[0].name, + endpoint=read_index_endpoint, + method="GET", + relation_id=client_relation.id, + ) + results = json.loads(run_bulk_read_index["results"]) + logging.info(results) + assert results.get("timed_out") is False + assert results.get("hits", {}).get("total", {}).get("value") == 3 + artists = [ + hit.get("_source", {}).get("artist") for hit in results.get("hits", {}).get("hits", [{}]) + ] + assert set(artists) == {"Herbie Hancock", "Lydian Collective", "Vulfpeck"} diff --git a/tests/unit/lib/test_opensearch_base_charm.py b/tests/unit/lib/test_opensearch_base_charm.py index fa6de6aa5..82bef7aae 100644 --- a/tests/unit/lib/test_opensearch_base_charm.py +++ b/tests/unit/lib/test_opensearch_base_charm.py @@ -141,6 +141,7 @@ def test_on_start( self.peers_data.delete(Scope.APP, "security_index_initialised") _can_service_start.return_value = True self.harness.set_leader(True) + start.reset_mock() self.charm.on.start.emit() _get_nodes.assert_called() _set_node_conf.assert_called() @@ -150,7 +151,8 @@ def test_on_start( @patch(f"{BASE_LIB_PATH}.helper_security.cert_expiration_remaining_hours") @patch("ops.model.Model.get_relation") - def test_on_update_status(self, get_relation, cert_expiration_remaining_hours): + @patch("charms.opensearch.v0.opensearch_users.OpenSearchUserManager.remove_users_and_roles") + def test_on_update_status(self, _, get_relation, cert_expiration_remaining_hours): """Test on update status.""" with patch( f"{self.OPENSEARCH_DISTRO}.missing_sys_requirements" diff --git a/tests/unit/lib/test_opensearch_relation_provider.py b/tests/unit/lib/test_opensearch_relation_provider.py index 99cb91169..085f5095c 100644 --- a/tests/unit/lib/test_opensearch_relation_provider.py +++ b/tests/unit/lib/test_opensearch_relation_provider.py @@ -50,9 +50,9 @@ def setUp(self): "charms.opensearch.v0.opensearch_relation_provider.generate_hashed_password", return_value=("hashed_pw", "password"), ) - @patch("charms.data_platform_libs.v0.data_interfaces.DatabaseProvides.set_credentials") - @patch("charms.data_platform_libs.v0.data_interfaces.DatabaseProvides.set_version") - def test_on_database_requested( + @patch("charms.data_platform_libs.v0.data_interfaces.OpenSearchProvides.set_credentials") + @patch("charms.data_platform_libs.v0.data_interfaces.OpenSearchProvides.set_version") + def test_on_index_requested( self, _set_version, _set_credentials, @@ -60,8 +60,8 @@ def test_on_database_requested( _create_users, _opensearch_version, _is_node_up, - _init_admin, - _purge_users, + _, + __, ): event = MagicMock() event.relation.id = 1 @@ -69,22 +69,22 @@ def test_on_database_requested( hashed_pw, password = _gen_pw.return_value self.harness.set_leader(False) - self.opensearch_provider._on_database_requested(event) + self.opensearch_provider._on_index_requested(event) _is_node_up.assert_not_called() self.harness.set_leader(True) _is_node_up.return_value = False - self.opensearch_provider._on_database_requested(event) + self.opensearch_provider._on_index_requested(event) event.defer.assert_called() _is_node_up.return_value = True event.extra_user_roles = None - self.opensearch_provider._on_database_requested(event) + self.opensearch_provider._on_index_requested(event) self.assertIsInstance(self.unit.status, BlockedStatus) event.extra_user_roles = json.dumps({"roles": ["role"]}) self.unit.status = ActiveStatus() - self.opensearch_provider._on_database_requested(event) + self.opensearch_provider._on_index_requested(event) # no permissions or action groups in extra_user_roles, so we aren't creating a new role. _create_users.assert_called_with(username, hashed_pw, json.loads(event.extra_user_roles)) _set_credentials.assert_called_with(event.relation.id, username, password) @@ -94,7 +94,7 @@ def test_on_database_requested( _set_version.reset_mock() _create_users.side_effect = OpenSearchUserMgmtError() - self.opensearch_provider._on_database_requested(event) + self.opensearch_provider._on_index_requested(event) self.assertIsInstance(self.unit.status, BlockedStatus) _set_credentials.assert_not_called() _set_version.assert_not_called() @@ -132,6 +132,7 @@ def test_create_opensearch_users(self, _patch_user, _create_role, _create_user): patches = [{"op": "replace", "path": "/opendistro_security_roles", "value": roles}] _patch_user.assert_called_with(username, patches) + # @patch("charms.opensearch.v0.opensearch_relation_provider.unit_ip", return_value="1.1.1.2") def test_on_relation_departed(self): event = MagicMock() event.departing_unit = None @@ -156,6 +157,7 @@ def test_on_relation_departed(self): @patch("charm.OpenSearchOperatorCharm._purge_users") def test_on_relation_broken(self, _, __, _is_node_up, _remove_users, _unit_departing): event = MagicMock() + event.relation.id = 0 depart_flag = self.opensearch_provider._depart_flag(event.relation) self.harness.set_leader(False) @@ -176,14 +178,17 @@ def test_on_relation_broken(self, _, __, _is_node_up, _remove_users, _unit_depar assert not self.charm.peers_data.get(Scope.UNIT, depart_flag) _remove_users.assert_called_with(event.relation.id) - @patch("charms.data_platform_libs.v0.data_interfaces.DatabaseProvides.set_endpoints") - @patch( - "charms.opensearch.v0.opensearch_relation_provider.units_ips", - return_value={"1": "1.1.1.1", "2": "2.2.2.2"}, - ) - def test_update_endpoints(self, _ips, _set_endpoints): + @patch("charms.data_platform_libs.v0.data_interfaces.OpenSearchProvides.set_endpoints") + @patch("charm.OpenSearchOperatorCharm._get_nodes") + @patch("charm.OpenSearchOperatorCharm._put_admin_user") + @patch("charm.OpenSearchOperatorCharm._purge_users") + def test_update_endpoints(self, _, __, _nodes, _set_endpoints): + self.harness.set_leader(True) + node = MagicMock() + node.ip = "4.4.4.4" + _nodes.return_value = [node] relation = MagicMock() relation.id = 1 - endpoints = [f"{ip}:{self.charm.opensearch.port}" for ip in _ips.return_value.values()] + endpoints = [f"{node.ip}:{self.charm.opensearch.port}" for node in _nodes.return_value] self.opensearch_provider.update_endpoints(relation) _set_endpoints.assert_called_with(relation.id, ",".join(endpoints)) diff --git a/tests/unit/lib/test_opensearch_tls.py b/tests/unit/lib/test_opensearch_tls.py index 22325f731..c7d1afcb4 100644 --- a/tests/unit/lib/test_opensearch_tls.py +++ b/tests/unit/lib/test_opensearch_tls.py @@ -64,9 +64,10 @@ def test_find_secret(self): self.secret_store.put_object(Scope.APP, CertType.APP_ADMIN.val, {"csr": event_data_csr}) @patch("charms.opensearch.v0.opensearch_tls.OpenSearchTLS._request_certificate") + @patch("charms.rolling_ops.v0.rollingops.RollingOpsManager._update_locks") @patch("charm.OpenSearchOperatorCharm._put_admin_user") @patch("charm.OpenSearchOperatorCharm._purge_users") - def test_on_relation_joined_admin(self, _, _put_admin_user, _request_certificate): + def test_on_relation_joined_admin(self, _, __, _put_admin_user, _request_certificate): """Test on certificate relation joined event.""" event_mock = MagicMock() @@ -107,9 +108,10 @@ def test_on_relation_broken(self, on_tls_relation_broken): on_tls_relation_broken.assert_called_once() @patch("charms.opensearch.v0.opensearch_tls.OpenSearchTLS._request_certificate") + @patch("charms.rolling_ops.v0.rollingops.RollingOpsManager._update_locks") @patch("charm.OpenSearchOperatorCharm._put_admin_user") @patch("charm.OpenSearchOperatorCharm._purge_users") - def test_on_set_tls_private_key(self, _, _put_admin_user, _request_certificate): + def test_on_set_tls_private_key(self, _, __, _put_admin_user, _request_certificate): """Test _on_set_tls private key event.""" event_mock = MagicMock(params={"category": "app-admin"}) diff --git a/tests/unit/lib/test_opensearch_users.py b/tests/unit/lib/test_opensearch_users.py index 06c8759f5..f3b222c7f 100644 --- a/tests/unit/lib/test_opensearch_users.py +++ b/tests/unit/lib/test_opensearch_users.py @@ -92,7 +92,7 @@ def test_remove_user(self, _request): user_name = "username" with pytest.raises(OpenSearchUserMgmtError): self.mgr.remove_user(user_name) - request_args = ("DELETE", "/_plugins/_security/api/internalusers/username/") + request_args = ("DELETE", "/_plugins/_security/api/internalusers/username") self.opensearch.request.assert_called_with(*request_args) self.opensearch.request.reset_mock() diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 90b1db361..865b92f3a 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -1,7 +1,6 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. -# -# Learn more about testing at: https://juju.is/docs/sdk/testing + import tempfile from os import listdir from os.path import isfile, join