Skip to content

Commit

Permalink
Ericsson#699 Reload nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Chandler committed Oct 11, 2024
1 parent 7fa26e1 commit a511275
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,31 @@ enum NodeChangeType
UPDATE
}

private final Node node;
private final NodeChangeType type;
private final Node myNode;
private final NodeChangeType myType;

public NodeChangeRecord(Node node, NodeChangeType type)
public NodeChangeRecord(final Node node, final NodeChangeType type)
{
this.node = node;
this.type = type;
this.myNode = node;
this.myType = type;
}

/***
* returns the node that has changed
*
* @return
*/
public Node getNode()
{
return node;
return myNode;
}

/***
* Returns the change type can be either INSERT, DELETE or UPDATE
* @return
*/
public NodeChangeType getType()
{
return type;
return myType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ public class NodeComparator implements Comparator<Node>
@Override
public int compare(final Node firstNode, final Node secondNode)
{
return firstNode.getHostId().compareTo( secondNode.getHostId());
return firstNode.getHostId().compareTo(secondNode.getHostId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,25 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;


public class NodeListComparator {

private static final Logger LOG = LoggerFactory.getLogger(NodeListComparator.class);

/***
* Compares 2 lists of nodes, finds new nodes, removed nodes and nodes where the ip address has changed
* The lists are sorted before comparison, so lists can be in any order
* Compares 2 lists of nodes, finds new nodes, removed nodes and nodes where the ip address has changed.
* The lists are sorted before comparison, so lists can be in any order.
* @param oldNodes
* @param newNodes
* @return a list of NodeChangeRecord items, zero items in the list indicate the 2 lists are the same.
*/
List<NodeChangeRecord> compareNodeLists(List<Node> oldNodes, List<Node> newNodes )
List<NodeChangeRecord> compareNodeLists(final List<Node> oldNodes, final List<Node> newNodes)
{
List<NodeChangeRecord> changesList= new LinkedList<NodeChangeRecord>();
List<NodeChangeRecord> changesList = new LinkedList<NodeChangeRecord>();
NodeComparator nodeComparator = new NodeComparator();
oldNodes.sort(nodeComparator);
newNodes.sort(nodeComparator);
Expand All @@ -47,15 +50,16 @@ List<NodeChangeRecord> compareNodeLists(List<Node> oldNodes, List<Node> newNodes
oldNode = getNode(oldIterator);
newNode = getNode(newIterator);

while ( oldNode != null ){

while (oldNode != null)
{
if (newNode == null)
{
LOG.info("Node has been removed, Node id: {} ", oldNode.getHostId());
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.DELETE ));
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.DELETE));
oldNode = getNode(oldIterator);
}
else {
else
{
if (oldNode.getHostId().equals(newNode.getHostId()))
{
// same host id, now check the ipaddress is still the same
Expand Down Expand Up @@ -84,16 +88,17 @@ List<NodeChangeRecord> compareNodeLists(List<Node> oldNodes, List<Node> newNodes
}
}
}
while ( newNode != null) {

while ( newNode != null)
{
changesList.add(new NodeChangeRecord(newNode, NodeChangeRecord.NodeChangeType.INSERT ));
LOG.info("Node has been added, Node id: {}", newNode.getHostId());
newNode = getNode(newIterator);
}
return changesList;
}

private Node getNode(Iterator<Node> iterator) {
private Node getNode(Iterator<Node> iterator)
{
Node node;
if ( iterator.hasNext())
node = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.PostConstruct;

import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,32 +72,36 @@ public void startScheduler()
{
long reLoadIntervalInMills = reLoadInterval.getInterval(TimeUnit.MILLISECONDS);
LOG.info("Starting ReloadNodesService with reLoadInterval={} ms", reLoadIntervalInMills);
myScheduler.scheduleWithFixedDelay(this::reloadNodes, reLoadIntervalInMills,reLoadIntervalInMills, TimeUnit.MILLISECONDS);
myScheduler.scheduleWithFixedDelay(this::reloadNodes, reLoadIntervalInMills,reLoadIntervalInMills, TimeUnit.MILLISECONDS);
}

@VisibleForTesting
void reloadNodes() {
void reloadNodes()
{
List<Node> oldNodes = myDistributedNativeConnectionProvider.getNodes();
List<Node> newNodes = myDistributedNativeConnectionProvider.reloadNodes();
CqlSession cqlSession = myDistributedNativeConnectionProvider.getCqlSession();
List<NodeChangeRecord> nodeChangeList = nodeListComparator.compareNodeLists(oldNodes,newNodes);
if (!nodeChangeList.isEmpty()){
if (!nodeChangeList.isEmpty())
{
myDistributedNativeConnectionProvider.setNodes(newNodes);
Iterator<NodeChangeRecord> iterator = nodeChangeList.iterator();
while (iterator.hasNext()){
while (iterator.hasNext())
{
NodeChangeRecord nodeChangeRecord = iterator.next();
if ( nodeChangeRecord.getType() == NodeChangeRecord.NodeChangeType.INSERT){
myEccNodesSync.verifyAcquireNode(nodeChangeRecord.getNode());
try {
try
{
myJmxConnectionProvider.add(nodeChangeRecord.getNode());
} catch (IOException e) {
LOG.info("Node {} JMX connection failed", nodeChangeRecord.getNode().getHostId() );
}

}
if ( nodeChangeRecord.getType() == NodeChangeRecord.NodeChangeType.DELETE){
myEccNodesSync.deleteNodeStatus(nodeChangeRecord.getNode().getDatacenter(),nodeChangeRecord.getNode().getHostId());
try {
try
{
myJmxConnectionProvider.close(nodeChangeRecord.getNode().getHostId());
}
catch (IOException e )
Expand All @@ -108,129 +111,17 @@ void reloadNodes() {
}
}
}




/*
List<Node> unavailableNodes = findUnavailableNodes();
if (unavailableNodes.isEmpty())
{
return;
}
unavailableNodes.forEach(this::retryConnectionForNode);
*/
}

/*
private List<Node> findUnavailableNodes()
{
List<Node> unavailableNodes = new ArrayList<>();
ResultSet resultSet = myEccNodesSync.getResultSet();
for (Row row : resultSet)
{
UUID nodeId = row.getUuid(COLUMN_NODE_ID);
String status = Objects.requireNonNull(row.getString(COLUMN_NODE_STATUS)).toUpperCase(Locale.ENGLISH);
if (NodeStatus.UNAVAILABLE.name().equals(status))
{
myDistributedNativeConnectionProvider.getNodes()
.stream()
.filter(node -> Objects.equals(node.getHostId(), nodeId))
.findFirst()
.ifPresent(unavailableNodes::add);
}
}
return unavailableNodes;
}
private void retryConnectionForNode(final Node node)
{
UUID nodeId = node.getHostId();
for (int attempt = 1; attempt <= retryBackoffStrategy.getMaxAttempts(); attempt++)
{
if (tryReconnectToNode(node, nodeId, attempt))
{
return; // Successfully reconnected, exit method
}
}
markNodeUnreachable(node, nodeId);
}
private boolean tryReconnectToNode(final Node node, final UUID nodeId, final int attempt)
{
long delayMillis = retryBackoffStrategy.calculateDelay(attempt);
LOG.warn("Attempting to reconnect to node: {}, attempt: {}", nodeId, attempt);
if (establishConnectionToNode(node))
{
LOG.info("Successfully reconnected to node: {}", nodeId);
myEccNodesSync.updateNodeStatus(NodeStatus.AVAILABLE, node.getDatacenter(), nodeId);
return true;
}
else
{
LOG.warn("Failed to reconnect to node: {}, next retry in {} ms", nodeId, delayMillis);
retryBackoffStrategy.sleepBeforeNextRetry(delayMillis);
return false;
}
}
private void markNodeUnreachable(final Node node, final UUID nodeId)
{
LOG.error("Max retry attempts reached for node: {}. Marking as UNREACHABLE.", nodeId);
myEccNodesSync.updateNodeStatus(NodeStatus.UNREACHABLE, node.getDatacenter(), nodeId);
}

private boolean establishConnectionToNode(final Node node)
{
UUID nodeId = node.getHostId();
JMXConnector jmxConnector = myJmxConnectionProvider.getJmxConnector(nodeId);
boolean isConnected = jmxConnector != null && isConnected(jmxConnector);
if (isConnected)
{
myJmxConnectionProvider.getJmxConnections().put(nodeId, jmxConnector);
LOG.info("Node {} connected successfully.", nodeId);
}
else
{
LOG.warn("Failed to connect to node {}.", nodeId);
}
return isConnected;
}
private boolean isConnected(final JMXConnector jmxConnector)
{
try
{
jmxConnector.getConnectionId();
return true;
}
catch (IOException e)
{
LOG.error("Error while checking connection for JMX connector", e);
return false;
}
}
*/
/***
* Shutsdown the scheduler
*/
@Override
public void destroy()
{
LOG.info("Shutting down RetrySchedulerService...");
RetryServiceShutdownManager.shutdownExecutorService(myScheduler, DEFAULT_SCHEDULER_AWAIT_TERMINATION_IN_SECONDS, TimeUnit.SECONDS);
LOG.info("RetrySchedulerService shut down complete.");
}

@VisibleForTesting
ScheduledExecutorService getMyScheduler()
{
return myScheduler;
}
}

0 comments on commit a511275

Please sign in to comment.