diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeChangeRecord.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeChangeRecord.java index 51e3e259..bb2ebd62 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeChangeRecord.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeChangeRecord.java @@ -25,21 +25,31 @@ enum NodeChangeType UPDATE } - private final Node node; - private final NodeChangeType type; + private final Node myNode; + private final NodeChangeType myType; - public NodeChangeRecord(Node node, NodeChangeType type) + public NodeChangeRecord(final Node node, final NodeChangeType type) { - this.node = node; - this.type = type; + this.myNode = node; + this.myType = type; } + /*** + * returns the node that has changed + * + * @return + */ public Node getNode() { - return node; + return myNode; } + + /*** + * Returns the change type can be either INSERT, DELETE or UPDATE + * @return + */ public NodeChangeType getType() { - return type; + return myType; } } diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeComparator.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeComparator.java index 83369ee3..8d6bfbbd 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeComparator.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeComparator.java @@ -34,6 +34,6 @@ public class NodeComparator implements Comparator @Override public int compare(final Node firstNode, final Node secondNode) { - return firstNode.getHostId().compareTo( secondNode.getHostId()); + return firstNode.getHostId().compareTo(secondNode.getHostId()); } } diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeListComparator.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeListComparator.java index e119fdfa..cebe5d8e 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeListComparator.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/NodeListComparator.java @@ -19,22 +19,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + public class NodeListComparator { private static final Logger LOG = LoggerFactory.getLogger(NodeListComparator.class); /*** - * Compares 2 lists of nodes, finds new nodes, removed nodes and nodes where the ip address has changed - * The lists are sorted before comparison, so lists can be in any order + * Compares 2 lists of nodes, finds new nodes, removed nodes and nodes where the ip address has changed. + * The lists are sorted before comparison, so lists can be in any order. * @param oldNodes * @param newNodes * @return a list of NodeChangeRecord items, zero items in the list indicate the 2 lists are the same. */ - List compareNodeLists(List oldNodes, List newNodes ) + List compareNodeLists(final List oldNodes, final List newNodes) { - List changesList= new LinkedList(); + List changesList = new LinkedList(); NodeComparator nodeComparator = new NodeComparator(); oldNodes.sort(nodeComparator); newNodes.sort(nodeComparator); @@ -47,15 +50,16 @@ List compareNodeLists(List oldNodes, List newNodes oldNode = getNode(oldIterator); newNode = getNode(newIterator); - while ( oldNode != null ){ - + while (oldNode != null) + { if (newNode == null) { LOG.info("Node has been removed, Node id: {} ", oldNode.getHostId()); - changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.DELETE )); + changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.DELETE)); oldNode = getNode(oldIterator); } - else { + else + { if (oldNode.getHostId().equals(newNode.getHostId())) { // same host id, now check the ipaddress is still the same @@ -84,8 +88,8 @@ List compareNodeLists(List oldNodes, List newNodes } } } - while ( newNode != null) { - + while ( newNode != null) + { changesList.add(new NodeChangeRecord(newNode, NodeChangeRecord.NodeChangeType.INSERT )); LOG.info("Node has been added, Node id: {}", newNode.getHostId()); newNode = getNode(newIterator); @@ -93,7 +97,8 @@ List compareNodeLists(List oldNodes, List newNodes return changesList; } - private Node getNode(Iterator iterator) { + private Node getNode(Iterator iterator) + { Node node; if ( iterator.hasNext()) node = iterator.next(); diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ReloadNodesService.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ReloadNodesService.java index 34ce6183..d773c98f 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ReloadNodesService.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ReloadNodesService.java @@ -23,14 +23,13 @@ import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.PostConstruct; -import java.util.*; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.stereotype.Service; - import java.io.IOException; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -73,32 +72,36 @@ public void startScheduler() { long reLoadIntervalInMills = reLoadInterval.getInterval(TimeUnit.MILLISECONDS); LOG.info("Starting ReloadNodesService with reLoadInterval={} ms", reLoadIntervalInMills); - myScheduler.scheduleWithFixedDelay(this::reloadNodes, reLoadIntervalInMills,reLoadIntervalInMills, TimeUnit.MILLISECONDS); + myScheduler.scheduleWithFixedDelay(this::reloadNodes, reLoadIntervalInMills,reLoadIntervalInMills, TimeUnit.MILLISECONDS); } @VisibleForTesting - void reloadNodes() { + void reloadNodes() + { List oldNodes = myDistributedNativeConnectionProvider.getNodes(); List newNodes = myDistributedNativeConnectionProvider.reloadNodes(); CqlSession cqlSession = myDistributedNativeConnectionProvider.getCqlSession(); List nodeChangeList = nodeListComparator.compareNodeLists(oldNodes,newNodes); - if (!nodeChangeList.isEmpty()){ + if (!nodeChangeList.isEmpty()) + { myDistributedNativeConnectionProvider.setNodes(newNodes); Iterator iterator = nodeChangeList.iterator(); - while (iterator.hasNext()){ + while (iterator.hasNext()) + { NodeChangeRecord nodeChangeRecord = iterator.next(); if ( nodeChangeRecord.getType() == NodeChangeRecord.NodeChangeType.INSERT){ myEccNodesSync.verifyAcquireNode(nodeChangeRecord.getNode()); - try { + try + { myJmxConnectionProvider.add(nodeChangeRecord.getNode()); } catch (IOException e) { LOG.info("Node {} JMX connection failed", nodeChangeRecord.getNode().getHostId() ); } - } if ( nodeChangeRecord.getType() == NodeChangeRecord.NodeChangeType.DELETE){ myEccNodesSync.deleteNodeStatus(nodeChangeRecord.getNode().getDatacenter(),nodeChangeRecord.getNode().getHostId()); - try { + try + { myJmxConnectionProvider.close(nodeChangeRecord.getNode().getHostId()); } catch (IOException e ) @@ -108,117 +111,11 @@ void reloadNodes() { } } } - - - - - /* - List unavailableNodes = findUnavailableNodes(); - - if (unavailableNodes.isEmpty()) - { - return; - } - - unavailableNodes.forEach(this::retryConnectionForNode); - */ - } - -/* - private List findUnavailableNodes() - { - List unavailableNodes = new ArrayList<>(); - ResultSet resultSet = myEccNodesSync.getResultSet(); - - for (Row row : resultSet) - { - UUID nodeId = row.getUuid(COLUMN_NODE_ID); - String status = Objects.requireNonNull(row.getString(COLUMN_NODE_STATUS)).toUpperCase(Locale.ENGLISH); - - if (NodeStatus.UNAVAILABLE.name().equals(status)) - { - myDistributedNativeConnectionProvider.getNodes() - .stream() - .filter(node -> Objects.equals(node.getHostId(), nodeId)) - .findFirst() - .ifPresent(unavailableNodes::add); - } - } - - return unavailableNodes; - } - - private void retryConnectionForNode(final Node node) - { - UUID nodeId = node.getHostId(); - for (int attempt = 1; attempt <= retryBackoffStrategy.getMaxAttempts(); attempt++) - { - if (tryReconnectToNode(node, nodeId, attempt)) - { - return; // Successfully reconnected, exit method - } - } - markNodeUnreachable(node, nodeId); - } - - private boolean tryReconnectToNode(final Node node, final UUID nodeId, final int attempt) - { - long delayMillis = retryBackoffStrategy.calculateDelay(attempt); - LOG.warn("Attempting to reconnect to node: {}, attempt: {}", nodeId, attempt); - - if (establishConnectionToNode(node)) - { - LOG.info("Successfully reconnected to node: {}", nodeId); - myEccNodesSync.updateNodeStatus(NodeStatus.AVAILABLE, node.getDatacenter(), nodeId); - return true; - } - else - { - LOG.warn("Failed to reconnect to node: {}, next retry in {} ms", nodeId, delayMillis); - retryBackoffStrategy.sleepBeforeNextRetry(delayMillis); - return false; - } - } - - private void markNodeUnreachable(final Node node, final UUID nodeId) - { - LOG.error("Max retry attempts reached for node: {}. Marking as UNREACHABLE.", nodeId); - myEccNodesSync.updateNodeStatus(NodeStatus.UNREACHABLE, node.getDatacenter(), nodeId); } - private boolean establishConnectionToNode(final Node node) - { - UUID nodeId = node.getHostId(); - JMXConnector jmxConnector = myJmxConnectionProvider.getJmxConnector(nodeId); - boolean isConnected = jmxConnector != null && isConnected(jmxConnector); - - if (isConnected) - { - myJmxConnectionProvider.getJmxConnections().put(nodeId, jmxConnector); - LOG.info("Node {} connected successfully.", nodeId); - } - else - { - LOG.warn("Failed to connect to node {}.", nodeId); - } - - return isConnected; - } - - private boolean isConnected(final JMXConnector jmxConnector) - { - try - { - jmxConnector.getConnectionId(); - return true; - } - catch (IOException e) - { - LOG.error("Error while checking connection for JMX connector", e); - return false; - } - } -*/ + /*** + * Shutsdown the scheduler + */ @Override public void destroy() { @@ -226,11 +123,5 @@ public void destroy() RetryServiceShutdownManager.shutdownExecutorService(myScheduler, DEFAULT_SCHEDULER_AWAIT_TERMINATION_IN_SECONDS, TimeUnit.SECONDS); LOG.info("RetrySchedulerService shut down complete."); } - - @VisibleForTesting - ScheduledExecutorService getMyScheduler() - { - return myScheduler; - } }