diff --git a/.gitignore b/.gitignore index 190182f35..94278c9f2 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ dependency-reduced-pom.xml *htmlcov application/statistics/ statistics/ +.vscode/ + diff --git a/CHANGES.md b/CHANGES.md index 7466dc352..659593904 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,7 +3,7 @@ ## Version 1.0.0 (Not yet Released) * Cassandra-Based Distributed Locks #741 -* Update nodes when cluster changes, nodes removed or added #699 +* Hot Reload of Nodes List - Issue #699 * Investigate Creation of RepairScheduler and ScheduleManager #714 * Implement ScheduledJobQueue for Prioritized Job Management and Execution - Issue #740 * Implement RepairGroup Class for Managing and Executing Repair Tasks - Issue #738 diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java index 277676a94..7ad8de2e1 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java @@ -56,5 +56,4 @@ public final String toString() { return String.format("Connection(cql=%s, jmx=%s)", myCqlConnection, myJmxConnection); } - } diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentJmxConnectionProvider.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentJmxConnectionProvider.java index a7b8c4097..3a9000469 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentJmxConnectionProvider.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentJmxConnectionProvider.java @@ -14,6 +14,7 @@ */ package com.ericsson.bss.cassandra.ecchronos.application.providers; +import com.datastax.oss.driver.api.core.metadata.Node; import com.ericsson.bss.cassandra.ecchronos.application.config.security.Credentials; import com.ericsson.bss.cassandra.ecchronos.application.config.security.JmxTLSConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.security.Security; @@ -159,6 +160,18 @@ public void close(final UUID nodeID) throws IOException myDistributedJmxConnectionProviderImpl.close(nodeID); } + /** + * Creates a new connection a node. + * @param node + * + * @throws IOException + */ + @Override + public void add(final Node node) throws IOException + { + myDistributedJmxConnectionProviderImpl.add(node); + } + /** * Closes all JMX connections managed by this provider. * diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java index 5073e7c23..ff5f4ab42 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java @@ -85,7 +85,7 @@ public AgentNativeConnectionProvider( sslEngineFactory = certificateHandler; } - DistributedNativeBuilder nativeConnectionBuilder = + DistributedNativeBuilder nativeConnectionBuilder = DistributedNativeConnectionProviderImpl.builder() .withInitialContactPoints(resolveInitialContactPoints(agentConnectionConfig.getContactPoints())) .withAgentType(agentConnectionConfig.getType()) @@ -94,6 +94,7 @@ public AgentNativeConnectionProvider( .withSslEngineFactory(sslEngineFactory) .withSchemaChangeListener(defaultRepairConfigurationProvider) .withNodeStateListener(defaultRepairConfigurationProvider); + LOG.info("Preparing Agent Connection Config"); nativeConnectionBuilder = resolveAgentProviderBuilder(nativeConnectionBuilder, agentConnectionConfig); LOG.info("Establishing Connection With Nodes"); @@ -268,4 +269,32 @@ public void close() throws IOException { myDistributedNativeConnectionProviderImpl.close(); } + /** + * Add a nw node to the list of nodes. + * @param myNode + */ + @Override + public void addNode(final Node myNode) + { + myDistributedNativeConnectionProviderImpl.addNode(myNode); + } + /** + * Remove node for the list of nodes. + * @param myNode + */ + @Override + public void removeNode(final Node myNode) + { + myDistributedNativeConnectionProviderImpl.removeNode(myNode); + } + /** + * Checks the node is on the list of specified dc's/racks/nodes. + * @param node + * @return + */ + @Override + public Boolean confirmNodeValid(final Node node) + { + return myDistributedNativeConnectionProviderImpl.confirmNodeValid(node); + } } diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java index 58f5f507b..9e2376e35 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java @@ -81,6 +81,9 @@ public ECChronos( .withNodesList(nativeConnectionProvider.getNodes()) .withReplicatedTableProvider(myECChronosInternals.getReplicatedTableProvider()) .withRepairConfiguration(repairConfigurationProvider::get) + .withEccNodesSync(eccNodesSync) + .withJmxConnectionProvider(jmxConnectionProvider) + .withDistributedNativeConnectionProvider(nativeConnectionProvider) .withTableReferenceFactory(myECChronosInternals.getTableReferenceFactory())); myECChronosInternals.addRunPolicy(myTimeBasedRunPolicy); diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedJmxBuilder.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedJmxBuilder.java index 57849c890..ec12b7969 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedJmxBuilder.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedJmxBuilder.java @@ -131,7 +131,8 @@ public final DistributedJmxConnectionProviderImpl build() throws IOException createConnections(); return new DistributedJmxConnectionProviderImpl( myNodesList, - myJMXConnections + myJMXConnections, + this ); } @@ -152,7 +153,12 @@ private void createConnections() throws IOException } } - private void reconnect(final Node node) throws IOException, EcChronosException + /*** + * Creates a JMXconnection to the host. + * @param node + * @throws EcChronosException + */ + public void reconnect(final Node node) throws EcChronosException { try { @@ -246,7 +252,16 @@ private Integer getJMXPort(final Node node) .setNode(node) .build(); Row row = mySession.execute(simpleStatement).one(); - if (row != null) + if ((row == null) || (row.getString("value") == null)) + { + simpleStatement = SimpleStatement + .builder("SELECT value FROM system_views.system_properties WHERE name = 'cassandra.jmx.local.port';") + .setNode(node) + .build(); + row = mySession.execute(simpleStatement).one(); + + } + if ((row != null) && (row.getString("value") != null)) { return Integer.parseInt(Objects.requireNonNull(row.getString("value"))); } diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java index 7a9af5f8c..dbf78ef50 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java @@ -244,10 +244,15 @@ public final DistributedNativeConnectionProviderImpl build() LOG.info("Requesting Nodes List"); List nodesList = createNodesList(session); LOG.info("Nodes list was created with success"); - return new DistributedNativeConnectionProviderImpl(session, nodesList); + return new DistributedNativeConnectionProviderImpl(session, nodesList, this); } - private List createNodesList(final CqlSession session) + /** + * Creates a list of nodes based on the connection type, reads the node list from the database. + * @param session the connection information to the database + * @return list of nodes + */ + public List createNodesList(final CqlSession session) { List tmpNodeList = new ArrayList<>(); switch (myType) @@ -269,6 +274,49 @@ private List createNodesList(final CqlSession session) return tmpNodeList; } + /** + * Checks the node is on the list of specified dc's/racks/nodes. + * @param node + * @return + */ + public Boolean confirmNodeValid(final Node node) + { + switch (myType) + { + case datacenterAware: + return confirmDatacenterNodeValid(node, myDatacenterAware); + case rackAware: + return confirmRackNodeValid(node, myRackAware); + case hostAware: + return confirmHostNodeValid(node, myHostAware); + + default: + } + return false; + } + + private Boolean confirmDatacenterNodeValid(final Node node, final List datacenterNames) + { + return (datacenterNames.contains(node.getDatacenter())); + } + + private Boolean confirmRackNodeValid(final Node node, final List> rackInfo) + { + Set> racksInfoSet = new HashSet<>(rackInfo); + Map tmpRackInfo = new HashMap<>(); + tmpRackInfo.put("datacenterName", node.getDatacenter()); + tmpRackInfo.put("rackName", node.getRack()); + return (racksInfoSet.contains(tmpRackInfo)); + } + + private Boolean confirmHostNodeValid(final Node node, final List hostsInfo) + { + Set hostsInfoSet = new HashSet<>(hostsInfo); + + InetSocketAddress tmpAddress = (InetSocketAddress) node.getEndPoint().resolve(); + return (hostsInfoSet.contains(tmpAddress)); + } + private CqlSession createSession(final DistributedNativeBuilder builder) { CqlSessionBuilder sessionBuilder = fromBuilder(builder); diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java index 816fd18d8..ea08bf2fe 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java @@ -25,11 +25,16 @@ import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedJmxBuilder; import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DistributedJmxConnectionProviderImpl implements DistributedJmxConnectionProvider { + private static final Logger LOG = LoggerFactory.getLogger(DistributedJmxConnectionProviderImpl.class); private final List myNodesList; private final ConcurrentHashMap myJMXConnections; + private final DistributedJmxBuilder myDistributedJmxBuilder; /** * Constructs a DistributedJmxConnectionProviderImpl with the specified list of nodes and JMX connections. @@ -41,11 +46,13 @@ public class DistributedJmxConnectionProviderImpl implements DistributedJmxConne */ public DistributedJmxConnectionProviderImpl( final List nodesList, - final ConcurrentHashMap jmxConnections + final ConcurrentHashMap jmxConnections, + final DistributedJmxBuilder distributedJmxBuilder ) { myNodesList = nodesList; myJMXConnections = jmxConnections; + myDistributedJmxBuilder = distributedJmxBuilder; } /** @@ -145,4 +152,23 @@ public void close(final UUID nodeID) throws IOException myJMXConnections.get(nodeID).close(); } + /** + * Add a node and create a JMXconnection. + * @param node + * @throws IOException + */ + @Override + public void add(final Node node) throws IOException + { + try + { + myDistributedJmxBuilder.reconnect(node); + } + catch (EcChronosException e) + { + LOG.warn("Unable to connect with node {} connection refused: {}", node.getHostId(), e.getMessage()); + } + + } + } diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java index 191d5cde3..7c48a049b 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java @@ -26,6 +26,7 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ { private final CqlSession mySession; private final List myNodes; + private final DistributedNativeBuilder myDistributedNativeBuilder; /** * Constructs a new {@code DistributedNativeConnectionProviderImpl} with the specified {@link CqlSession} and list @@ -36,11 +37,15 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ * @param nodesList * the list of {@link Node} instances representing the nodes in the cluster. */ - public DistributedNativeConnectionProviderImpl(final CqlSession session, - final List nodesList) + public DistributedNativeConnectionProviderImpl( + final CqlSession session, + final List nodesList, + final DistributedNativeBuilder distributedNativeBuilder + ) { mySession = session; myNodes = nodesList; + myDistributedNativeBuilder = distributedNativeBuilder; } /** @@ -65,6 +70,8 @@ public List getNodes() return myNodes; } + + /** * Closes the {@link CqlSession} associated with this connection provider. * @@ -88,4 +95,35 @@ public static DistributedNativeBuilder builder() return new DistributedNativeBuilder(); } + /** + * Add a nw node to the list of nodes. + * @param node + */ + @Override + public void addNode(final Node node) + { + myNodes.add(node); + } + + /** + * Remove node for the list of nodes. + * @param node + */ + @Override + public void removeNode(final Node node) + { + myNodes.remove(node); + } + + /** + * Checks the node is on the list of specified dc's/racks/nodes. + * @param node + * @return + */ + @Override + public Boolean confirmNodeValid(final Node node) + { + return myDistributedNativeBuilder.confirmNodeValid(node); + } + } diff --git a/connection.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/TestNodeInclusion.java b/connection.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/TestNodeInclusion.java new file mode 100644 index 000000000..89381248e --- /dev/null +++ b/connection.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/TestNodeInclusion.java @@ -0,0 +1,162 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.connection.impl.providers; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.NodeState; +import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.ContactEndPoint; +import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedNativeBuilder; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.net.InetSocketAddress; +import java.util.*; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.*; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class TestNodeInclusion +{ + @Mock + private CqlSession mySessionMock; + + @Mock + private Metadata myMetadataMock; + + private final Map myNodes = new HashMap<>(); + + @Mock + private Node mockNodeDC1Rack1; + + @Mock + private Node mockNodeDC1Rack2; + + @Mock + private Node mockNodeDC2Rack1; + + @Mock + private Node mockNodeDC2Rack2; + + private final ContactEndPoint endPointNodeDC1Rack1 = new ContactEndPoint("127.0.0.1", 9042); + + private final ContactEndPoint endPointNodeDC1rack2 = new ContactEndPoint("127.0.0.2", 9042); + + private final ContactEndPoint endPointNodeDC2Rack1 = new ContactEndPoint("127.0.0.3", 9042); + + private final ContactEndPoint endPointNodeDC2Rack2 = new ContactEndPoint("127.0.0.4", 9042); + + private final List contactPoints = new ArrayList<>(); + + @Before + public void setup() + { + contactPoints.add(new InetSocketAddress("127.0.0.1", 9042)); + contactPoints.add(new InetSocketAddress("127.0.0.2", 9042)); + + when(mockNodeDC1Rack1.getDatacenter()).thenReturn("datacenter1"); + when(mockNodeDC1Rack2.getDatacenter()).thenReturn("datacenter1"); + when(mockNodeDC2Rack1.getDatacenter()).thenReturn("datacenter2"); + when(mockNodeDC2Rack2.getDatacenter()).thenReturn("datacenter2"); + + when(mockNodeDC1Rack1.getRack()).thenReturn("rack1"); + when(mockNodeDC1Rack2.getRack()).thenReturn("rack2"); + when(mockNodeDC2Rack1.getRack()).thenReturn("rack1"); + when(mockNodeDC2Rack2.getRack()).thenReturn("rack2"); + + when(mockNodeDC1Rack1.getEndPoint()).thenReturn(endPointNodeDC1Rack1); + when(mockNodeDC1Rack2.getEndPoint()).thenReturn(endPointNodeDC1rack2); + when(mockNodeDC2Rack1.getEndPoint()).thenReturn(endPointNodeDC2Rack1); + when(mockNodeDC2Rack2.getEndPoint()).thenReturn(endPointNodeDC2Rack2); + + when(mockNodeDC1Rack1.getState()).thenReturn(NodeState.UP); + when(mockNodeDC1Rack2.getState()).thenReturn(NodeState.UP); + when(mockNodeDC2Rack1.getState()).thenReturn(NodeState.UP); + when(mockNodeDC2Rack2.getState()).thenReturn(NodeState.UP); + + myNodes.put(UUID.randomUUID(), mockNodeDC1Rack1); + myNodes.put(UUID.randomUUID(), mockNodeDC1Rack2); + myNodes.put(UUID.randomUUID(), mockNodeDC2Rack1); + myNodes.put(UUID.randomUUID(), mockNodeDC2Rack2); + + when(myMetadataMock.getNodes()).thenReturn(myNodes); + when(mySessionMock.getMetadata()).thenReturn(myMetadataMock); + } + + @Test + public void testValidNodesDatacenterAware() + { + List datacentersInfo = new ArrayList<>(); + datacentersInfo.add("datacenter1"); + + DistributedNativeBuilder provider = DistributedNativeConnectionProviderImpl.builder() + .withInitialContactPoints(contactPoints) + .withAgentType(ConnectionType.datacenterAware) + .withDatacenterAware(datacentersInfo); + + assertTrue(provider.confirmNodeValid(mockNodeDC1Rack1)); + assertTrue(provider.confirmNodeValid(mockNodeDC1Rack2)); + assertFalse(provider.confirmNodeValid(mockNodeDC2Rack1)); + assertFalse(provider.confirmNodeValid(mockNodeDC2Rack2)); + } + @Test + public void testValidNodesRackAware() + { + List> rackList = new ArrayList<>(); + Map rackInfo = new HashMap<>(); + rackInfo.put("datacenterName", "datacenter1"); + rackInfo.put("rackName", "rack1"); + rackList.add(rackInfo); + + DistributedNativeBuilder provider = DistributedNativeConnectionProviderImpl.builder() + .withInitialContactPoints(contactPoints) + .withAgentType(ConnectionType.rackAware) + .withRackAware(rackList); + + + assertTrue(provider.confirmNodeValid(mockNodeDC1Rack1)); + assertFalse(provider.confirmNodeValid(mockNodeDC1Rack2)); + assertFalse(provider.confirmNodeValid(mockNodeDC2Rack1)); + assertFalse(provider.confirmNodeValid(mockNodeDC2Rack2)); + + } + @Test + public void testResolveHostAware() + { + List hostList = new ArrayList<>(); + hostList.add(new InetSocketAddress("127.0.0.1", 9042)); + hostList.add(new InetSocketAddress("127.0.0.2", 9042)); + hostList.add(new InetSocketAddress("127.0.0.3", 9042)); + DistributedNativeBuilder provider = DistributedNativeConnectionProviderImpl.builder() + .withInitialContactPoints(contactPoints) + .withAgentType(ConnectionType.hostAware) + .withHostAware(hostList); + + assertTrue(provider.confirmNodeValid(mockNodeDC1Rack1)); + assertTrue(provider.confirmNodeValid(mockNodeDC1Rack2)); + assertTrue(provider.confirmNodeValid(mockNodeDC2Rack1)); + assertFalse(provider.confirmNodeValid(mockNodeDC2Rack2)); + + } + + +} \ No newline at end of file diff --git a/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedJmxConnectionProvider.java b/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedJmxConnectionProvider.java index 17961619a..c34f43c75 100644 --- a/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedJmxConnectionProvider.java +++ b/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedJmxConnectionProvider.java @@ -14,6 +14,8 @@ */ package com.ericsson.bss.cassandra.ecchronos.connection; +import com.datastax.oss.driver.api.core.metadata.Node; + import java.io.Closeable; import java.io.IOException; import java.util.UUID; @@ -35,4 +37,6 @@ default void close() throws IOException } void close(UUID nodeID) throws IOException; + + void add(Node node) throws IOException; } diff --git a/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedNativeConnectionProvider.java b/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedNativeConnectionProvider.java index 45dfb2ac2..6dde865e6 100644 --- a/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedNativeConnectionProvider.java +++ b/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/DistributedNativeConnectionProvider.java @@ -27,8 +27,13 @@ public interface DistributedNativeConnectionProvider extends Closeable List getNodes(); + @Override default void close() throws IOException { } + void addNode(Node myNode); + + void removeNode(Node myNode); + Boolean confirmNodeValid(Node node); } diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/refresh/NodeAddedAction.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/refresh/NodeAddedAction.java new file mode 100644 index 000000000..aa5efac22 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/refresh/NodeAddedAction.java @@ -0,0 +1,73 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.refresh; + +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Callable; +import com.datastax.oss.driver.api.core.metadata.Node; + +public class NodeAddedAction implements Callable +{ + private static final Logger LOG = LoggerFactory.getLogger(NodeAddedAction.class); + + private final EccNodesSync myEccNodesSync; + private final DistributedJmxConnectionProvider myJmxConnectionProvider; + private final DistributedNativeConnectionProvider myDistributedNativeConnectionProvider; + private final Node myNode; + + public NodeAddedAction(final EccNodesSync eccNodesSync, final DistributedJmxConnectionProvider jmxConnectionProvider, final DistributedNativeConnectionProvider distributedNativeConnectionProvider, final Node node) + { + myEccNodesSync = eccNodesSync; + myJmxConnectionProvider = jmxConnectionProvider; + myDistributedNativeConnectionProvider = distributedNativeConnectionProvider; + myNode = node; + } + + /** + * Adds the node. + * @return + */ + @Override + public Boolean call() + { + Boolean result = true; + + if (!myDistributedNativeConnectionProvider.confirmNodeValid(myNode)) + { + return result; + } + + LOG.info("Node Up {}", myNode.getHostId()); + myEccNodesSync.verifyAcquireNode(myNode); + try + { + myJmxConnectionProvider.add(myNode); + } + catch (IOException e) + { + LOG.warn("Node {} JMX connection failed", myNode.getHostId()); + result = false; + } + myDistributedNativeConnectionProvider.addNode(myNode); + + return result; + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/refresh/NodeRemovedAction.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/refresh/NodeRemovedAction.java new file mode 100644 index 000000000..a8723d2b0 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/refresh/NodeRemovedAction.java @@ -0,0 +1,74 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.refresh; + +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Callable; +import com.datastax.oss.driver.api.core.metadata.Node; + +public class NodeRemovedAction implements Callable +{ + private static final Logger LOG = LoggerFactory.getLogger(NodeRemovedAction.class); + + private final EccNodesSync myEccNodesSync; + private final DistributedJmxConnectionProvider myJmxConnectionProvider; + private final DistributedNativeConnectionProvider myDistributedNativeConnectionProvider; + private final Node myNode; + + public NodeRemovedAction(final EccNodesSync eccNodesSync, final DistributedJmxConnectionProvider jmxConnectionProvider, final DistributedNativeConnectionProvider distributedNativeConnectionProvider, final Node node) + { + myEccNodesSync = eccNodesSync; + myJmxConnectionProvider = jmxConnectionProvider; + myNode = node; + myDistributedNativeConnectionProvider = distributedNativeConnectionProvider; + } + + /** + * Removes the node. + * @return + */ + @Override + public Boolean call() + { + Boolean result = true; + + if (!myDistributedNativeConnectionProvider.confirmNodeValid(myNode)) + { + return result; + } + + LOG.info("Node Removed {}", myNode.getHostId()); + myEccNodesSync.deleteNodeStatus(myNode.getDatacenter(), myNode.getHostId()); + try + { + myJmxConnectionProvider.close(myNode.getHostId()); + } + catch (IOException e) + { + LOG.warn("Node {} JMX connection removal failed", myNode.getHostId()); + result = false; + } + + myDistributedNativeConnectionProvider.removeNode(myNode); + + return result; + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/refresh/package-info.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/refresh/package-info.java new file mode 100644 index 000000000..f06799cde --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/refresh/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the implementations and resources for repair operations. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.refresh; diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/DefaultRepairConfigurationProvider.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/DefaultRepairConfigurationProvider.java index 180784159..5b2eca338 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/DefaultRepairConfigurationProvider.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/DefaultRepairConfigurationProvider.java @@ -14,6 +14,10 @@ */ package com.ericsson.bss.cassandra.ecchronos.core.impl.repair; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.core.impl.refresh.NodeAddedAction; +import com.ericsson.bss.cassandra.ecchronos.core.impl.refresh.NodeRemovedAction; import com.ericsson.bss.cassandra.ecchronos.core.metadata.Metadata; import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration; import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.RepairScheduler; @@ -23,6 +27,8 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.HashSet; @@ -41,6 +47,7 @@ import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.type.UserDefinedType; +import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +60,7 @@ public class DefaultRepairConfigurationProvider extends NodeStateListenerBase implements SchemaChangeListener { private static final Logger LOG = LoggerFactory.getLogger(DefaultRepairConfigurationProvider.class); + private static final Integer NO_OF_THREADS = 1; private CqlSession mySession; private List myNodes; @@ -60,13 +68,17 @@ public class DefaultRepairConfigurationProvider extends NodeStateListenerBase im private RepairScheduler myRepairScheduler; private Function> myRepairConfigurationFunction; private TableReferenceFactory myTableReferenceFactory; + private final ExecutorService myService; + private EccNodesSync myEccNodesSync; + private DistributedJmxConnectionProvider myJmxConnectionProvider; + private DistributedNativeConnectionProvider myAgentNativeConnectionProvider; /** * Default constructor. */ public DefaultRepairConfigurationProvider() { - //NOOP + myService = Executors.newFixedThreadPool(NO_OF_THREADS); } private DefaultRepairConfigurationProvider(final Builder builder) @@ -78,8 +90,12 @@ private DefaultRepairConfigurationProvider(final Builder builder) myRepairConfigurationFunction = builder.myRepairConfigurationFunction; myTableReferenceFactory = Preconditions.checkNotNull(builder.myTableReferenceFactory, "Table reference factory must be set"); + myEccNodesSync = builder.myEccNodesSync; + myJmxConnectionProvider = builder.myJmxConnectionProvider; + myAgentNativeConnectionProvider = builder.myAgentNativeConnectionProvider; setupConfiguration(); + myService = Executors.newFixedThreadPool(NO_OF_THREADS); } /** @@ -96,6 +112,9 @@ public void fromBuilder(final Builder builder) myRepairConfigurationFunction = builder.myRepairConfigurationFunction; myTableReferenceFactory = Preconditions.checkNotNull(builder.myTableReferenceFactory, "Table reference factory must be set"); + myEccNodesSync = builder.myEccNodesSync; + myJmxConnectionProvider = builder.myJmxConnectionProvider; + myAgentNativeConnectionProvider = builder.myAgentNativeConnectionProvider; setupConfiguration(); } @@ -440,6 +459,30 @@ public void onDown(final Node node) setupConfiguration(); } + /** + * Callback for when a new node is added to the cluster. + * @param node + */ + @Override + public void onAdd(final Node node) + { + LOG.info("Node added {}", node.getHostId()); + NodeAddedAction callable = new NodeAddedAction(myEccNodesSync, myJmxConnectionProvider, myAgentNativeConnectionProvider, node); + myService.submit(callable); + } + + /** + * callback for when a node is removed from the cluster. + * @param node + */ + @Override + public void onRemove(final Node node) + { + LOG.info("Node removed {}", node.getHostId()); + NodeRemovedAction callable = new NodeRemovedAction(myEccNodesSync, myJmxConnectionProvider, myAgentNativeConnectionProvider, node); + myService.submit(callable); + } + /** * This will go through all the configuration, given mySession is set, otherwise it will just silently * return. @@ -477,6 +520,10 @@ public static class Builder private RepairScheduler myRepairScheduler; private Function> myRepairConfigurationFunction; private TableReferenceFactory myTableReferenceFactory; + private EccNodesSync myEccNodesSync; + private DistributedJmxConnectionProvider myJmxConnectionProvider; + private DistributedNativeConnectionProvider myAgentNativeConnectionProvider; + /** * Build with session. @@ -563,6 +610,39 @@ public Builder withNodesList(final List nodesList) return this; } + /** + * Build with EccNodesSync. + * @param eccNodesSync + * @return Builder with EccNodesSync + */ + public Builder withEccNodesSync(final EccNodesSync eccNodesSync) + { + myEccNodesSync = eccNodesSync; + return this; + } + + /** + * Build with DistributedNativeConnectionProvider. + * @param agentNativeConnectionProvider + * @return + */ + public Builder withDistributedNativeConnectionProvider(final DistributedNativeConnectionProvider agentNativeConnectionProvider) + { + myAgentNativeConnectionProvider = agentNativeConnectionProvider; + return this; + } + + /** + * Build with DistributedJmxConnectionProvider. + * @param jmxConnectionProvider + * @return Builder with DistributedJmxConnectionProvider + */ + public Builder withJmxConnectionProvider(final DistributedJmxConnectionProvider jmxConnectionProvider) + { + myJmxConnectionProvider = jmxConnectionProvider; + return this; + } + /** * Build. * diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/AbstractCassandraContainerTest.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/AbstractCassandraContainerTest.java index d5d9c21d0..db7a495e2 100644 --- a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/AbstractCassandraContainerTest.java +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/AbstractCassandraContainerTest.java @@ -73,6 +73,22 @@ public List getNodes() { return nodesList; } + + @Override + public void addNode(Node myNode) + { + } + + @Override + public void removeNode(Node myNode) + { + } + + @Override + public Boolean confirmNodeValid(Node node) + { + return false; + } }; } diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java index da0e1d1cc..1b5913c68 100644 --- a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java @@ -445,6 +445,21 @@ public List getNodes() { return null; } + @Override + public void addNode(Node myNode) + { + } + + @Override + public void removeNode(Node myNode) + { + } + + @Override + public Boolean confirmNodeValid(Node node) + { + return false; + } }) .withHostStates(hostStates) .withStatementDecorator(s -> s) diff --git a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java index 960c05e30..5e4539905 100644 --- a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java +++ b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java @@ -69,6 +69,7 @@ public final class EccNodesSync private final PreparedStatement myCreateStatement; private final PreparedStatement myUpdateStatusStatement; private final PreparedStatement mySelectStatusStatement; + private final PreparedStatement myDeleteStatement; private final Long connectionDelayValue; private final ChronoUnit connectionDelayUnit; @@ -87,6 +88,12 @@ private EccNodesSync(final Builder builder) throws UnknownHostException .value(COLUMN_NODE_ID, bindMarker()) .build() .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)); + myDeleteStatement = mySession.prepare(QueryBuilder.deleteFrom(KEYSPACE_NAME, TABLE_NAME) + .whereColumn(COLUMN_ECCHRONOS_ID).isEqualTo(bindMarker()) + .whereColumn(COLUMN_DC_NAME).isEqualTo(bindMarker()) + .whereColumn(COLUMN_NODE_ID).isEqualTo(bindMarker()) + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)); myUpdateStatusStatement = mySession.prepare(QueryBuilder.update(KEYSPACE_NAME, TABLE_NAME) .setColumn(COLUMN_NODE_STATUS, bindMarker()) .setColumn(COLUMN_LAST_CONNECTION, bindMarker()) @@ -186,7 +193,22 @@ public ResultSet updateNodeStatus( } return tmpResultSet; } - + public ResultSet deleteNodeStatus( + final String datacenterName, + final UUID nodeID + ) + { + ResultSet tmpResultSet = deleteNodeStateStatement(datacenterName, nodeID); + if (tmpResultSet.wasApplied()) + { + LOG.info("Node {} successfully deleted", nodeID); + } + else + { + LOG.error("Unable to delete node {}", nodeID); + } + return tmpResultSet; + } private ResultSet updateNodeStateStatement( final NodeStatus nodeStatus, final String datacenterName, @@ -203,6 +225,18 @@ private ResultSet updateNodeStateStatement( ); return execute(updateNodeStatus); } + private ResultSet deleteNodeStateStatement( + final String datacenterName, + final UUID nodeID + ) + { + BoundStatement deleteNodeStatus = myDeleteStatement.bind( + ecChronosID, + datacenterName, + nodeID + ); + return execute(deleteNodeStatus); + } @VisibleForTesting public ResultSet verifyInsertNodeInfo( diff --git a/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/utils/AbstractCassandraTest.java b/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/utils/AbstractCassandraTest.java index 681a95426..ce2dd65f4 100644 --- a/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/utils/AbstractCassandraTest.java +++ b/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/utils/AbstractCassandraTest.java @@ -60,6 +60,21 @@ public List getNodes() { return nodesList; } + + @Override + public void addNode(Node myNode) { + nodesList.add(myNode); + } + + @Override + public void removeNode(Node myNode) { + nodesList.remove(myNode); + } + + @Override + public Boolean confirmNodeValid(Node node) { + return false; + } }; }