Skip to content

Commit

Permalink
Cassandra based distributed locking mechanism # 741
Browse files Browse the repository at this point in the history
- Cassandra tables called lock and lock_priority,
 to manage task execution and synchronization across
 multiple nodes.
  • Loading branch information
sajid riaz committed Nov 4, 2024
1 parent 1122325 commit eea5f4d
Show file tree
Hide file tree
Showing 27 changed files with 2,151 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public final class AgentConnectionConfig
private HostAware myHostAware = new HostAware();
private Class<? extends DefaultLoadBalancingPolicy> myDatacenterAwarePolicy = DataCenterAwarePolicy.class;
private String myInstanceName;
private boolean myRemoteRouting = true;

/**
* Default constructor for AgentConnectionConfig.
Expand All @@ -47,6 +48,17 @@ public AgentConnectionConfig()

}

@JsonProperty("remoteRouting")
public boolean getRemoteRouting()
{
return myRemoteRouting;
}

@JsonProperty("remoteRouting")
public void setRemoteRouting(final boolean remoteRouting)
{
myRemoteRouting = remoteRouting;
}
/**
* Gets unique ecchronos instance name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
* Contains configurations related to outbound connections (CQL and JMX).
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.connection;

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory;

import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Locale;

public class CasLockFactoryConfig
{
private static final long DEFAULT_EXPIRY_TIME_IN_SECONDS = 30L;
private static final String DEFAULT_KEYSPACE_NAME = "ecchronos";
private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
private long myExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS;
private ConsistencyType myConsistencySerial = ConsistencyType.DEFAULT;

public final long getFailureCacheExpiryTimeInSeconds()
{
return myExpiryTimeInSeconds;
}

@JsonProperty ("cache_expiry_time_in_seconds")
public final void setFailureCacheExpiryTimeInSeconds(final long expiryTimeInSeconds)
{
myExpiryTimeInSeconds = expiryTimeInSeconds;
}

public final String getKeyspaceName()
{
return myKeyspaceName;
}

@JsonProperty ("keyspace")
public final void setKeyspaceName(final String keyspaceName)
{
myKeyspaceName = keyspaceName;
}

@JsonProperty ("consistencySerial")
public final ConsistencyType getConsistencySerial()
{
return myConsistencySerial;
}

@JsonProperty ("consistencySerial")
public final void setConsistencySerial(final String consistencySerial)
{
myConsistencySerial = ConsistencyType.valueOf(consistencySerial.toUpperCase(Locale.US));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory;

import com.fasterxml.jackson.annotation.JsonProperty;

public class LockFactoryConfig
{
private CasLockFactoryConfig myCasLockFactoryConfig = new CasLockFactoryConfig();

@JsonProperty("cas")
public final CasLockFactoryConfig getCasLockFactoryConfig()
{
return myCasLockFactoryConfig;
}

@JsonProperty("cas")
public final void setCasLockFactoryConfig(final CasLockFactoryConfig casLockFactoryConfig)
{
myCasLockFactoryConfig = casLockFactoryConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Contains configurations related to lock factory.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory;
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class AgentNativeConnectionProvider implements DistributedNativeConnectio
private static final Logger LOG = LoggerFactory.getLogger(AgentNativeConnectionProvider.class);

private final DistributedNativeConnectionProviderImpl myDistributedNativeConnectionProviderImpl;
private final boolean myRemoteRouting;

/**
* Constructs an {@code AgentNativeConnectionProvider} with the specified configuration, security supplier, and
Expand All @@ -62,14 +63,15 @@ public class AgentNativeConnectionProvider implements DistributedNativeConnectio
* the handler for managing SSL/TLS certificates.
*/
public AgentNativeConnectionProvider(
final Config config,
final Supplier<Security.CqlSecurity> cqlSecuritySupplier,
final CertificateHandler certificateHandler,
final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider
)
final Config config,
final Supplier<Security.CqlSecurity> cqlSecuritySupplier,
final CertificateHandler certificateHandler,
final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider)
{
AgentConnectionConfig agentConnectionConfig = config.getConnectionConfig().getCqlConnection()
AgentConnectionConfig agentConnectionConfig = config.getConnectionConfig()
.getCqlConnection()
.getAgentConnectionConfig();
myRemoteRouting = agentConnectionConfig.getRemoteRouting();
Security.CqlSecurity cqlSecurity = cqlSecuritySupplier.get();
boolean authEnabled = cqlSecurity.getCqlCredentials().isEnabled();
boolean tlsEnabled = cqlSecurity.getCqlTlsConfig().isEnabled();
Expand Down Expand Up @@ -111,25 +113,24 @@ public AgentNativeConnectionProvider(
* @return the configured {@link DistributedNativeBuilder}.
*/
public final DistributedNativeBuilder resolveAgentProviderBuilder(
final DistributedNativeBuilder builder,
final AgentConnectionConfig agentConnectionConfig
)
final DistributedNativeBuilder builder,
final AgentConnectionConfig agentConnectionConfig)
{
switch (agentConnectionConfig.getType())
{
case datacenterAware:
LOG.info("Using DatacenterAware as Agent Config");
return builder.withDatacenterAware(resolveDatacenterAware(
agentConnectionConfig.getDatacenterAware()));
case rackAware:
LOG.info("Using RackAware as Agent Config");
return builder.withRackAware(resolveRackAware(
agentConnectionConfig.getRackAware()));
case hostAware:
LOG.info("Using HostAware as Agent Config");
return builder.withHostAware(resolveHostAware(
agentConnectionConfig.getHostAware()));
default:
case datacenterAware:
LOG.info("Using DatacenterAware as Agent Config");
return builder.withDatacenterAware(resolveDatacenterAware(
agentConnectionConfig.getDatacenterAware()));
case rackAware:
LOG.info("Using RackAware as Agent Config");
return builder.withRackAware(resolveRackAware(
agentConnectionConfig.getRackAware()));
case hostAware:
LOG.info("Using HostAware as Agent Config");
return builder.withHostAware(resolveHostAware(
agentConnectionConfig.getHostAware()));
default:
}
return builder;
}
Expand All @@ -142,8 +143,7 @@ public final DistributedNativeBuilder resolveAgentProviderBuilder(
* @return a list of {@link InetSocketAddress} representing the resolved contact points.
*/
public final List<InetSocketAddress> resolveInitialContactPoints(
final Map<String, AgentConnectionConfig.Host> contactPoints
)
final Map<String, AgentConnectionConfig.Host> contactPoints)
{
List<InetSocketAddress> resolvedContactPoints = new ArrayList<>();
for (AgentConnectionConfig.Host host : contactPoints.values())
Expand All @@ -165,11 +165,7 @@ public final List<InetSocketAddress> resolveInitialContactPoints(
public final List<String> resolveDatacenterAware(final AgentConnectionConfig.DatacenterAware datacenterAware)
{
List<String> datacenterNames = new ArrayList<>();
for
(
AgentConnectionConfig.DatacenterAware.Datacenter datacenter
:
datacenterAware.getDatacenters().values())
for (AgentConnectionConfig.DatacenterAware.Datacenter datacenter : datacenterAware.getDatacenters().values())
{
datacenterNames.add(datacenter.getName());
}
Expand All @@ -186,12 +182,7 @@ public final List<String> resolveDatacenterAware(final AgentConnectionConfig.Dat
public final List<Map<String, String>> resolveRackAware(final AgentConnectionConfig.RackAware rackAware)
{
List<Map<String, String>> rackList = new ArrayList<>();
for
(
AgentConnectionConfig.RackAware.Rack rack
:
rackAware.getRacks().values()
)
for (AgentConnectionConfig.RackAware.Rack rack : rackAware.getRacks().values())
{
Map<String, String> rackInfo = new HashMap<>();
rackInfo.put("datacenterName", rack.getDatacenterName());
Expand All @@ -211,12 +202,7 @@ public final List<Map<String, String>> resolveRackAware(final AgentConnectionCon
public final List<InetSocketAddress> resolveHostAware(final AgentConnectionConfig.HostAware hostAware)
{
List<InetSocketAddress> resolvedHosts = new ArrayList<>();
for
(
AgentConnectionConfig.Host host
:
hostAware.getHosts().values()
)
for (AgentConnectionConfig.Host host : hostAware.getHosts().values())
{
InetSocketAddress tmpAddress = new InetSocketAddress(host.getHost(), host.getPort());
resolvedHosts.add(tmpAddress);
Expand All @@ -237,8 +223,8 @@ public final List<InetSocketAddress> resolveHostAware(final AgentConnectionConfi
* if the connection is in an illegal state.
*/
public final DistributedNativeConnectionProviderImpl tryEstablishConnection(
final DistributedNativeBuilder builder
) throws AllNodesFailedException, IllegalStateException
final DistributedNativeBuilder builder) throws AllNodesFailedException,
IllegalStateException
{
try
{
Expand Down Expand Up @@ -284,4 +270,15 @@ public void close() throws IOException
{
myDistributedNativeConnectionProviderImpl.close();
}

/**
* Determines if routing requests directly to a remote datacenter is allowed.
*
* @return {@code true} if remote routing is enabled.
*/
@Override
public boolean getRemoteRouting()
{
return myRemoteRouting;
}
}
41 changes: 41 additions & 0 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ connection:
connectionDelay:
time: 45
unit: MINUTES
##
## Allow routing requests directly to a remote datacenter.
## This allows locks for other datacenters to be taken in that datacenter instead of via the local datacenter.
## If clients are prevented from connecting directly to Cassandra nodes in other sites this is not possible.
## If remote routing is disabled, instead SERIAL consistency will be used for those request.
##
remoteRouting: true
jmx:
##
## The class used to provide JMX connections to Apache Cassandra.
Expand Down Expand Up @@ -165,6 +172,15 @@ repair:
priority:
granularity_unit: HOURS
##
## Specifies the type of lock to use for repairs.
## "vnode" will lock each node involved in a repair individually and increase the number of
## parallel repairs that can run in a single data center.
## "datacenter" will lock each data center involved in a repair and only allow a single repair per data center.
## "datacenter_and_vnode" will combine both options and allow a smooth transition between them without allowing
## multiple repairs to run concurrently on a single node.
##
lock_type: vnode
##
## Specifies the unwind ratio to smooth out the load that repairs generate.
## This value is a ratio between 0 -> 100% of the execution time of a repair session.
##
Expand Down Expand Up @@ -261,3 +277,28 @@ rest_server:
##
host: localhost
port: 8080

lock_factory:
cas:
##
## The keyspace used for the CAS lock factory tables.
##
keyspace: ecchronos
##
## The number of seconds until the lock failure cache expires.
## If an attempt to secure a lock is unsuccessful,
## all subsequent attempts will be failed until
## the cache expiration time is reached.
##
cache_expiry_time_in_seconds: 30
##
## Allow to override consistency level for LWT (lightweight transactions). Possible values are:
## "DEFAULT" - Use consistency level based on remoteRouting.
## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
## "LOCAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
## if you use remoteRouting: false and LOCAL then all locks will be taken locally
## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
## same nodes causing multiple repairs on the same range/node at the same time.
##
consistencySerial: "DEFAULT"
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public interface DistributedNativeConnectionProvider extends Closeable
default void close() throws IOException
{
}

default boolean getRemoteRouting()
{
return true;
}
}
Loading

0 comments on commit eea5f4d

Please sign in to comment.