Skip to content

Commit

Permalink
Ericsson#699 Reload nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Chandler committed Oct 11, 2024
1 parent a511275 commit ed87b66
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

import com.datastax.oss.driver.api.core.metadata.Node;

public class NodeChangeRecord {
public class NodeChangeRecord
{
enum NodeChangeType
{
INSERT,
Expand All @@ -35,7 +36,7 @@ public NodeChangeRecord(final Node node, final NodeChangeType type)
}

/***
* returns the node that has changed
* returns the node that has changed.
*
* @return
*/
Expand All @@ -45,7 +46,7 @@ public Node getNode()
}

/***
* Returns the change type can be either INSERT, DELETE or UPDATE
* Returns the change type can be either INSERT, DELETE or UPDATE.
* @return
*/
public NodeChangeType getType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.List;


public class NodeListComparator {

public class NodeListComparator
{
private static final Logger LOG = LoggerFactory.getLogger(NodeListComparator.class);

/***
Expand Down Expand Up @@ -65,7 +65,7 @@ List<NodeChangeRecord> compareNodeLists(final List<Node> oldNodes, final List<No
// same host id, now check the ipaddress is still the same
if (!oldNode.getListenAddress().equals(newNode.getListenAddress()))
{
LOG.info("Node id {}, has a different ipaddress, it was {}, it is now {} ", oldNode.getHostId(), oldNode.getListenAddress(), newNode.getListenAddress() );
LOG.info("Node id {}, has a different ipaddress, it was {}, it is now {} ", oldNode.getHostId(), oldNode.getListenAddress(), newNode.getListenAddress());
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.UPDATE));
}
oldNode = getNode(oldIterator);
Expand All @@ -88,22 +88,24 @@ List<NodeChangeRecord> compareNodeLists(final List<Node> oldNodes, final List<No
}
}
}
while ( newNode != null)
while (newNode != null)
{
changesList.add(new NodeChangeRecord(newNode, NodeChangeRecord.NodeChangeType.INSERT ));
changesList.add(new NodeChangeRecord(newNode, NodeChangeRecord.NodeChangeType.INSERT));
LOG.info("Node has been added, Node id: {}", newNode.getHostId());
newNode = getNode(newIterator);
}
return changesList;
}

private Node getNode(Iterator<Node> iterator)
private Node getNode(final Iterator<Node> iterator)
{
Node node;
if ( iterator.hasNext())
if ( iterator.hasNext()) {
node = iterator.next();
else
}
else {
node = null;
}
return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void startScheduler()
{
long reLoadIntervalInMills = reLoadInterval.getInterval(TimeUnit.MILLISECONDS);
LOG.info("Starting ReloadNodesService with reLoadInterval={} ms", reLoadIntervalInMills);
myScheduler.scheduleWithFixedDelay(this::reloadNodes, reLoadIntervalInMills,reLoadIntervalInMills, TimeUnit.MILLISECONDS);
myScheduler.scheduleWithFixedDelay(this::reloadNodes, reLoadIntervalInMills, reLoadIntervalInMills, TimeUnit.MILLISECONDS);
}

@VisibleForTesting
Expand All @@ -81,25 +81,27 @@ void reloadNodes()
List<Node> oldNodes = myDistributedNativeConnectionProvider.getNodes();
List<Node> newNodes = myDistributedNativeConnectionProvider.reloadNodes();
CqlSession cqlSession = myDistributedNativeConnectionProvider.getCqlSession();
List<NodeChangeRecord> nodeChangeList = nodeListComparator.compareNodeLists(oldNodes,newNodes);
List<NodeChangeRecord> nodeChangeList = nodeListComparator.compareNodeLists(oldNodes, newNodes);
if (!nodeChangeList.isEmpty())
{
myDistributedNativeConnectionProvider.setNodes(newNodes);
Iterator<NodeChangeRecord> iterator = nodeChangeList.iterator();
while (iterator.hasNext())
{
NodeChangeRecord nodeChangeRecord = iterator.next();
if ( nodeChangeRecord.getType() == NodeChangeRecord.NodeChangeType.INSERT){
if (nodeChangeRecord.getType() == NodeChangeRecord.NodeChangeType.INSERT)
{
myEccNodesSync.verifyAcquireNode(nodeChangeRecord.getNode());
try
{
myJmxConnectionProvider.add(nodeChangeRecord.getNode());
} catch (IOException e) {
LOG.info("Node {} JMX connection failed", nodeChangeRecord.getNode().getHostId() );
} catch (IOException e)
{
LOG.info("Node {} JMX connection failed", nodeChangeRecord.getNode().getHostId());
}
}
if ( nodeChangeRecord.getType() == NodeChangeRecord.NodeChangeType.DELETE){
myEccNodesSync.deleteNodeStatus(nodeChangeRecord.getNode().getDatacenter(),nodeChangeRecord.getNode().getHostId());
myEccNodesSync.deleteNodeStatus(nodeChangeRecord.getNode().getDatacenter(), nodeChangeRecord.getNode().getHostId());
try
{
myJmxConnectionProvider.close(nodeChangeRecord.getNode().getHostId());
Expand All @@ -114,7 +116,7 @@ void reloadNodes()
}

/***
* Shutsdown the scheduler
* Shutsdown the scheduler.
*/
@Override
public void destroy()
Expand Down

0 comments on commit ed87b66

Please sign in to comment.