From 785002f6aaee8aa03f03b8e030b435c20e286a91 Mon Sep 17 00:00:00 2001 From: Victor Cavichioli <79488234+VictorCavichioli@users.noreply.github.com> Date: Tue, 1 Oct 2024 08:45:08 -0300 Subject: [PATCH] Introduce DistributedJmxProxy and DistributedJmxProxyFactory (#727) --- CHANGES.md | 1 + .../providers/AgentJmxConnectionProvider.java | 12 + .../spring/RetrySchedulerService.java | 1 - .../DistributedJmxConnectionProviderImpl.java | 10 +- .../DistributedJmxConnectionProvider.java | 2 + core.impl/pom.xml | 6 + .../jmx/DistributedJmxProxyFactoryImpl.java | 548 ++++++++++++++++++ .../ecchronos/core/impl/jmx/package-info.java | 18 + .../impl/table/TableReferenceFactoryImpl.java | 161 +++++ .../core/impl/table/package-info.java | 18 + .../TestDistributedJmxProxyFactoryImpl.java | 176 ++++++ .../table/TestTableReferenceFactoryImpl.java | 357 ++++++++++++ .../core/jmx/DistributedJmxProxy.java | 149 +++++ .../core/jmx/DistributedJmxProxyFactory.java | 31 + .../ecchronos/core/jmx/package-info.java | 18 + .../core/repair/config/RepairOptions.java | 73 +++ .../core/repair/config/package-info.java | 18 + .../ecchronos/core/repair/package-info.java | 18 + docs/ARCHITECTURE.md | 3 +- pmd-rules.xml | 1 + .../utils/enums/repair/RepairParallelism.java | 43 ++ .../utils/enums/repair/package-info.java | 18 + 22 files changed, 1678 insertions(+), 4 deletions(-) create mode 100644 core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/DistributedJmxProxyFactoryImpl.java create mode 100644 core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/package-info.java create mode 100644 core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableReferenceFactoryImpl.java create mode 100644 core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/package-info.java create mode 100644 core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/TestDistributedJmxProxyFactoryImpl.java create mode 100644 core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TestTableReferenceFactoryImpl.java create mode 100644 core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/DistributedJmxProxy.java create mode 100644 core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/DistributedJmxProxyFactory.java create mode 100644 core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/package-info.java create mode 100644 core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/config/RepairOptions.java create mode 100644 core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/config/package-info.java create mode 100644 core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/package-info.java create mode 100644 utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/enums/repair/RepairParallelism.java create mode 100644 utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/enums/repair/package-info.java diff --git a/CHANGES.md b/CHANGES.md index 26dc1f11a..a6496a304 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Version 1.0.0 (Not yet Released) +* Create DistributedJmxProxy and DistributedJmxProxyFactory - Issue #715 * Create a New Maven Module "utils" for Common Code Reuse - Issue #720 * Implement ReplicationStateImpl to Manage and Cache Token Range to Replica Mappings - Issue #719 * Implement NodeResolverImpl to Resolve Nodes by IP Address and UUID - Issue #718 diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentJmxConnectionProvider.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentJmxConnectionProvider.java index bf551f8e8..a7b8c4097 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentJmxConnectionProvider.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentJmxConnectionProvider.java @@ -133,6 +133,18 @@ public JMXConnector getJmxConnector(final UUID nodeID) return myDistributedJmxConnectionProviderImpl.getJmxConnector(nodeID); } + /** + * validate if the given JMXConnector is available. + * + * @param jmxConnector + * The jmxConnector to validate + * @return A boolean representing the node's connection status. + */ + @Override + public boolean isConnected(final JMXConnector jmxConnector) + { + return myDistributedJmxConnectionProviderImpl.isConnected(jmxConnector); + } /** * Closes the JMX connection associated with the specified node ID. * diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java index 22a43d3e0..e41470487 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java @@ -62,7 +62,6 @@ @Service public final class RetrySchedulerService implements DisposableBean { - private static final Logger LOG = LoggerFactory.getLogger(RetrySchedulerService.class); private static final String COLUMN_NODE_ID = "node_id"; private static final String COLUMN_NODE_STATUS = "node_status"; diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java index 88bbbb6d8..c40253a7c 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java @@ -48,7 +48,15 @@ public DistributedJmxConnectionProviderImpl( myJMXConnections = jmxConnections; } - private static boolean isConnected(final JMXConnector jmxConnector) + /** + * validate if the given JMXConnector is available. + * + * @param jmxConnector + * The jmxConnector to validate + * @return A boolean representing the node's connection status. + */ + @Override + public boolean isConnected(final JMXConnector jmxConnector) { try { diff --git a/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedJmxConnectionProvider.java b/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedJmxConnectionProvider.java index 7fad4ba98..17961619a 100644 --- a/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedJmxConnectionProvider.java +++ b/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedJmxConnectionProvider.java @@ -27,6 +27,8 @@ public interface DistributedJmxConnectionProvider extends Closeable JMXConnector getJmxConnector(UUID nodeID); + boolean isConnected(JMXConnector jmxConnector); + @Override default void close() throws IOException { diff --git a/core.impl/pom.xml b/core.impl/pom.xml index cc6c80ee7..c200e7664 100644 --- a/core.impl/pom.xml +++ b/core.impl/pom.xml @@ -41,6 +41,12 @@ ${project.version} + + com.ericsson.bss.cassandra.ecchronos + data + ${project.version} + + com.datastax.oss diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/DistributedJmxProxyFactoryImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/DistributedJmxProxyFactoryImpl.java new file mode 100644 index 000000000..71e4739c5 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/DistributedJmxProxyFactoryImpl.java @@ -0,0 +1,548 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.jmx; + +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxy; +import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; + +import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.sync.NodeStatus; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.ArrayList; +import javax.management.ObjectName; +import java.io.IOException; + +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.ListenerNotFoundException; +import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.NotificationListener; +import javax.management.ReflectionException; +import javax.management.openmbean.CompositeData; +import javax.management.remote.JMXConnector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A factory creating JMX proxies to Cassandra. + */ +@SuppressWarnings({"PMD.ClassWithOnlyPrivateConstructorsShouldBeFinal", "checkstyle:finalclass"}) +public class DistributedJmxProxyFactoryImpl implements DistributedJmxProxyFactory +{ + private static final Logger LOG = LoggerFactory.getLogger(DistributedJmxProxyFactoryImpl.class); + private static final String SS_OBJ_NAME = "org.apache.cassandra.db:type=StorageService"; + private static final String RS_OBJ_NAME = "org.apache.cassandra.db:type=RepairService"; + private static final String LIVE_NODES_ATTRIBUTE = "LiveNodes"; + private static final String UNREACHABLE_NODES_ATTRIBUTE = "UnreachableNodes"; + private static final String FORCE_TERMINATE_ALL_REPAIR_SESSIONS_METHOD = "forceTerminateAllRepairSessions"; + private static final String REPAIR_ASYNC_METHOD = "repairAsync"; + private static final String REPAIR_STATS_METHOD = "getRepairStats"; + + private final DistributedJmxConnectionProvider myDistributedJmxConnectionProvider; + private final Map nodesMap; + private final EccNodesSync eccNodesSync; + + private DistributedJmxProxyFactoryImpl(final Builder builder) + { + myDistributedJmxConnectionProvider = builder.myDistributedJmxConnectionProvider; + nodesMap = builder.myNodesMap; + eccNodesSync = builder.myEccNodesSync; + } + + @Override + public DistributedJmxProxy connect() throws IOException + { + try + { + return new InternalDistributedJmxProxy( + myDistributedJmxConnectionProvider, + nodesMap, + eccNodesSync); + } + catch (MalformedObjectNameException e) + { + throw new IOException("Unable to get StorageService object", e); + } + } + + private static final class InternalDistributedJmxProxy implements DistributedJmxProxy + { + private final DistributedJmxConnectionProvider myDistributedJmxConnectionProvider; + private final Map myNodesMap; + private final EccNodesSync myEccNodesSync; + private final ObjectName myStorageServiceObject; + private final ObjectName myRepairServiceObject; + + private InternalDistributedJmxProxy( + final DistributedJmxConnectionProvider distributedJmxConnectionProvider, + final Map nodesMap, + final EccNodesSync eccNodesSync + ) throws MalformedObjectNameException + { + myDistributedJmxConnectionProvider = distributedJmxConnectionProvider; + myNodesMap = nodesMap; + myEccNodesSync = eccNodesSync; + myStorageServiceObject = new ObjectName(SS_OBJ_NAME); + myRepairServiceObject = new ObjectName(RS_OBJ_NAME); + } + + @Override + public void close() + { + // Should not close + } + + @Override + public void addStorageServiceListener(final UUID nodeID, final NotificationListener listener) + { + JMXConnector nodeConnection = myDistributedJmxConnectionProvider.getJmxConnector(nodeID); + boolean isConnectionAvailable = validateJmxConnection(nodeConnection); + if (isConnectionAvailable) + { + try + { + nodeConnection.addConnectionNotificationListener(listener, null, null); + nodeConnection.getMBeanServerConnection().addNotificationListener(myStorageServiceObject, listener, null, null); + } + catch (InstanceNotFoundException | IOException e) + { + LOG.error("Unable to add StorageService listener in node {} with because of {}", nodeID, e.getMessage()); + } + } + else + { + markNodeAsUnavailable(nodeID); + } + } + + @SuppressWarnings("unchecked") + @Override + public List getLiveNodes(final UUID nodeID) + { + JMXConnector nodeConnection = myDistributedJmxConnectionProvider.getJmxConnector(nodeID); + boolean isConnectionAvailable = validateJmxConnection(nodeConnection); + if (isConnectionAvailable) + { + try + { + return (List) nodeConnection + .getMBeanServerConnection() + .getAttribute(myStorageServiceObject, + LIVE_NODES_ATTRIBUTE); + } + catch (InstanceNotFoundException + | MBeanException + | ReflectionException + | IOException + | AttributeNotFoundException e) + { + LOG.error("Unable to get live nodes for node {} because of {}", nodeID, e.getMessage()); + } + } + else + { + markNodeAsUnavailable(nodeID); + } + return Collections.emptyList(); + } + + @SuppressWarnings("unchecked") + @Override + public List getUnreachableNodes(final UUID nodeID) + { + JMXConnector nodeConnection = myDistributedJmxConnectionProvider.getJmxConnector(nodeID); + boolean isConnectionAvailable = validateJmxConnection(nodeConnection); + if (isConnectionAvailable) + { + try + { + return (List) nodeConnection + .getMBeanServerConnection() + .getAttribute(myStorageServiceObject, + UNREACHABLE_NODES_ATTRIBUTE); + } + catch (InstanceNotFoundException + | MBeanException + | ReflectionException + | IOException + | AttributeNotFoundException e) + { + LOG.error("Unable to get unreachable nodes for node {} because of {}", nodeID, e.getMessage()); + } + } + else + { + markNodeAsUnavailable(nodeID); + } + return Collections.emptyList(); + } + + @Override + public int repairAsync( + final UUID nodeID, + final String keyspace, + final Map options) + { + JMXConnector nodeConnection = myDistributedJmxConnectionProvider.getJmxConnector(nodeID); + boolean isConnectionAvailable = validateJmxConnection(nodeConnection); + if (isConnectionAvailable) + { + try + { + return (int) nodeConnection + .getMBeanServerConnection().invoke(myStorageServiceObject, + REPAIR_ASYNC_METHOD, + new Object[] + { + keyspace, options + }, + new String[] + { + String.class.getName(), Map.class.getName() + }); + } + catch (InstanceNotFoundException | MBeanException | ReflectionException | IOException e) + { + LOG.error("Unable to repair node {} because of {}", nodeID, e.getMessage()); + } + } + else + { + markNodeAsUnavailable(nodeID); + } + return 0; + } + + /** + * Force terminate all repair sessions in all nodes. + */ + @Override + public void forceTerminateAllRepairSessions() + { + for ( + Map.Entry entry + : + myDistributedJmxConnectionProvider.getJmxConnections().entrySet()) + { + forceTerminateAllRepairSessionsInSpecificNode(entry.getKey()); + } + } + + @Override + public void forceTerminateAllRepairSessionsInSpecificNode(final UUID nodeID) + { + JMXConnector nodeConnection = myDistributedJmxConnectionProvider.getJmxConnector(nodeID); + boolean isConnectionAvailable = validateJmxConnection(nodeConnection); + if (isConnectionAvailable) + { + try + { + nodeConnection + .getMBeanServerConnection().invoke(myStorageServiceObject, + FORCE_TERMINATE_ALL_REPAIR_SESSIONS_METHOD, + null, null); + } + catch (InstanceNotFoundException | MBeanException | ReflectionException | IOException e) + { + LOG.error("Unable to terminate repair sessions for node {} because of {}", nodeID, e.getMessage()); + } + } + else + { + LOG.error("Unable to terminate repair sessions for node {} because the connection is unavailable", nodeID); + markNodeAsUnavailable(nodeID); + } + + } + + /** + * Remove the storage service listener. + * + * @param listener + * The listener to remove. + */ + @Override + public void removeStorageServiceListener( + final UUID nodeID, + final NotificationListener listener) + { + JMXConnector nodeConnection = myDistributedJmxConnectionProvider.getJmxConnector(nodeID); + boolean isConnectionAvailable = validateJmxConnection(nodeConnection); + + if (isConnectionAvailable) + { + try + { + nodeConnection.removeConnectionNotificationListener(listener); + nodeConnection.getMBeanServerConnection().removeNotificationListener(myStorageServiceObject, listener); + } + catch (InstanceNotFoundException | ListenerNotFoundException | IOException e) + { + LOG.error("Unable to remove StorageService listener for node {} because of {}", nodeID, e.getMessage()); + } + } + else + { + LOG.error("Unable to remove StorageService listener for node {} because the connection is unavailable", nodeID); + markNodeAsUnavailable(nodeID); + } + + } + + /** + * Get the live disk space used. + * + * @param tableReference + * The table to get the live disk space for. + * @return long + */ + @Override + public long liveDiskSpaceUsed( + final UUID nodeID, + final TableReference tableReference) + { + JMXConnector nodeConnection = myDistributedJmxConnectionProvider.getJmxConnector(nodeID); + boolean isConnectionAvailable = validateJmxConnection(nodeConnection); + + if (isConnectionAvailable) + { + try + { + ObjectName objectName + = new ObjectName(String + .format("org.apache.cassandra.metrics:type=Table,keyspace=%s,scope=%s,name=LiveDiskSpaceUsed", + tableReference.getKeyspace(), tableReference.getTable())); + + return (Long) nodeConnection + .getMBeanServerConnection().getAttribute(objectName, "Count"); + } + catch (AttributeNotFoundException + | InstanceNotFoundException + | MBeanException + | ReflectionException + | IOException + | MalformedObjectNameException e) + { + LOG.error("Unable to retrieve disk space usage for table {} in node {} because of {}", tableReference, + nodeID, + e.getMessage()); + } + } + else + { + LOG.error("Unable to retrieve disk space usage for table {} in node {} because the connection is unavailable", + tableReference, + nodeID); + markNodeAsUnavailable(nodeID); + } + return 0; + } + + @SuppressWarnings("unchecked") + @Override + public long getMaxRepairedAt( + final UUID nodeID, + final TableReference tableReference) + { + JMXConnector nodeConnection = myDistributedJmxConnectionProvider.getJmxConnector(nodeID); + boolean isConnectionAvailable = validateJmxConnection(nodeConnection); + if (isConnectionAvailable) + { + try + { + List args = new ArrayList<>(); + args.add(tableReference.getKeyspace()); + args.add(tableReference.getTable()); + List compositeDatas = (List) nodeConnection + .getMBeanServerConnection().invoke( + myRepairServiceObject, REPAIR_STATS_METHOD, + new Object[] + { + args, null + }, + new String[] + { + List.class.getName(), + String.class.getName() + }); + for (CompositeData data : compositeDatas) + { + return (long) data.getAll(new String[] {"maxRepaired"})[0]; + } + } + catch (InstanceNotFoundException | MBeanException | ReflectionException | IOException e) + { + LOG.error("Unable to get maxRepaired for table {} in node {} because of {}", tableReference, nodeID, e.getMessage()); + } + } + else + { + LOG.error("Unable to get maxRepaired for table {} in node {} because the connection is unavailable", + tableReference, + nodeID); + markNodeAsUnavailable(nodeID); + } + return 0; + } + + @Override + public double getPercentRepaired( + final UUID nodeID, + final TableReference tableReference) + { + JMXConnector nodeConnection = myDistributedJmxConnectionProvider.getJmxConnector(nodeID); + boolean isConnectionAvailable = validateJmxConnection(nodeConnection); + if (isConnectionAvailable) + { + try + { + ObjectName objectName + = new ObjectName(String + .format("org.apache.cassandra.metrics:type=Table,keyspace=%s,scope=%s,name=PercentRepaired", + tableReference.getKeyspace(), tableReference.getTable())); + + return (double) nodeConnection + .getMBeanServerConnection().getAttribute(objectName, "Value"); + } + catch (AttributeNotFoundException + | InstanceNotFoundException + | MBeanException + | ReflectionException + | IOException + | MalformedObjectNameException e) + { + LOG.error("Unable to retrieve disk space usage for {} in node {}, because of {}", + tableReference, nodeID, e.getMessage()); + } + } + else + { + markNodeAsUnavailable(nodeID); + } + return 0.0; + } + + @Override + public String getNodeStatus(final UUID nodeID) + { + JMXConnector nodeConnection = myDistributedJmxConnectionProvider.getJmxConnector(nodeID); + boolean isConnectionAvailable = validateJmxConnection(nodeConnection); + if (isConnectionAvailable) + { + try + { + return (String) nodeConnection + .getMBeanServerConnection().getAttribute(myStorageServiceObject, "OperationMode"); + } + catch (InstanceNotFoundException + | AttributeNotFoundException + | MBeanException + | ReflectionException + | IOException e) + { + LOG.error("Unable to retrieve node status for {} because of {}", nodeID, e.getMessage()); + } + } + else + { + markNodeAsUnavailable(nodeID); + } + return "Unknown"; + } + + @Override + public boolean validateJmxConnection(final JMXConnector jmxConnector) + { + return myDistributedJmxConnectionProvider.isConnected(jmxConnector); + } + + private void markNodeAsUnavailable(final UUID nodeID) + { + LOG.error("Unable to get connection with node {}, marking as UNREACHABLE", nodeID); + myEccNodesSync.updateNodeStatus(NodeStatus.UNAVAILABLE, myNodesMap.get(nodeID).getDatacenter(), nodeID); + } + } + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private DistributedJmxConnectionProvider myDistributedJmxConnectionProvider; + private Map myNodesMap; + private EccNodesSync myEccNodesSync; + + /** + * Build with JMX connection provider. + * + * @param distributedJmxConnectionProvider The JMX connection provider + * @return Builder + */ + public Builder withJmxConnectionProvider(final DistributedJmxConnectionProvider distributedJmxConnectionProvider) + { + myDistributedJmxConnectionProvider = distributedJmxConnectionProvider; + return this; + } + + /** + * Build with Nodes map. + * + * @param nodesMap The Nodes map + * @return Builder + */ + public Builder withNodesMap(final Map nodesMap) + { + myNodesMap = nodesMap; + return this; + } + + /** + * Build with EccNodesSync. + * + * @param eccNodesSync The EccNodesSync + * @return Builder + */ + public Builder withEccNodesSync(final EccNodesSync eccNodesSync) + { + myEccNodesSync = eccNodesSync; + return this; + } + + /** + * Build. + * + * @return DistributedJmxProxyFactoryImpl + */ + public DistributedJmxProxyFactoryImpl build() + { + if (myDistributedJmxConnectionProvider == null) + { + throw new IllegalArgumentException("JMX Connection provider cannot be null"); + } + return new DistributedJmxProxyFactoryImpl(this); + } + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/package-info.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/package-info.java new file mode 100644 index 000000000..9516d79fc --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the implementation and resources for jmx proxies. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.jmx; diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableReferenceFactoryImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableReferenceFactoryImpl.java new file mode 100644 index 000000000..3aba8c825 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableReferenceFactoryImpl.java @@ -0,0 +1,161 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.table; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.Metadata; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException; +import com.google.common.base.Preconditions; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +/** + * A table reference factory using tables existing in Cassandra. + * Each unique table contains one specific table reference to avoid creating a lot of copies of table references. + */ +public class TableReferenceFactoryImpl implements TableReferenceFactory +{ + private final CqlSession session; + + public TableReferenceFactoryImpl(final CqlSession aSession) + { + this.session = Preconditions.checkNotNull(aSession, "Session must be set"); + } + + @Override + public final TableReference forTable(final String keyspace, final String table) + { + Optional keyspaceMetadata = Metadata.getKeyspace(session, keyspace); + if (keyspaceMetadata.isEmpty()) + { + return null; + } + Optional tableMetadata = Metadata.getTable(keyspaceMetadata.get(), table); + return tableMetadata.map(UuidTableReference::new).orElse(null); + + } + + @Override + public final TableReference forTable(final TableMetadata table) + { + return new UuidTableReference(table); + } + + @Override + public final Set forKeyspace(final String keyspace) throws EcChronosException + { + Set tableReferences = new HashSet<>(); + Optional keyspaceMetadata = Metadata.getKeyspace(session, keyspace); + if (!keyspaceMetadata.isPresent()) + { + throw new EcChronosException("Keyspace " + keyspace + " does not exist"); + } + for (TableMetadata table : keyspaceMetadata.get().getTables().values()) + { + tableReferences.add(new UuidTableReference(table)); + } + return tableReferences; + } + + @Override + public final Set forCluster() + { + Set tableReferences = new HashSet<>(); + for (KeyspaceMetadata keyspace : session.getMetadata().getKeyspaces().values()) + { + for (TableMetadata table : keyspace.getTables().values()) + { + tableReferences.add(new UuidTableReference(table)); + } + } + return tableReferences; + } + + class UuidTableReference implements TableReference + { + private final UUID uuid; + private final String keyspace; + private final String table; + private final int gcGraceSeconds; + + UuidTableReference(final TableMetadata tableMetadata) + { + uuid = tableMetadata.getId().get(); + keyspace = tableMetadata.getKeyspace().asInternal(); + table = tableMetadata.getName().asInternal(); + gcGraceSeconds = (int) tableMetadata.getOptions().get(CqlIdentifier.fromInternal("gc_grace_seconds")); + } + + @Override + public UUID getId() + { + return uuid; + } + + @Override + public String getKeyspace() + { + return keyspace; + } + + @Override + public String getTable() + { + return table; + } + + @Override + public int getGcGraceSeconds() + { + return gcGraceSeconds; + } + + @Override + public String toString() + { + return keyspace + "." + table; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + UuidTableReference that = (UuidTableReference) o; + return uuid.equals(that.uuid) && keyspace.equals(that.keyspace) && table.equals(that.table); + } + + @Override + public int hashCode() + { + return Objects.hash(uuid, keyspace, table); + } + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/package-info.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/package-info.java new file mode 100644 index 000000000..5bb8bff9a --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the implementation and resources for tables and keyspace. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.table; diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/TestDistributedJmxProxyFactoryImpl.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/TestDistributedJmxProxyFactoryImpl.java new file mode 100644 index 000000000..6656a0e49 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/TestDistributedJmxProxyFactoryImpl.java @@ -0,0 +1,176 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.jmx; + +import com.datastax.oss.driver.api.core.metadata.Node; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxy; +import java.util.*; +import javax.management.*; +import javax.management.openmbean.CompositeData; +import javax.management.remote.JMXConnector; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestDistributedJmxProxyFactoryImpl +{ + + @Mock + private DistributedJmxConnectionProvider mockConnectionProvider; + + @Mock + private JMXConnector mockConnector; + + @Mock + private MBeanServerConnection mockMBeanServerConnection; + + @Mock + private EccNodesSync mockEccNodesSync; + + private DistributedJmxProxy distributedJmxProxy; + + private final UUID nodeId = UUID.randomUUID(); + private final Map mockNodesMap = new HashMap<>(); + + @Before + public void setUp() throws Exception + { + mockNodesMap.put(nodeId, mock(Node.class)); + + when(mockConnectionProvider.getJmxConnector(nodeId)).thenReturn(mockConnector); + when(mockConnector.getMBeanServerConnection()).thenReturn(mockMBeanServerConnection); + + distributedJmxProxy = DistributedJmxProxyFactoryImpl.builder() + .withJmxConnectionProvider(mockConnectionProvider) + .withNodesMap(mockNodesMap) + .withEccNodesSync(mockEccNodesSync).build().connect(); + + when(distributedJmxProxy.validateJmxConnection(mockConnector)).thenReturn(true); + } + + @Test + public void testGetLiveNodes() throws Exception + { + List expectedLiveNodes = Arrays.asList("127.0.0.1", "192.168.0.1"); + + when(mockMBeanServerConnection.getAttribute(any(ObjectName.class), any(String.class))) + .thenReturn(expectedLiveNodes); + + List liveNodes = distributedJmxProxy.getLiveNodes(nodeId); + + assertEquals(expectedLiveNodes, liveNodes); + verify(mockMBeanServerConnection).getAttribute(any(ObjectName.class), eq("LiveNodes")); + } + + @Test + public void testGetUnreachableNodes() throws Exception + { + List expectedUnreachableNodes = Arrays.asList("10.0.0.1", "10.0.0.2"); + + when(mockMBeanServerConnection.getAttribute(any(ObjectName.class), eq("UnreachableNodes"))) + .thenReturn(expectedUnreachableNodes); + + List unreachableNodes = distributedJmxProxy.getUnreachableNodes(nodeId); + + assertEquals(expectedUnreachableNodes, unreachableNodes); + verify(mockMBeanServerConnection).getAttribute(any(ObjectName.class), eq("UnreachableNodes")); + } + + @Test + public void testRepairAsync() throws Exception + { + String keyspace = "test_keyspace"; + Map options = new HashMap<>(); + options.put("option1", "value1"); + + int expectedRepairId = 42; + + when(mockMBeanServerConnection.invoke(any(ObjectName.class), eq("repairAsync"), + any(Object[].class), any(String[].class))).thenReturn(expectedRepairId); + + int repairId = distributedJmxProxy.repairAsync(nodeId, keyspace, options); + + assertEquals(expectedRepairId, repairId); + verify(mockMBeanServerConnection).invoke(any(ObjectName.class), eq("repairAsync"), + any(Object[].class), any(String[].class)); + } + + @Test + public void testForceTerminateAllRepairSessionsInSpecificNode() throws Exception + { + + distributedJmxProxy.forceTerminateAllRepairSessionsInSpecificNode(nodeId); + + verify(mockMBeanServerConnection).invoke(any(ObjectName.class), eq("forceTerminateAllRepairSessions"), + isNull(), isNull()); + } + + @Test + public void testLiveDiskSpaceUsed() throws Exception + { + TableReference mockTableReference = mock(TableReference.class); + when(mockTableReference.getKeyspace()).thenReturn("test_keyspace"); + when(mockTableReference.getTable()).thenReturn("test_table"); + long expectedDiskSpaceUsed = 1024L; + + when(mockMBeanServerConnection.getAttribute(any(ObjectName.class), eq("Count"))).thenReturn(expectedDiskSpaceUsed); + + long diskSpaceUsed = distributedJmxProxy.liveDiskSpaceUsed(nodeId, mockTableReference); + + assertEquals(expectedDiskSpaceUsed, diskSpaceUsed); + verify(mockMBeanServerConnection).getAttribute(any(ObjectName.class), eq("Count")); + } + + @Test + public void testGetMaxRepairedAt() throws Exception + { + TableReference mockTableReference = mock(TableReference.class); + when(mockTableReference.getKeyspace()).thenReturn("test_keyspace"); + when(mockTableReference.getTable()).thenReturn("test_table"); + + // Mocking the return of MBeanServerConnection.invoke() to return a list with CompositeData + List mockCompositeDataList = new ArrayList<>(); + CompositeData mockCompositeData = mock(CompositeData.class); + + // Adding the CompositeData mock object to the list + mockCompositeDataList.add(mockCompositeData); + + // Configuring the behavior for getAll() on CompositeData + when(mockCompositeData.getAll(any(String[].class))).thenReturn(new Object[] { 123456L }); + + // Mocking the invoke method to return the mockCompositeDataList + when(mockMBeanServerConnection.invoke(any(ObjectName.class), eq("getRepairStats"), + any(Object[].class), any(String[].class))).thenReturn(mockCompositeDataList); + + // Now calling the method under test + long maxRepairedAt = distributedJmxProxy.getMaxRepairedAt(nodeId, mockTableReference); + + // Assert that the value returned matches the mocked value + assertEquals(123456L, maxRepairedAt); + + // Verifying interactions with the mock objects + verify(mockMBeanServerConnection).invoke(any(ObjectName.class), eq("getRepairStats"), + any(Object[].class), any(String[].class)); + } +} diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TestTableReferenceFactoryImpl.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TestTableReferenceFactoryImpl.java new file mode 100644 index 000000000..c3c0f6e7a --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TestTableReferenceFactoryImpl.java @@ -0,0 +1,357 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.table; + +import com.datastax.oss.driver.internal.core.util.Strings; +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class TestTableReferenceFactoryImpl +{ + @Mock + private Metadata mockMetadata; + + @Mock + private CqlSession mockCqlSession; + + private TableReferenceFactory tableReferenceFactory; + + private Map mockedKeyspaces = new HashMap<>(); + + @Before + public void setup() + { + when(mockCqlSession.getMetadata()).thenReturn(mockMetadata); + tableReferenceFactory = new TableReferenceFactoryImpl(mockCqlSession); + } + + @Test + public void testGetKeyspaceAndTableName() + { + mockTable("keyspace1", "table1"); + + TableReference tableReference = tableReferenceFactory.forTable("keyspace1", "table1"); + + assertThat(tableReference.getKeyspace()).isEqualTo("keyspace1"); + assertThat(tableReference.getTable()).isEqualTo("table1"); + } + + @Test + public void testGetKeyspaceAndTableNameWithCamelCase() + { + mockTable("keyspaceWithCamelCase1", "tableWithCamelCase1"); + + TableReference tableReference = tableReferenceFactory.forTable("keyspaceWithCamelCase1", "tableWithCamelCase1"); + + assertThat(tableReference.getKeyspace()).isEqualTo("keyspaceWithCamelCase1"); + assertThat(tableReference.getTable()).isEqualTo("tableWithCamelCase1"); + } + + @Test + public void testNewTableIsNotEqual() + { + mockTable("keyspace1", "table1"); + + TableReference tableReference1 = tableReferenceFactory.forTable("keyspace1", "table1"); + + mockTable("keyspace1", "table1"); // New table uuid + TableReference tableReference2 = tableReferenceFactory.forTable("keyspace1", "table1"); + + assertThat(tableReference1).isNotEqualTo(tableReference2); + } + + @Test + public void testSameTableIsEqual() + { + mockTable("keyspace1", "table1"); + + TableReference tableReference1 = tableReferenceFactory.forTable("keyspace1", "table1"); + TableReference tableReference2 = tableReferenceFactory.forTable("keyspace1", "table1"); + + assertThat(tableReference1).isEqualTo(tableReference2); + } + + @Test + public void testDifferentKeyspacesNotEqual() + { + mockTable("keyspace1", "table1"); + mockTable("keyspace2", "table1"); + + TableReference tableReference1 = tableReferenceFactory.forTable("keyspace1", "table1"); + TableReference tableReference2 = tableReferenceFactory.forTable("keyspace2", "table1"); + + assertThat(tableReference1).isNotEqualTo(tableReference2); + assertThat(tableReference1.hashCode()).isNotEqualTo(tableReference2.hashCode()); + } + + @Test + public void testDifferentTablesNotEqual() + { + mockTable("keyspace1", "table1"); + mockTable("keyspace1", "table2"); + + TableReference tableReference1 = tableReferenceFactory.forTable("keyspace1", "table1"); + TableReference tableReference2 = tableReferenceFactory.forTable("keyspace1", "table2"); + + assertThat(tableReference1).isNotEqualTo(tableReference2); + assertThat(tableReference1.hashCode()).isNotEqualTo(tableReference2.hashCode()); + } + + @Test + public void testForKeyspace() throws EcChronosException + { + Set tables = new HashSet<>(); + tables.add("table1"); + tables.add("table2"); + mockKeyspace("keyspace111", tables); + mockKeyspace("keyspace222", Collections.singleton("table1")); + Set tableReferences = tableReferenceFactory.forKeyspace("keyspace111"); + assertThat(tableReferences).hasSize(2); + assertThat(tableReferences.stream() + .filter(t -> t.getKeyspace().equals("keyspace111")) + .collect(Collectors.toList())).isNotEmpty(); + } + + @Test + public void testForKeyspaceKeyspaceWithCamelCase() throws EcChronosException + { + Set tables = new HashSet<>(); + tables.add("table1"); + tables.add("table2"); + mockKeyspace("keyspaceWithCamelCase111", tables); + mockKeyspace("keyspace222", Collections.singleton("table1")); + Set tableReferences = tableReferenceFactory.forKeyspace("keyspaceWithCamelCase111"); + assertThat(tableReferences).hasSize(2); + assertThat(tableReferences.stream() + .filter(t -> t.getKeyspace().equals("keyspaceWithCamelCase111")) + .collect(Collectors.toList())).isNotEmpty(); + } + + @Test + public void testForKeyspaceNoTables() throws EcChronosException + { + mockEmptyKeyspace("keyspace_empty"); + Set tableReferences = tableReferenceFactory.forKeyspace("keyspace_empty"); + assertThat(tableReferences).hasSize(0); + } + + @Test (expected = EcChronosException.class) + public void testForKeyspaceDoesNotExist() throws EcChronosException + { + tableReferenceFactory.forKeyspace("keyspaceEmpty"); + } + + @Test + public void testForCluster() + { + Set firstKsTables = new HashSet<>(); + firstKsTables.add("table1"); + firstKsTables.add("table2"); + mockKeyspace("firstks", firstKsTables); + mockKeyspace("secondks", Collections.singleton("table1")); + mockKeyspace("thirdks", Collections.singleton("table1")); + + when(mockMetadata.getKeyspaces()).thenReturn(mockedKeyspaces); + Set tableReferences = tableReferenceFactory.forCluster(); + assertThat(tableReferences).isNotEmpty(); + assertThat(tableReferences.stream() + .filter(t -> t.getKeyspace().equals("firstks")) + .collect(Collectors.toList())).hasSize(2); + assertThat(tableReferences.stream() + .filter(t -> t.getKeyspace().equals("secondks")) + .collect(Collectors.toList())).hasSize(1); + assertThat(tableReferences.stream() + .filter(t -> t.getKeyspace().equals("thirdks")) + .collect(Collectors.toList())).hasSize(1); + } + + @Test + public void testForClusterWithCamelCase() + { + Set firstKsTables = new HashSet<>(); + firstKsTables.add("table1"); + firstKsTables.add("tablEE2"); + mockKeyspace("firstKs", firstKsTables); + mockKeyspace("secondks", Collections.singleton("table1")); + mockKeyspace("thirdKs", Collections.singleton("table1")); + + when(mockMetadata.getKeyspaces()).thenReturn(mockedKeyspaces); + Set tableReferences = tableReferenceFactory.forCluster(); + assertThat(tableReferences).isNotEmpty(); + assertThat(tableReferences.stream() + .filter(t -> t.getKeyspace().equals("firstKs")) + .collect(Collectors.toList())).hasSize(2); + assertThat(tableReferences.stream() + .filter(t -> t.getKeyspace().equals("secondks")) + .collect(Collectors.toList())).hasSize(1); + assertThat(tableReferences.stream() + .filter(t -> t.getKeyspace().equals("thirdKs")) + .collect(Collectors.toList())).hasSize(1); + } + + private void mockKeyspace(String keyspace, Set tables) + { + CqlIdentifier keySpaceIdentifier = CqlIdentifier.fromInternal(keyspace); + KeyspaceMetadata keyspaceMetadata = mockedKeyspaces.computeIfAbsent(keySpaceIdentifier, k -> { + KeyspaceMetadata mockedKeyspace = mock(KeyspaceMetadata.class); + when(mockedKeyspace.getName()).thenReturn(keySpaceIdentifier); + Map tableMetadatas = new HashMap<>(); + for (String table : tables) + { + TableMetadata tableMetadata = mock(TableMetadata.class); + when(tableMetadata.getId()).thenReturn(Optional.of(UUID.randomUUID())); + when(tableMetadata.getName()).thenReturn(CqlIdentifier.fromInternal(table)); + when(tableMetadata.getKeyspace()).thenReturn(keySpaceIdentifier); + Map options = new HashMap<>(); + options.put(CqlIdentifier.fromInternal("gc_grace_seconds"), MockTableReferenceFactory.DEFAULT_GC_GRACE_SECONDS); + when(tableMetadata.getOptions()).thenReturn(options); + tableMetadatas.put(CqlIdentifier.fromInternal(table), tableMetadata); + when(mockedKeyspace.getTable(eq(table))).thenReturn(Optional.of(tableMetadata)); + } + when(mockedKeyspace.getTables()).thenReturn(tableMetadatas); + return mockedKeyspace; + }); + if (Strings.needsDoubleQuotes(keyspace)) + { + when(mockMetadata.getKeyspace(eq("\""+keyspace+"\""))).thenReturn(Optional.of(keyspaceMetadata)); + } + else + { + when(mockMetadata.getKeyspace(eq(keyspace))).thenReturn(Optional.of(keyspaceMetadata)); + } + } + + @Test + public void testNullWithoutKeyspace() + { + assertThat(tableReferenceFactory.forTable("keyspace1", "table1")).isNull(); + } + + @Test + public void testNullWithoutTable() + { + mockEmptyKeyspace("keyspace1"); + + assertThat(tableReferenceFactory.forTable("keyspace1", "table1")).isNull(); + } + + private void mockEmptyKeyspace(String keyspace) + { + KeyspaceMetadata keyspaceMetadata = mock(KeyspaceMetadata.class); + when(keyspaceMetadata.getName()).thenReturn(CqlIdentifier.fromInternal(keyspace)); + + CqlIdentifier keySpaceIdentifier = CqlIdentifier.fromInternal(keyspace); + mockedKeyspaces.put(keySpaceIdentifier, keyspaceMetadata); + + when(mockMetadata.getKeyspace(eq(keyspace))).thenReturn(Optional.of(keyspaceMetadata)); + } + + @Test + public void testTableDoesNotExist() + { + TableMetadata tableMetadata = mockRemovedTable("keyspace1", "table1"); + + assertThat(tableReferenceFactory.forTable("keyspace1", "table1")).isNull(); + + TableReference tableReference = tableReferenceFactory.forTable(tableMetadata); + assertThat(tableReference.getKeyspace()).isEqualTo(tableMetadata.getKeyspace().asInternal()); + assertThat(tableReference.getTable()).isEqualTo(tableMetadata.getName().asInternal()); + assertThat(tableReference.getId()).isEqualTo(tableMetadata.getId().get()); + } + + private TableMetadata mockRemovedTable(String keyspace, String table) + { + CqlIdentifier keySpaceIdentifier = CqlIdentifier.fromInternal(keyspace); + KeyspaceMetadata keyspaceMetadata = mockedKeyspaces.computeIfAbsent(keySpaceIdentifier, k -> { + KeyspaceMetadata mockedKeyspace = mock(KeyspaceMetadata.class); + when(mockedKeyspace.getName()).thenReturn(keySpaceIdentifier); + return mockedKeyspace; + }); + + TableMetadata tableMetadata = mock(TableMetadata.class); + when(tableMetadata.getId()).thenReturn(Optional.of(UUID.randomUUID())); + when(tableMetadata.getName()).thenReturn(CqlIdentifier.fromInternal(table)); + Map options = new HashMap<>(); + options.put(CqlIdentifier.fromInternal("gc_grace_seconds"), MockTableReferenceFactory.DEFAULT_GC_GRACE_SECONDS); + when(tableMetadata.getOptions()).thenReturn(options); + doReturn(keyspaceMetadata.getName()).when(tableMetadata).getKeyspace(); + + when(keyspaceMetadata.getTable(eq(table))).thenReturn(Optional.empty()); + when(mockMetadata.getKeyspace(eq(keyspace))).thenReturn(Optional.of(keyspaceMetadata)); + return tableMetadata; + } + + private void mockTable(String keyspace, String table) + { + CqlIdentifier keySpaceIdentifier = CqlIdentifier.fromInternal(keyspace); + KeyspaceMetadata keyspaceMetadata = mockedKeyspaces.computeIfAbsent(keySpaceIdentifier, k -> { + KeyspaceMetadata mockedKeyspace = mock(KeyspaceMetadata.class); + when(mockedKeyspace.getName()).thenReturn(keySpaceIdentifier); + return mockedKeyspace; + }); + + TableMetadata tableMetadata = mock(TableMetadata.class); + when(tableMetadata.getId()).thenReturn(Optional.of(UUID.randomUUID())); + when(tableMetadata.getName()).thenReturn(CqlIdentifier.fromInternal(table)); + Map options = new HashMap<>(); + options.put(CqlIdentifier.fromInternal("gc_grace_seconds"), MockTableReferenceFactory.DEFAULT_GC_GRACE_SECONDS); + when(tableMetadata.getOptions()).thenReturn(options); + doReturn(keyspaceMetadata.getName()).when(tableMetadata).getKeyspace(); + + if (Strings.needsDoubleQuotes(table)) + { + when(keyspaceMetadata.getTable(eq("\""+table+"\""))).thenReturn(Optional.of(tableMetadata)); + } + else + { + when(keyspaceMetadata.getTable(eq(table))).thenReturn(Optional.of(tableMetadata)); + } + if (Strings.needsDoubleQuotes(keyspace)) + { + when(mockMetadata.getKeyspace(eq("\""+keyspace+"\""))).thenReturn(Optional.of(keyspaceMetadata)); + } + else + { + when(mockMetadata.getKeyspace(eq(keyspace))).thenReturn(Optional.of(keyspaceMetadata)); + } + } +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/DistributedJmxProxy.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/DistributedJmxProxy.java new file mode 100644 index 000000000..8b5d9d245 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/DistributedJmxProxy.java @@ -0,0 +1,149 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.jmx; + +import java.io.Closeable; +import java.util.List; +import java.util.Map; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import java.util.UUID; +import javax.management.NotificationListener; +import javax.management.remote.JMXConnector; + +/** + * Cassandra JMX proxy interface used to interact with a Cassandra node using JMX. + */ +public interface DistributedJmxProxy extends Closeable +{ + /** + * Add a listener to the storage service interface. + * + * @param nodeID + * The nodeID to get the JMXConnector + * @param listener + * The listener to add. + * @see #removeStorageServiceListener(UUID, NotificationListener) + */ + void addStorageServiceListener(UUID nodeID, NotificationListener listener); + + /** + * Get a list of textual representations of IP addresses of the current live nodes. + * + * @param nodeID + * The nodeID to get the JMXConnector + * @return A list of the live nodes. + * @see #getUnreachableNodes(UUID) + */ + List getLiveNodes(UUID nodeID); + + /** + * Get a list of textual representations of IP addresses of the current unreachable nodes. + * + * @return A list of the unreachable nodes. + * @see #getLiveNodes(UUID) + */ + List getUnreachableNodes(UUID nodeID); + + /** + * Perform a repair using the provided keyspace and options. + * + * @param keyspace + * The keyspace to repair. + * @param options + * The options for the repair. + * @return a positive value if a repair was started, zero otherwise. + * + * @see com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairOptions + */ + int repairAsync(UUID nodeID, String keyspace, Map options); + + /** + * Force the termination of all repairs session on all the nodes. + *

+ * This will not terminate repairs on other nodes but will affect other nodes running repair. + */ + void forceTerminateAllRepairSessions(); + + /** + * Force the termination of all repair session on the specified node. + *

+ * @param nodeID + * The nodeID to get the JMXConnector + * This will not terminate repairs on other nodes but will affect other nodes running repair. + */ + void forceTerminateAllRepairSessionsInSpecificNode(UUID nodeID); + /** + * Remove a listener from the storage service interface. + * @param nodeID + * The nodeID to get the JMXConnector + * @param listener + * The listener to remove. + * + * @see #addStorageServiceListener(UUID, NotificationListener) + */ + void removeStorageServiceListener(UUID nodeID, NotificationListener listener); + + /** + * Get the live disk space used for the provided table. + * @param nodeID + * The nodeID to get the JMXConnector + * @param tableReference + * The table to get the live disk space for. + * @return The live disk space used by the provided table. + */ + long liveDiskSpaceUsed(UUID nodeID, TableReference tableReference); + + /** + * Get max repaired at for the provided table. + * Only usable when running incremental repairs. + * + * @param nodeID + * The nodeID to get the JMXConnector + * @param tableReference The table to get max repaired at for. + * @return Max repaired at or 0 if it cannot be determined. + */ + long getMaxRepairedAt(UUID nodeID, TableReference tableReference); + + /** + * Gets repaired ratio for a specific table. + * Only usable when running incremental repairs. + * + * @param nodeID + * The nodeID to get the JMXConnector + * @param tableReference The table to get repaired ratio for. + * @return The repaired ratio or 0 if it cannot be determined. + */ + double getPercentRepaired(UUID nodeID, TableReference tableReference); + + /** + * Retrieves the current operational status of the local Cassandra node via JMX. + * Returns a string indicating the node's state (e.g., "NORMAL", "JOINING", "LEAVING", "MOVING") + * or "Unknown" if the status is undeterminable. + * + * @param nodeID + * The nodeID to get the JMXConnector + * @return A string representing the node's status. + */ + String getNodeStatus(UUID nodeID); + + /** + * validate if the given JMXConnector is available. + * + * @param jmxConnector + * The jmxConnector to validate + * @return A boolean representing the node's connection status. + */ + boolean validateJmxConnection(JMXConnector jmxConnector); +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/DistributedJmxProxyFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/DistributedJmxProxyFactory.java new file mode 100644 index 000000000..6e682787d --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/DistributedJmxProxyFactory.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.jmx; + +import java.io.IOException; + +public interface DistributedJmxProxyFactory +{ + /** + * Connect to the local Cassandra node and get a proxy instance. + *

+ * The returned {@link DistributedJmxProxy} must be closed by the caller. + * + * @return The connected {@link DistributedJmxProxy}. + * @throws IOException Thrown when unable to connect. + */ + DistributedJmxProxy connect() throws IOException; +} + diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/package-info.java new file mode 100644 index 000000000..a4b451edf --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/jmx/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the Interfaces and resources for jmx operations. + */ +package com.ericsson.bss.cassandra.ecchronos.core.jmx; diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/config/RepairOptions.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/config/RepairOptions.java new file mode 100644 index 000000000..cab2fd6e1 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/config/RepairOptions.java @@ -0,0 +1,73 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.repair.config; + +/** + * The repair options available for the repair. + */ +@SuppressWarnings({"PMD.DataClass", "checkstyle:hideutilityclassconstructor"}) +public class RepairOptions +{ + /** + * Default constructor. + */ + public RepairOptions() + { + } + + /** + * The {@link com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairParallelism repair parallelism} to use for the repair. + *

+ * Possible values are defined in {@link com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairParallelism} + */ + public static final String PARALLELISM_KEY = "parallelism"; + + /** + * If the repair should be on the primary range of the node. + *

+ * Possible values: true | false + */ + public static final String PRIMARY_RANGE_KEY = "primaryRange"; + + /** + * If the repair should be incremental. + *

+ * Possible values: true | false + */ + public static final String INCREMENTAL_KEY = "incremental"; + + /** + * If the repair should be on a certain list of ranges. + *

+ * If this option is used the repair will not be incremental. + *

+ * The values should be of the format: [startToken1]:[endToken1],[startToken2]:[endToken2] + */ + public static final String RANGES_KEY = "ranges"; + + /** + * The tables that should be repaired. + */ + public static final String COLUMNFAMILIES_KEY = "columnFamilies"; + + /** + * The hosts that should be repaired. + *

+ * If this option is used the repair will not be incremental. + *

+ * The values should be of the format: [ip1],[ip2] + */ + public static final String HOSTS_KEY = "hosts"; +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/config/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/config/package-info.java new file mode 100644 index 000000000..ebb772b6c --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/config/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the configs for repair operations. + */ +package com.ericsson.bss.cassandra.ecchronos.core.repair.config; diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/package-info.java new file mode 100644 index 000000000..043ff631e --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the implementations and resources for repair operations. + */ +package com.ericsson.bss.cassandra.ecchronos.core.repair; diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 58d55d713..055ce6cfe 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -42,8 +42,7 @@ ecChronos is built to be continuously repairing data in the background. Each ecC Once ecChronos establishes its initial connection with the `contactPoints`, it must register its control over the nodes based on the `type` property, whether JMX or CQL, to make it clear that a single instance will be responsible for managing multiple nodes. Then it would be possible to keep track of what was the last time that the ecChronos instances was able to connect with a node, also for others ecChronos instances keep track about each other's health. -If type is `datacenterAware`, ecChronos will register its control over all the nodes in the specified datacenter; The `rackAware` declares ecChronos is responsible just for a sub-set of racks in the declared list; The `hostAware` funcionality declares ecChronos is resposible just for the specified hosts list. When connection.cql.agent.enabled is true, it must use the AgentNativeConnectionProvider class as default provider. - +If type is `datacenterAware`, ecChronos will register its control over all the nodes in the specified datacenter; The `rackAware` declares ecChronos is responsible just for a sub-set of racks in the declared list; The `hostAware` functionality declares ecChronos is responsible just for the specified hosts list. Configuration is available on ecc.yml in the below format. ```yaml diff --git a/pmd-rules.xml b/pmd-rules.xml index 6485e7866..62530e641 100644 --- a/pmd-rules.xml +++ b/pmd-rules.xml @@ -42,6 +42,7 @@ + diff --git a/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/enums/repair/RepairParallelism.java b/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/enums/repair/RepairParallelism.java new file mode 100644 index 000000000..5fc6234bf --- /dev/null +++ b/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/enums/repair/RepairParallelism.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.utils.enums.repair; + +/** + * The type of repair parallelism used. + */ +public enum RepairParallelism +{ + /** + * All nodes are repaired in parallel. + */ + PARALLEL("parallel"); + + private final String myName; + + RepairParallelism(final String name) + { + myName = name; + } + + /** + * Get the name of the repair parallelism. + * + * @return The repair parallelism name. + */ + public String getName() + { + return myName; + } +} diff --git a/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/enums/repair/package-info.java b/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/enums/repair/package-info.java new file mode 100644 index 000000000..5936b5108 --- /dev/null +++ b/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/enums/repair/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the enums related with repair operations. + */ +package com.ericsson.bss.cassandra.ecchronos.utils.enums.repair;