Skip to content

Commit

Permalink
Cassandra based distributed locking mechanism # 741
Browse files Browse the repository at this point in the history
- Cassandra tables called lock and lock_priority,
 to manage task execution and synchronization across
 multiple nodes.
  • Loading branch information
sajid riaz committed Nov 5, 2024
1 parent eea5f4d commit 62343cb
Show file tree
Hide file tree
Showing 13 changed files with 1,120 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.ericsson.bss.cassandra.ecchronos.application.config;

import com.ericsson.bss.cassandra.ecchronos.application.config.connection.ConnectionConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory.LockFactoryConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.GlobalRepairConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.rest.RestServerConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy.RunPolicyConfig;
Expand All @@ -28,6 +29,7 @@ public class Config
private RunPolicyConfig myRunPolicyConfig = new RunPolicyConfig();
private SchedulerConfig mySchedulerConfig = new SchedulerConfig();
private RestServerConfig myRestServerConfig = new RestServerConfig();
private LockFactoryConfig myLockFactoryConfig = new LockFactoryConfig();

@JsonProperty("connection")
public final ConnectionConfig getConnectionConfig()
Expand Down Expand Up @@ -119,4 +121,19 @@ public final void setRestServerConfig(final RestServerConfig restServerConfig)
myRestServerConfig = restServerConfig;
}
}

@JsonProperty("lock_factory")
public final LockFactoryConfig getLockFactory()
{
return myLockFactoryConfig;
}

@JsonProperty("lock_factory")
public final void setLockFactoryConfig(final LockFactoryConfig lockFactoryConfig)
{
if (lockFactoryConfig != null)
{
myLockFactoryConfig = lockFactoryConfig;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public AgentNativeConnectionProvider(
.withAuthProvider(authProvider)
.withSslEngineFactory(sslEngineFactory)
.withSchemaChangeListener(defaultRepairConfigurationProvider)
.withNodeStateListener(defaultRepairConfigurationProvider);
.withNodeStateListener(defaultRepairConfigurationProvider)
.withRemoteRouting(agentConnectionConfig.getRemoteRouting());
LOG.info("Preparing Agent Connection Config");
nativeConnectionBuilder = resolveAgentProviderBuilder(nativeConnectionBuilder, agentConnectionConfig);
LOG.info("Establishing Connection With Nodes");
Expand Down
7 changes: 7 additions & 0 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ connection:
## (instanceName: unique identifier), that will be used
## as ecchronos_id (partition key in nodes_sync table).
instanceName: unique_identifier
##
## Allow routing requests directly to a remote datacenter.
## This allows locks for other datacenters to be taken in that datacenter instead of via the local datacenter.
## If clients are prevented from connecting directly to Cassandra nodes in other sites this is not possible.
## If remote routing is disabled, instead SERIAL consistency will be used for those request.
##
remoteRouting: true
## Define the Agent strategy, it can be
## - datacenterAware;
## - rackAware; and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,5 +317,11 @@ public void testInstanceName()
{
assertThat(nativeConnection.getAgentConnectionConfig().getInstanceName()).isEqualTo("unique_identifier");
}

@Test
public void testRemoteRouting()
{
assertThat(nativeConnection.getAgentConnectionConfig().getRemoteRouting()).isEqualTo(false);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory;

import com.ericsson.bss.cassandra.ecchronos.application.config.Config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.junit.Test;

import java.io.File;
import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;

public class TestCasLockFactoryConfig
{
@Test
public void testCasLockFactoryConfigWithProvidedValue() throws IOException
{
CasLockFactoryConfig casLockFactoryConfig = getCasLockFactoryConfig("all_set.yml");
assertThat(casLockFactoryConfig.getKeyspaceName()).isEqualTo("ecc");
assertThat(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds()).isEqualTo(100L);
}

@Test
public void testCasLockFactoryConfigDefaultValue() throws IOException
{
CasLockFactoryConfig casLockFactoryConfig = getCasLockFactoryConfig("nothing_set.yml");
assertThat(casLockFactoryConfig.getKeyspaceName()).isEqualTo("ecchronos");
assertThat(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds()).isEqualTo(30L);
}

private CasLockFactoryConfig getCasLockFactoryConfig(final String fileName) throws IOException
{
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
File file = new File(classLoader.getResource(fileName).getFile());
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
Config config = mapper.readValue(file, Config.class);
return config.getLockFactory().getCasLockFactoryConfig();
}
}
9 changes: 8 additions & 1 deletion application/src/test/resources/all_set.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ connection:
port: 9042
- host: 127.0.0.4
port: 9042
remoteRouting: false
provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider
connectionDelay:
time: 45
Expand Down Expand Up @@ -90,4 +91,10 @@ scheduler:

rest_server:
host: 127.0.0.2
port: 8081
port: 8081

lock_factory:
cas:
keyspace: ecc
cache_expiry_time_in_seconds: 100
consistencySerial: "LOCAL"
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class DistributedNativeBuilder
private SslEngineFactory mySslEngineFactory = null;
private SchemaChangeListener mySchemaChangeListener = null;
private NodeStateListener myNodeStateListener = null;
private boolean myRemoteRouting;

/**
* Sets the initial contact points for the distributed native connection.
Expand All @@ -88,6 +89,20 @@ public final DistributedNativeBuilder withInitialContactPoints(final List<InetSo
return this;
}

/**
* Determines if routing requests directly to a remote datacenter is allowed.
* Sets the remote routing mode of the agent for the distributed native connection.
*
* @param remoteRouting
* the remote routing mode for agent {@link boolean}.
* @return the current instance of {@link DistributedNativeBuilder}.
*/
public final DistributedNativeBuilder withRemoteRouting(final boolean remoteRouting)
{
myRemoteRouting = remoteRouting;
return this;
}

/**
* Sets the type of the agent for the distributed native connection.
*
Expand Down Expand Up @@ -244,7 +259,7 @@ public final DistributedNativeConnectionProviderImpl build()
LOG.info("Requesting Nodes List");
List<Node> nodesList = createNodesList(session);
LOG.info("Nodes list was created with success");
return new DistributedNativeConnectionProviderImpl(session, nodesList);
return new DistributedNativeConnectionProviderImpl(session, nodesList, myRemoteRouting);
}

private List<Node> createNodesList(final CqlSession session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ
{
private final CqlSession mySession;
private final List<Node> myNodes;
private final boolean myRemoteRouting;

/**
* Constructs a new {@code DistributedNativeConnectionProviderImpl} with the specified {@link CqlSession} and list
Expand All @@ -37,12 +38,13 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ
* the list of {@link Node} instances representing the nodes in the cluster.
*/
public DistributedNativeConnectionProviderImpl(
final CqlSession session,
final List<Node> nodesList
)
final CqlSession session,
final List<Node> nodesList,
final boolean remoteRouting)
{
mySession = session;
myNodes = nodesList;
myRemoteRouting = remoteRouting;
}

/**
Expand Down Expand Up @@ -79,6 +81,17 @@ public void close() throws IOException
mySession.close();
}

/**
* Determines if routing requests directly to a remote datacenter is allowed.
*
* @return {@code true} if remote routing is enabled.
*/
@Override
public boolean getRemoteRouting()
{
return myRemoteRouting;
}

/**
* Creates a new instance of {@link DistributedNativeBuilder} for building
* {@link DistributedNativeConnectionProviderImpl} objects.
Expand Down
22 changes: 22 additions & 0 deletions core.impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,28 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.core.impl;

import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import java.net.InetSocketAddress;

import java.util.List;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.utility.DockerImageName;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Node;

public class AbstractCassandraContainerTest
{
protected static CqlSession mySession;

private static DistributedNativeConnectionProvider myNativeConnectionProvider;
private static CassandraContainer<?> node;

@SuppressWarnings ("resource")
@BeforeClass
public static void setUpCluster()
{
// This is set as an environment variable ('it.cassandra.version') in maven using the '-D' flag.
String cassandraVersion = System.getProperty("it.cassandra.version");
if (cassandraVersion == null)
{
// No environment version set, just use latest.
cassandraVersion = "latest";
}
node = new CassandraContainer<>(DockerImageName.parse("cassandra:" + cassandraVersion))
.withExposedPorts(9042, 7000, 7199)
.withEnv("CASSANDRA_DC", "DC1")
.withEnv("CASSANDRA_ENDPOINT_SNITCH", "GossipingPropertyFileSnitch")
.withEnv("CASSANDRA_CLUSTER_NAME", "TestCluster")
.withEnv("JMX_PORT", "7199");
node.start();
String containerIpAddress = node.getHost();
Integer containerPort = node.getMappedPort(9042);

mySession = CqlSession.builder()
.addContactPoint(new InetSocketAddress(containerIpAddress, containerPort))
.withLocalDatacenter("DC1")
.build();

List<Node> nodesList = mySession.getMetadata().getNodes().values().stream().toList();
myNativeConnectionProvider = new DistributedNativeConnectionProvider()
{
@Override
public CqlSession getCqlSession()
{
return mySession;
}

@Override
public List<Node> getNodes()
{
return nodesList;
}

@Override
public boolean getRemoteRouting()
{
return true;
}
};
}

@AfterClass
public static void tearDownCluster()
{
if (mySession != null)
{
mySession.close();
}
node.stop();
}

public static DistributedNativeConnectionProvider getNativeConnectionProvider()
{
return myNativeConnectionProvider;
}

public static CassandraContainer<?> getContainerNode()
{
return node;
}
}
Loading

0 comments on commit 62343cb

Please sign in to comment.