Skip to content

Commit

Permalink
Merge branch 'agent/master' into distributedlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
SajidRiaz138 authored and sajid riaz committed Nov 6, 2024
2 parents 06150a1 + 71f92c8 commit 43e57d4
Show file tree
Hide file tree
Showing 21 changed files with 682 additions and 13 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ dependency-reduced-pom.xml
*htmlcov
application/statistics/
statistics/
.vscode/

2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Version 1.0.0 (Not yet Released)

* Cassandra-Based Distributed Locks #741
* Update nodes when cluster changes, nodes removed or added #699
* Hot Reload of Nodes List - Issue #699
* Investigate Creation of RepairScheduler and ScheduleManager #714
* Implement ScheduledJobQueue for Prioritized Job Management and Execution - Issue #740
* Implement RepairGroup Class for Managing and Executing Repair Tasks - Issue #738
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,4 @@ public final String toString()
{
return String.format("Connection(cql=%s, jmx=%s)", myCqlConnection, myJmxConnection);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package com.ericsson.bss.cassandra.ecchronos.application.providers;

import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.Credentials;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.JmxTLSConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.Security;
Expand Down Expand Up @@ -159,6 +160,18 @@ public void close(final UUID nodeID) throws IOException
myDistributedJmxConnectionProviderImpl.close(nodeID);
}

/**
* Creates a new connection a node.
* @param node
*
* @throws IOException
*/
@Override
public void add(final Node node) throws IOException
{
myDistributedJmxConnectionProviderImpl.add(node);
}

/**
* Closes all JMX connections managed by this provider.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public AgentNativeConnectionProvider(
sslEngineFactory = certificateHandler;
}

DistributedNativeBuilder nativeConnectionBuilder =
DistributedNativeBuilder nativeConnectionBuilder =
DistributedNativeConnectionProviderImpl.builder()
.withInitialContactPoints(resolveInitialContactPoints(agentConnectionConfig.getContactPoints()))
.withAgentType(agentConnectionConfig.getType())
Expand All @@ -94,6 +94,7 @@ public AgentNativeConnectionProvider(
.withSslEngineFactory(sslEngineFactory)
.withSchemaChangeListener(defaultRepairConfigurationProvider)
.withNodeStateListener(defaultRepairConfigurationProvider);

LOG.info("Preparing Agent Connection Config");
nativeConnectionBuilder = resolveAgentProviderBuilder(nativeConnectionBuilder, agentConnectionConfig);
LOG.info("Establishing Connection With Nodes");
Expand Down Expand Up @@ -268,4 +269,32 @@ public void close() throws IOException
{
myDistributedNativeConnectionProviderImpl.close();
}
/**
* Add a nw node to the list of nodes.
* @param myNode
*/
@Override
public void addNode(final Node myNode)
{
myDistributedNativeConnectionProviderImpl.addNode(myNode);
}
/**
* Remove node for the list of nodes.
* @param myNode
*/
@Override
public void removeNode(final Node myNode)
{
myDistributedNativeConnectionProviderImpl.removeNode(myNode);
}
/**
* Checks the node is on the list of specified dc's/racks/nodes.
* @param node
* @return
*/
@Override
public Boolean confirmNodeValid(final Node node)
{
return myDistributedNativeConnectionProviderImpl.confirmNodeValid(node);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public ECChronos(
.withNodesList(nativeConnectionProvider.getNodes())
.withReplicatedTableProvider(myECChronosInternals.getReplicatedTableProvider())
.withRepairConfiguration(repairConfigurationProvider::get)
.withEccNodesSync(eccNodesSync)
.withJmxConnectionProvider(jmxConnectionProvider)
.withDistributedNativeConnectionProvider(nativeConnectionProvider)
.withTableReferenceFactory(myECChronosInternals.getTableReferenceFactory()));

myECChronosInternals.addRunPolicy(myTimeBasedRunPolicy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ public final DistributedJmxConnectionProviderImpl build() throws IOException
createConnections();
return new DistributedJmxConnectionProviderImpl(
myNodesList,
myJMXConnections
myJMXConnections,
this
);
}

Expand All @@ -152,7 +153,12 @@ private void createConnections() throws IOException
}
}

private void reconnect(final Node node) throws IOException, EcChronosException
/***
* Creates a JMXconnection to the host.
* @param node
* @throws EcChronosException
*/
public void reconnect(final Node node) throws EcChronosException
{
try
{
Expand Down Expand Up @@ -246,7 +252,16 @@ private Integer getJMXPort(final Node node)
.setNode(node)
.build();
Row row = mySession.execute(simpleStatement).one();
if (row != null)
if ((row == null) || (row.getString("value") == null))
{
simpleStatement = SimpleStatement
.builder("SELECT value FROM system_views.system_properties WHERE name = 'cassandra.jmx.local.port';")
.setNode(node)
.build();
row = mySession.execute(simpleStatement).one();

}
if ((row != null) && (row.getString("value") != null))
{
return Integer.parseInt(Objects.requireNonNull(row.getString("value")));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,15 @@ public final DistributedNativeConnectionProviderImpl build()
LOG.info("Requesting Nodes List");
List<Node> nodesList = createNodesList(session);
LOG.info("Nodes list was created with success");
return new DistributedNativeConnectionProviderImpl(session, nodesList);
return new DistributedNativeConnectionProviderImpl(session, nodesList, this);
}

private List<Node> createNodesList(final CqlSession session)
/**
* Creates a list of nodes based on the connection type, reads the node list from the database.
* @param session the connection information to the database
* @return list of nodes
*/
public List<Node> createNodesList(final CqlSession session)
{
List<Node> tmpNodeList = new ArrayList<>();
switch (myType)
Expand All @@ -269,6 +274,49 @@ private List<Node> createNodesList(final CqlSession session)
return tmpNodeList;
}

/**
* Checks the node is on the list of specified dc's/racks/nodes.
* @param node
* @return
*/
public Boolean confirmNodeValid(final Node node)
{
switch (myType)
{
case datacenterAware:
return confirmDatacenterNodeValid(node, myDatacenterAware);
case rackAware:
return confirmRackNodeValid(node, myRackAware);
case hostAware:
return confirmHostNodeValid(node, myHostAware);

default:
}
return false;
}

private Boolean confirmDatacenterNodeValid(final Node node, final List<String> datacenterNames)
{
return (datacenterNames.contains(node.getDatacenter()));
}

private Boolean confirmRackNodeValid(final Node node, final List<Map<String, String>> rackInfo)
{
Set<Map<String, String>> racksInfoSet = new HashSet<>(rackInfo);
Map<String, String> tmpRackInfo = new HashMap<>();
tmpRackInfo.put("datacenterName", node.getDatacenter());
tmpRackInfo.put("rackName", node.getRack());
return (racksInfoSet.contains(tmpRackInfo));
}

private Boolean confirmHostNodeValid(final Node node, final List<InetSocketAddress> hostsInfo)
{
Set<InetSocketAddress> hostsInfoSet = new HashSet<>(hostsInfo);

InetSocketAddress tmpAddress = (InetSocketAddress) node.getEndPoint().resolve();
return (hostsInfoSet.contains(tmpAddress));
}

private CqlSession createSession(final DistributedNativeBuilder builder)
{
CqlSessionBuilder sessionBuilder = fromBuilder(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@
import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedJmxBuilder;

import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedJmxConnectionProviderImpl implements DistributedJmxConnectionProvider
{
private static final Logger LOG = LoggerFactory.getLogger(DistributedJmxConnectionProviderImpl.class);
private final List<Node> myNodesList;
private final ConcurrentHashMap<UUID, JMXConnector> myJMXConnections;
private final DistributedJmxBuilder myDistributedJmxBuilder;

/**
* Constructs a DistributedJmxConnectionProviderImpl with the specified list of nodes and JMX connections.
Expand All @@ -41,11 +46,13 @@ public class DistributedJmxConnectionProviderImpl implements DistributedJmxConne
*/
public DistributedJmxConnectionProviderImpl(
final List<Node> nodesList,
final ConcurrentHashMap<UUID, JMXConnector> jmxConnections
final ConcurrentHashMap<UUID, JMXConnector> jmxConnections,
final DistributedJmxBuilder distributedJmxBuilder
)
{
myNodesList = nodesList;
myJMXConnections = jmxConnections;
myDistributedJmxBuilder = distributedJmxBuilder;
}

/**
Expand Down Expand Up @@ -145,4 +152,23 @@ public void close(final UUID nodeID) throws IOException
myJMXConnections.get(nodeID).close();
}

/**
* Add a node and create a JMXconnection.
* @param node
* @throws IOException
*/
@Override
public void add(final Node node) throws IOException
{
try
{
myDistributedJmxBuilder.reconnect(node);
}
catch (EcChronosException e)
{
LOG.warn("Unable to connect with node {} connection refused: {}", node.getHostId(), e.getMessage());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ
{
private final CqlSession mySession;
private final List<Node> myNodes;
private final DistributedNativeBuilder myDistributedNativeBuilder;

/**
* Constructs a new {@code DistributedNativeConnectionProviderImpl} with the specified {@link CqlSession} and list
Expand All @@ -36,11 +37,15 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ
* @param nodesList
* the list of {@link Node} instances representing the nodes in the cluster.
*/
public DistributedNativeConnectionProviderImpl(final CqlSession session,
final List<Node> nodesList)
public DistributedNativeConnectionProviderImpl(
final CqlSession session,
final List<Node> nodesList,
final DistributedNativeBuilder distributedNativeBuilder
)
{
mySession = session;
myNodes = nodesList;
myDistributedNativeBuilder = distributedNativeBuilder;
}

/**
Expand All @@ -65,6 +70,8 @@ public List<Node> getNodes()
return myNodes;
}



/**
* Closes the {@link CqlSession} associated with this connection provider.
*
Expand All @@ -88,4 +95,35 @@ public static DistributedNativeBuilder builder()
return new DistributedNativeBuilder();
}

/**
* Add a nw node to the list of nodes.
* @param node
*/
@Override
public void addNode(final Node node)
{
myNodes.add(node);
}

/**
* Remove node for the list of nodes.
* @param node
*/
@Override
public void removeNode(final Node node)
{
myNodes.remove(node);
}

/**
* Checks the node is on the list of specified dc's/racks/nodes.
* @param node
* @return
*/
@Override
public Boolean confirmNodeValid(final Node node)
{
return myDistributedNativeBuilder.confirmNodeValid(node);
}

}
Loading

0 comments on commit 43e57d4

Please sign in to comment.