diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java index 7cc16003a..4c1969a69 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java @@ -15,6 +15,7 @@ package com.ericsson.bss.cassandra.ecchronos.application.config; import com.ericsson.bss.cassandra.ecchronos.application.config.connection.ConnectionConfig; +import com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory.LockFactoryConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.repair.GlobalRepairConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.rest.RestServerConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy.RunPolicyConfig; @@ -28,6 +29,7 @@ public class Config private RunPolicyConfig myRunPolicyConfig = new RunPolicyConfig(); private SchedulerConfig mySchedulerConfig = new SchedulerConfig(); private RestServerConfig myRestServerConfig = new RestServerConfig(); + private LockFactoryConfig myLockFactoryConfig = new LockFactoryConfig(); @JsonProperty("connection") public final ConnectionConfig getConnectionConfig() @@ -119,4 +121,19 @@ public final void setRestServerConfig(final RestServerConfig restServerConfig) myRestServerConfig = restServerConfig; } } + + @JsonProperty("lock_factory") + public final LockFactoryConfig getLockFactory() + { + return myLockFactoryConfig; + } + + @JsonProperty("lock_factory") + public final void setLockFactoryConfig(final LockFactoryConfig lockFactoryConfig) + { + if (lockFactoryConfig != null) + { + myLockFactoryConfig = lockFactoryConfig; + } + } } diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java index 1eaf52ce6..3b48809e0 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/providers/AgentNativeConnectionProvider.java @@ -95,7 +95,8 @@ public AgentNativeConnectionProvider( .withAuthProvider(authProvider) .withSslEngineFactory(sslEngineFactory) .withSchemaChangeListener(defaultRepairConfigurationProvider) - .withNodeStateListener(defaultRepairConfigurationProvider); + .withNodeStateListener(defaultRepairConfigurationProvider) + .withRemoteRouting(agentConnectionConfig.getRemoteRouting()); LOG.info("Preparing Agent Connection Config"); nativeConnectionBuilder = resolveAgentProviderBuilder(nativeConnectionBuilder, agentConnectionConfig); LOG.info("Establishing Connection With Nodes"); diff --git a/application/src/main/resources/ecc.yml b/application/src/main/resources/ecc.yml index 4d19449e7..466866b0f 100644 --- a/application/src/main/resources/ecc.yml +++ b/application/src/main/resources/ecc.yml @@ -28,6 +28,13 @@ connection: ## (instanceName: unique identifier), that will be used ## as ecchronos_id (partition key in nodes_sync table). instanceName: unique_identifier + ## + ## Allow routing requests directly to a remote datacenter. + ## This allows locks for other datacenters to be taken in that datacenter instead of via the local datacenter. + ## If clients are prevented from connecting directly to Cassandra nodes in other sites this is not possible. + ## If remote routing is disabled, instead SERIAL consistency will be used for those request. + ## + remoteRouting: true ## Define the Agent strategy, it can be ## - datacenterAware; ## - rackAware; and diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java index 8089448cc..baf9e9ad2 100644 --- a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java @@ -317,5 +317,11 @@ public void testInstanceName() { assertThat(nativeConnection.getAgentConnectionConfig().getInstanceName()).isEqualTo("unique_identifier"); } + + @Test + public void testRemoteRouting() + { + assertThat(nativeConnection.getAgentConnectionConfig().getRemoteRouting()).isEqualTo(false); + } } diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/TestCasLockFactoryConfig.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/TestCasLockFactoryConfig.java new file mode 100644 index 000000000..94e231560 --- /dev/null +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/TestCasLockFactoryConfig.java @@ -0,0 +1,53 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory; + +import com.ericsson.bss.cassandra.ecchronos.application.config.Config; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestCasLockFactoryConfig +{ + @Test + public void testCasLockFactoryConfigWithProvidedValue() throws IOException + { + CasLockFactoryConfig casLockFactoryConfig = getCasLockFactoryConfig("all_set.yml"); + assertThat(casLockFactoryConfig.getKeyspaceName()).isEqualTo("ecc"); + assertThat(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds()).isEqualTo(100L); + } + + @Test + public void testCasLockFactoryConfigDefaultValue() throws IOException + { + CasLockFactoryConfig casLockFactoryConfig = getCasLockFactoryConfig("nothing_set.yml"); + assertThat(casLockFactoryConfig.getKeyspaceName()).isEqualTo("ecchronos"); + assertThat(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds()).isEqualTo(30L); + } + + private CasLockFactoryConfig getCasLockFactoryConfig(final String fileName) throws IOException + { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + File file = new File(classLoader.getResource(fileName).getFile()); + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + Config config = mapper.readValue(file, Config.class); + return config.getLockFactory().getCasLockFactoryConfig(); + } +} diff --git a/application/src/test/resources/all_set.yml b/application/src/test/resources/all_set.yml index 218c19722..b5a7977c9 100644 --- a/application/src/test/resources/all_set.yml +++ b/application/src/test/resources/all_set.yml @@ -42,6 +42,7 @@ connection: port: 9042 - host: 127.0.0.4 port: 9042 + remoteRouting: false provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider connectionDelay: time: 45 @@ -90,4 +91,10 @@ scheduler: rest_server: host: 127.0.0.2 - port: 8081 \ No newline at end of file + port: 8081 + +lock_factory: + cas: + keyspace: ecc + cache_expiry_time_in_seconds: 100 + consistencySerial: "LOCAL" \ No newline at end of file diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java index 7a9af5f8c..1c11e3028 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedNativeBuilder.java @@ -74,6 +74,7 @@ public class DistributedNativeBuilder private SslEngineFactory mySslEngineFactory = null; private SchemaChangeListener mySchemaChangeListener = null; private NodeStateListener myNodeStateListener = null; + private boolean myRemoteRouting; /** * Sets the initial contact points for the distributed native connection. @@ -88,6 +89,20 @@ public final DistributedNativeBuilder withInitialContactPoints(final List<InetSo return this; } + /** + * Determines if routing requests directly to a remote datacenter is allowed. + * Sets the remote routing mode of the agent for the distributed native connection. + * + * @param remoteRouting + * the remote routing mode for agent {@link boolean}. + * @return the current instance of {@link DistributedNativeBuilder}. + */ + public final DistributedNativeBuilder withRemoteRouting(final boolean remoteRouting) + { + myRemoteRouting = remoteRouting; + return this; + } + /** * Sets the type of the agent for the distributed native connection. * @@ -244,7 +259,7 @@ public final DistributedNativeConnectionProviderImpl build() LOG.info("Requesting Nodes List"); List<Node> nodesList = createNodesList(session); LOG.info("Nodes list was created with success"); - return new DistributedNativeConnectionProviderImpl(session, nodesList); + return new DistributedNativeConnectionProviderImpl(session, nodesList, myRemoteRouting); } private List<Node> createNodesList(final CqlSession session) diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java index 8d4ccebd3..7d550915f 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedNativeConnectionProviderImpl.java @@ -26,6 +26,7 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ { private final CqlSession mySession; private final List<Node> myNodes; + private final boolean myRemoteRouting; /** * Constructs a new {@code DistributedNativeConnectionProviderImpl} with the specified {@link CqlSession} and list @@ -37,12 +38,13 @@ public class DistributedNativeConnectionProviderImpl implements DistributedNativ * the list of {@link Node} instances representing the nodes in the cluster. */ public DistributedNativeConnectionProviderImpl( - final CqlSession session, - final List<Node> nodesList - ) + final CqlSession session, + final List<Node> nodesList, + final boolean remoteRouting) { mySession = session; myNodes = nodesList; + myRemoteRouting = remoteRouting; } /** @@ -79,6 +81,17 @@ public void close() throws IOException mySession.close(); } + /** + * Determines if routing requests directly to a remote datacenter is allowed. + * + * @return {@code true} if remote routing is enabled. + */ + @Override + public boolean getRemoteRouting() + { + return myRemoteRouting; + } + /** * Creates a new instance of {@link DistributedNativeBuilder} for building * {@link DistributedNativeConnectionProviderImpl} objects. diff --git a/core.impl/pom.xml b/core.impl/pom.xml index d42db13e7..96e109b55 100644 --- a/core.impl/pom.xml +++ b/core.impl/pom.xml @@ -99,6 +99,28 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-core</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>net.jcip</groupId> + <artifactId>jcip-annotations</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>cassandra</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/AbstractCassandraContainerTest.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/AbstractCassandraContainerTest.java new file mode 100644 index 000000000..9b8c8f158 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/AbstractCassandraContainerTest.java @@ -0,0 +1,104 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl; + +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import java.net.InetSocketAddress; + +import java.util.List; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.utility.DockerImageName; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Node; + +public class AbstractCassandraContainerTest +{ + protected static CqlSession mySession; + + private static DistributedNativeConnectionProvider myNativeConnectionProvider; + private static CassandraContainer<?> node; + + @SuppressWarnings ("resource") + @BeforeClass + public static void setUpCluster() + { + // This is set as an environment variable ('it.cassandra.version') in maven using the '-D' flag. + String cassandraVersion = System.getProperty("it.cassandra.version"); + if (cassandraVersion == null) + { + // No environment version set, just use latest. + cassandraVersion = "latest"; + } + node = new CassandraContainer<>(DockerImageName.parse("cassandra:" + cassandraVersion)) + .withExposedPorts(9042, 7000, 7199) + .withEnv("CASSANDRA_DC", "DC1") + .withEnv("CASSANDRA_ENDPOINT_SNITCH", "GossipingPropertyFileSnitch") + .withEnv("CASSANDRA_CLUSTER_NAME", "TestCluster") + .withEnv("JMX_PORT", "7199"); + node.start(); + String containerIpAddress = node.getHost(); + Integer containerPort = node.getMappedPort(9042); + + mySession = CqlSession.builder() + .addContactPoint(new InetSocketAddress(containerIpAddress, containerPort)) + .withLocalDatacenter("DC1") + .build(); + + List<Node> nodesList = mySession.getMetadata().getNodes().values().stream().toList(); + myNativeConnectionProvider = new DistributedNativeConnectionProvider() + { + @Override + public CqlSession getCqlSession() + { + return mySession; + } + + @Override + public List<Node> getNodes() + { + return nodesList; + } + + @Override + public boolean getRemoteRouting() + { + return true; + } + }; + } + + @AfterClass + public static void tearDownCluster() + { + if (mySession != null) + { + mySession.close(); + } + node.stop(); + } + + public static DistributedNativeConnectionProvider getNativeConnectionProvider() + { + return myNativeConnectionProvider; + } + + public static CassandraContainer<?> getContainerNode() + { + return node; + } +} diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java new file mode 100644 index 000000000..ac87d1f83 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java @@ -0,0 +1,613 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.core.impl.AbstractCassandraContainerTest; +import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType; +import com.ericsson.bss.cassandra.ecchronos.core.locks.HostStates; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ReflectionException; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric; +import com.datastax.oss.driver.api.core.metrics.Metrics; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException; + +import net.jcip.annotations.NotThreadSafe; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@NotThreadSafe +@RunWith(Parameterized.class) +public class TestCASLockFactory extends AbstractCassandraContainerTest +{ + @Parameterized.Parameters + public static Collection<String> keyspaceNames() { + return Arrays.asList("ecchronos", "anotherkeyspace"); + } + + private static final String TABLE_LOCK = "lock"; + private static final String TABLE_LOCK_PRIORITY = "lock_priority"; + + private static final String DATA_CENTER = "DC1"; + private static CASLockFactory myLockFactory; + private static PreparedStatement myLockStatement; + private static PreparedStatement myRemoveLockStatement; + private static PreparedStatement myCompeteStatement; + private static PreparedStatement myGetPrioritiesStatement; + + private static HostStates hostStates; + + @Parameterized.Parameter + public String myKeyspaceName; + + @Before + public void startup() + { + mySession.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 1}", myKeyspaceName)); + mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.lock (resource text, node uuid, metadata map<text,text>, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", myKeyspaceName)); + mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.lock_priority (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", myKeyspaceName)); + + hostStates = mock(HostStates.class); + when(hostStates.isUp(any(Node.class))).thenReturn(true); + Node node = mock(Node.class); + when(node.getHostId()).thenReturn(UUID.randomUUID()); + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withNode(node) + .build(); + + myLockStatement = mySession.prepare(QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK) + .value("resource", bindMarker()) + .value("node", bindMarker()) + .value("metadata", bindMarker()) + .ifNotExists() + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM) + .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)); + myRemoveLockStatement = mySession.prepare(String.format("DELETE FROM %s.%s WHERE resource=? IF EXISTS", myKeyspaceName, TABLE_LOCK)); + myCompeteStatement = mySession.prepare(String.format("INSERT INTO %s.%s (resource, node, priority) VALUES (?, ?, ?)", myKeyspaceName, TABLE_LOCK_PRIORITY)); + myGetPrioritiesStatement = mySession.prepare(String.format("SELECT * FROM %s.%s WHERE resource=?", myKeyspaceName, TABLE_LOCK_PRIORITY)); + } + + @After + public void testCleanup() + { + execute(SimpleStatement.newInstance( + String.format("DELETE FROM %s.%s WHERE resource='%s'", myKeyspaceName, TABLE_LOCK_PRIORITY, "lock"))); + execute(myRemoveLockStatement.bind("lock")); + myLockFactory.close(); + } + + @Test + public void testGetDefaultTimeToLiveFromLockTable() throws LockException + { + String alterLockTable = String.format("ALTER TABLE %s.%s WITH default_time_to_live = 1200;", myKeyspaceName, TABLE_LOCK); + mySession.execute(alterLockTable); + Node node = mock(Node.class); + when(node.getHostId()).thenReturn(UUID.randomUUID()); + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withNode(node) + .build(); + assertThat(myLockFactory.getCasLockFactoryCacheContext().getFailedLockRetryAttempts()).isEqualTo(9); + assertThat(myLockFactory.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds()).isEqualTo(120); + } + + @Test + public void testGetLock() throws LockException + { + try (LockFactory.DistributedLock lock = myLockFactory.tryLock(DATA_CENTER, "lock", 1, new HashMap<String, String>())) + { + } + + assertPriorityListEmpty("lock"); + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isEmpty(); + } + + @Test + public void testGetGlobalLock() throws LockException + { + try (LockFactory.DistributedLock lock = myLockFactory.tryLock(null, "lock", 1, new HashMap<String, String>())) + { + } + assertPriorityListEmpty("lock"); + assertThat(myLockFactory.getCachedFailure(null, "lock")).isEmpty(); + } + + @Test + public void testGlobalLockTakenThrowsException() + { + execute(myLockStatement.bind("lock", UUID.randomUUID(), new HashMap<>())); + + assertThatExceptionOfType(LockException.class).isThrownBy(() -> myLockFactory.tryLock(null, "lock", 1, new HashMap<>())); + assertPrioritiesInList("lock", 1); + assertThat(myLockFactory.getCachedFailure(null, "lock")).isNotEmpty(); + } + + @Test + public void testGlobalLockTakenIsCachedOnSecondTry() throws AttributeNotFoundException, InstanceNotFoundException, MalformedObjectNameException, MBeanException, ReflectionException, UnsupportedOperationException, IOException, InterruptedException + { + execute(myLockStatement.bind("lock", UUID.randomUUID(), new HashMap<>())); + InternalDriverContext driverContext = (InternalDriverContext) mySession.getContext(); + //Check that no in-flight queries exist, we want all previous queries to complete before we proceed + Optional<Node> connectedNode = driverContext.getPoolManager().getPools().keySet().stream().findFirst(); + while (getInFlightQueries(connectedNode.get()) != 0) + { + Thread.sleep(100); + } + long expectedLockReadCount = getReadCount(TABLE_LOCK) + 2; // We do a read due to CAS and execCommandOnContainer + long expectedLockWriteCount = getWriteCount(TABLE_LOCK) + 1; // No writes as the lock is already held + long expectedLockPriorityReadCount = getReadCount(TABLE_LOCK_PRIORITY) + 2; // We read the priorities + long expectedLockPriorityWriteCount = getWriteCount(TABLE_LOCK_PRIORITY) + 1; // We update our local priority once + + assertThatExceptionOfType(LockException.class).isThrownBy(() -> myLockFactory.tryLock(null, "lock", 2, new HashMap<>())); + assertThatExceptionOfType(LockException.class).isThrownBy(() -> myLockFactory.tryLock(null, "lock", 1, new HashMap<>())); + + assertThat(getReadCount(TABLE_LOCK_PRIORITY)).isEqualTo(expectedLockPriorityReadCount); + assertThat(getWriteCount(TABLE_LOCK_PRIORITY)).isEqualTo(expectedLockPriorityWriteCount); + + assertThat(getReadCount(TABLE_LOCK)).isEqualTo(expectedLockReadCount); + assertThat(getWriteCount(TABLE_LOCK)).isEqualTo(expectedLockWriteCount); + assertPrioritiesInList("lock", 2); + assertThat(myLockFactory.getCachedFailure(null, "lock")).isNotEmpty(); + } + + private int getInFlightQueries(Node node) + { + int inFlightQueries = 0; + Optional<Metrics> metrics = mySession.getMetrics(); + if (metrics.isPresent()) + { + Optional<Metric> inFlight = metrics.get().getNodeMetric(node, DefaultNodeMetric.IN_FLIGHT); + if (inFlight.isPresent()) + { + inFlightQueries = (int) ((Gauge) inFlight.get()).getValue(); + } + } + return inFlightQueries; + } + + @Test + public void testGetLockWithLowerPriority() + { + execute(myCompeteStatement.bind("lock", UUID.randomUUID(), 2)); + + assertThatExceptionOfType(LockException.class).isThrownBy(() -> myLockFactory.tryLock(DATA_CENTER, "lock", 1, new HashMap<>())); + assertPrioritiesInList("lock", 1, 2); + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isNotEmpty(); + } + + @Test + public void testGetAlreadyTakenLock() + { + execute(myLockStatement.bind("lock", UUID.randomUUID(), new HashMap<>())); + + assertThatExceptionOfType(LockException.class).isThrownBy(() -> myLockFactory.tryLock(DATA_CENTER, "lock", 1, new HashMap<>())); + assertPrioritiesInList("lock", 1); + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isNotEmpty(); + } + + @Test + public void testGetLockWithLocallyHigherPriority() throws LockException + { + UUID localHostId = getNativeConnectionProvider().getNodes().get(0).getHostId(); + execute(myCompeteStatement.bind("lock", localHostId, 2)); + CASLockFactory lockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withNode(getNativeConnectionProvider().getNodes().get(0)) + .build(); + + try (LockFactory.DistributedLock lock = lockFactory.tryLock(DATA_CENTER, "lock", 1, new HashMap<>())) + { + } + + assertPrioritiesInList("lock", 2); + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isEmpty(); + } + + @Test + public void testGetLockWithLocallyLowerPriority() throws LockException + { + UUID localHostId = getNativeConnectionProvider().getNodes().get(0).getHostId(); + execute(myCompeteStatement.bind("lock", localHostId, 1)); + CASLockFactory lockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withNode(getNativeConnectionProvider().getNodes().get(0)) + .build(); + try (LockFactory.DistributedLock lock = lockFactory.tryLock(DATA_CENTER, "lock", 2, new HashMap<>())) + { + } + + assertPriorityListEmpty("lock"); + assertThat(lockFactory.getCachedFailure(DATA_CENTER, "lock")).isEmpty(); + } + + @Test + public void testReadMetadata() throws LockException + { + Map<String, String> expectedMetadata = new HashMap<>(); + expectedMetadata.put("data", "something"); + + try (LockFactory.DistributedLock lock = myLockFactory.tryLock(DATA_CENTER, "lock", 1, expectedMetadata)) + { + Map<String, String> actualMetadata = myLockFactory.getLockMetadata(DATA_CENTER, "lock"); + + assertThat(actualMetadata).isEqualTo(expectedMetadata); + } + + assertPriorityListEmpty("lock"); + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isEmpty(); + } + + @Test + public void testInterruptCasLockUpdate() throws InterruptedException + { + Map<String, String> metadata = new HashMap<>(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + + try + { + Future<?> future = executorService.submit( + new CASLock( + DATA_CENTER, + "lock", + 1, + metadata, + myLockFactory.getHostId(), + myLockFactory.getCasLockStatement() + ) + ); + + Thread.sleep(100); + + future.cancel(true); + + executorService.shutdown(); + assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS)).isTrue(); + } + finally + { + if (!executorService.isShutdown()) + { + executorService.shutdownNow(); + } + } + } + + @Test + public void testFailedLockRetryAttempts() + { + Map<String, String> metadata = new HashMap<>(); + try (CASLock lockUpdateTask = new CASLock( + DATA_CENTER, + "lock", + 1, + metadata, + myLockFactory.getHostId(), + myLockFactory.getCasLockStatement() + )) + { + for (int i = 0; i < 10; i++) + { + lockUpdateTask.run(); + assertThat(lockUpdateTask.getFailedAttempts()).isEqualTo(i + 1); + } + + execute(myLockStatement.bind("lock", myLockFactory.getHostId(), new HashMap<>())); + lockUpdateTask.run(); + assertThat(lockUpdateTask.getFailedAttempts()).isEqualTo(0); + } + + assertThat(myLockFactory.getCachedFailure(DATA_CENTER, "lock")).isEmpty(); + } + + @Test + public void testActivateWithoutKeyspaceCausesIllegalStateException() + { + mySession.execute(String.format("DROP KEYSPACE %s", myKeyspaceName)); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .build()); + + mySession.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 1}", myKeyspaceName)); + mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, metadata map<text,text>, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", myKeyspaceName, TABLE_LOCK)); + mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", myKeyspaceName, TABLE_LOCK_PRIORITY)); } + + @Test + public void testActivateWithoutLockTableCausesIllegalStateException() + { + mySession.execute(String.format("DROP TABLE %s.%s", myKeyspaceName, TABLE_LOCK)); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .build()); + + mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, metadata map<text,text>, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", myKeyspaceName, TABLE_LOCK)); + } + + @Test + public void testActivateWithoutLockPriorityTableCausesIllegalStateException() + { + mySession.execute(String.format("DROP TABLE %s.%s", myKeyspaceName, TABLE_LOCK_PRIORITY)); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .build()); + + mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", myKeyspaceName, TABLE_LOCK_PRIORITY)); + } + + @Test + public void testActivateWithoutCassandraCausesIllegalStateException() + { + // mock + CqlSession session = mock(CqlSession.class); + + doThrow(AllNodesFailedException.class).when(session).getMetadata(); + + // test + assertThatExceptionOfType(AllNodesFailedException.class) + .isThrownBy(() -> new CASLockFactoryBuilder() + .withNativeConnectionProvider(new DistributedNativeConnectionProvider() + { + @Override + public CqlSession getCqlSession() + { + return session; + } + + @Override + public List<Node> getNodes() + { + return null; + } + + @Override + public boolean getRemoteRouting() + { + return true; + } + }) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .build()); + } + + @Test + public void testRemoteRoutingTrueWithDefaultSerialConsistency() + { + Node nodeMock = mock(Node.class); + when(nodeMock.getHostId()).thenReturn(UUID.randomUUID()); + DistributedNativeConnectionProvider connectionProviderMock = mock(DistributedNativeConnectionProvider.class); + + when(connectionProviderMock.getCqlSession()).thenReturn(mySession); + when(connectionProviderMock.getNodes()).thenReturn(Arrays.asList(nodeMock)); + when(connectionProviderMock.getRemoteRouting()).thenReturn(true); + + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.DEFAULT) + .withNode(nodeMock) + .build(); + + assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testRemoteRoutingFalseWithDefaultSerialConsistency() + { + Node nodeMock = mock(Node.class); + when(nodeMock.getHostId()).thenReturn(UUID.randomUUID()); + DistributedNativeConnectionProvider connectionProviderMock = mock(DistributedNativeConnectionProvider.class); + + when(connectionProviderMock.getCqlSession()).thenReturn(mySession); + when(connectionProviderMock.getNodes()).thenReturn(Arrays.asList(nodeMock)); + when(connectionProviderMock.getRemoteRouting()).thenReturn(false); + + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.DEFAULT) + .withNode(nodeMock) + .build(); + + assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testLocalSerialConsistency() + { + DistributedNativeConnectionProvider connectionProviderMock = mock(DistributedNativeConnectionProvider.class); + Node nodeMock = mock(Node.class); + when(nodeMock.getHostId()).thenReturn(UUID.randomUUID()); + when(connectionProviderMock.getCqlSession()).thenReturn(mySession); + when(connectionProviderMock.getNodes()).thenReturn(Arrays.asList(nodeMock)); + + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.LOCAL) + .withNode(nodeMock) + .build(); + + assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testSerialConsistency() + { + DistributedNativeConnectionProvider connectionProviderMock = mock(DistributedNativeConnectionProvider.class); + Node nodeMock = mock(Node.class); + when(nodeMock.getHostId()).thenReturn(UUID.randomUUID()); + when(connectionProviderMock.getCqlSession()).thenReturn(mySession); + when(connectionProviderMock.getNodes()).thenReturn(Arrays.asList(nodeMock)); + + myLockFactory = new CASLockFactoryBuilder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.SERIAL) + .withNode(nodeMock) + .build(); + + assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + private void assertPriorityListEmpty(String resource) + { + assertThat(getPriorities(resource)).isEmpty(); + } + + private void assertPrioritiesInList(String resource, Integer... priorities) + { + assertThat(getPriorities(resource)).containsExactlyInAnyOrder(priorities); + } + + private Set<Integer> getPriorities(String resource) + { + ResultSet resultSet = execute(myGetPrioritiesStatement.bind(resource)); + List<Row> rows = resultSet.all(); + + return rows.stream().map(r -> r.getInt("priority")).collect(Collectors.toSet()); + } + + private ResultSet execute(Statement statement) + { + return mySession.execute(statement); + } + + private long getReadCount(String tableName) throws AttributeNotFoundException, InstanceNotFoundException, MBeanException, ReflectionException, IOException, MalformedObjectNameException, UnsupportedOperationException, InterruptedException + { + return getReadCountFromTableStats(tableName); + } + + private long getWriteCount(String tableName) throws AttributeNotFoundException, InstanceNotFoundException, MBeanException, ReflectionException, IOException, MalformedObjectNameException, UnsupportedOperationException, InterruptedException + { + return getWriteCountFromTableStats(tableName); + } + + private long getReadCountFromTableStats(String tableName) throws UnsupportedOperationException, IOException, InterruptedException + { + String tableStatsOutput = getContainerNode().execInContainer("nodetool", "tablestats", myKeyspaceName + "." + tableName).getStdout(); + long readCount = 0; + Pattern readCountPattern = Pattern.compile("Read Count:\\s+(\\d+)"); + Matcher readCountMatcher = readCountPattern.matcher(tableStatsOutput); + + if (readCountMatcher.find()) + { + readCount = Long.parseLong(readCountMatcher.group(1)); + } + + return readCount; + } + + private long getWriteCountFromTableStats(String tableName) throws UnsupportedOperationException, IOException, InterruptedException + { + String tableStatsOutput = getContainerNode().execInContainer("nodetool", "tablestats", myKeyspaceName + "." + tableName).getStdout(); + long writeCount = 0; + Pattern writeCountPattern = Pattern.compile("Write Count:\\s+(\\d+)"); + Matcher writeCountMatcher = writeCountPattern.matcher(tableStatsOutput); + + if (writeCountMatcher.find()) + { + writeCount = Long.parseLong(writeCountMatcher.group(1)); + } + + return writeCount; + } + +} diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCache.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCache.java new file mode 100644 index 000000000..9aad10bb5 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCache.java @@ -0,0 +1,165 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory.DistributedLock; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TestLockCache +{ + private static final String DATA_CENTER = "DC1"; + private static final String RESOURCE = "RepairResource-91e32362-7af4-11e9-8f9e-2a86e4085a59-1"; + private static final int PRIORITY = 1; + private static final Map<String, String> METADATA = new HashMap<>(); + + @Mock + private LockCache.LockSupplier mockedLockSupplier; + + private LockCache myLockCache; + + @Before + public void setup() + { + myLockCache = new LockCache(mockedLockSupplier, 30L); + } + + @Test + public void testGetLock() throws LockException + { + DistributedLock expectedLock = doReturnLockOnGetLock(); + + assertGetLockRetrievesExpectedLock(expectedLock); + } + + @Test + public void testGetThrowingLockIsCached() throws LockException + { + LockException expectedExcetion = doThrowOnGetLock(); + + assertGetLockThrowsException(expectedExcetion); + + // Reset return type, locking should still throw + doReturnLockOnGetLock(); + + assertGetLockThrowsException(expectedExcetion); + } + + @Test + public void testGetMultipleLocks() throws LockException + { + String otherResource = "RepairResource-b2e33e60-7af6-11e9-8f9e-2a86e4085a59-1"; + + DistributedLock expectedLock = doReturnLockOnGetLock(RESOURCE); + DistributedLock expectedOtherLock = doReturnLockOnGetLock(otherResource); + + assertGetLockRetrievesExpectedLock(RESOURCE, expectedLock); + assertGetLockRetrievesExpectedLock(otherResource, expectedOtherLock); + } + + @Test + public void testGetOtherLockAfterThrowingOnAnotherResource() throws LockException + { + String otherResource = "RepairResource-b2e33e60-7af6-11e9-8f9e-2a86e4085a59-1"; + + LockException expectedException = doThrowOnGetLock(RESOURCE); + DistributedLock expectedOtherLock = doReturnLockOnGetLock(otherResource); + + assertGetLockThrowsException(RESOURCE, expectedException); + assertGetLockRetrievesExpectedLock(otherResource, expectedOtherLock); + } + + @Test + public void testGetLockAfterCachedExceptionHasExpired() throws LockException, InterruptedException + { + myLockCache = new LockCache(mockedLockSupplier, 20, TimeUnit.MILLISECONDS); + + LockException expectedException = doThrowOnGetLock(); + assertGetLockThrowsException(expectedException); + + Thread.sleep(20); + + DistributedLock expectedLock = doReturnLockOnGetLock(); + assertGetLockRetrievesExpectedLock(expectedLock); + } + + @Test + public void testEqualsContract() + { + EqualsVerifier.forClass(LockCache.LockKey.class).usingGetClass().verify(); + } + + private void assertGetLockRetrievesExpectedLock(DistributedLock expectedLock) throws LockException + { + assertGetLockRetrievesExpectedLock(RESOURCE, expectedLock); + } + + private void assertGetLockRetrievesExpectedLock(String resource, DistributedLock expectedLock) throws LockException + { + assertThat(myLockCache.getLock(DATA_CENTER, resource, PRIORITY, METADATA)).isSameAs(expectedLock); + assertThat(myLockCache.getCachedFailure(DATA_CENTER, resource)).isEmpty(); + } + + private void assertGetLockThrowsException(LockException expectedException) + { + assertGetLockThrowsException(RESOURCE, expectedException); + } + + private void assertGetLockThrowsException(String resource, LockException expectedException) + { + assertThatThrownBy(() -> myLockCache.getLock(DATA_CENTER, resource, PRIORITY, METADATA)).isSameAs(expectedException); + assertThat(myLockCache.getCachedFailure(DATA_CENTER, resource)).isNotEmpty(); + } + + private DistributedLock doReturnLockOnGetLock() throws LockException + { + return doReturnLockOnGetLock(RESOURCE); + } + + private DistributedLock doReturnLockOnGetLock(String resource) throws LockException + { + DistributedLock expectedLock = mock(DistributedLock.class); + when(mockedLockSupplier.getLock(eq(DATA_CENTER), eq(resource), eq(PRIORITY), eq(METADATA))).thenReturn(expectedLock); + return expectedLock; + } + + private LockException doThrowOnGetLock() throws LockException + { + return doThrowOnGetLock(RESOURCE); + } + + private LockException doThrowOnGetLock(String resource) throws LockException + { + LockException expectedException = new LockException(""); + when(mockedLockSupplier.getLock(eq(DATA_CENTER), eq(resource), eq(PRIORITY), eq(METADATA))).thenThrow(expectedException); + return expectedException; + } +} \ No newline at end of file diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCollection.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCollection.java new file mode 100644 index 000000000..160096b44 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCollection.java @@ -0,0 +1,91 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; + +import static org.assertj.core.api.Assertions.assertThat; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; +import java.util.ArrayList; +import java.util.List; +import org.junit.Test; + +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory.DistributedLock; + +public class TestLockCollection +{ + + @Test + public void testCloseAllLocks() + { + List<DummyLock> locks = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + locks.add(new DummyLock()); + } + + new LockCollection(locks).close(); + + for (DummyLock lock : locks) + { + assertThat(lock.closed).isTrue(); + } + } + + @Test + public void testCloseAllLocksOneThrowing() + { + List<DistributedLock> locks = new ArrayList<>(); + for (int i = 0; i < 4; i++) + { + locks.add(new DummyLock()); + } + + locks.add(new ThrowingLock()); + + for (int i = 0; i < 5; i++) + { + locks.add(new DummyLock()); + } + + new LockCollection(locks).close(); + + for (DistributedLock lock : locks) + { + if (lock instanceof DummyLock) + { + assertThat(((DummyLock) lock).closed).isTrue(); + } + } + } + + private class ThrowingLock implements DistributedLock + { + @Override + public void close() + { + throw new IllegalStateException(); + } + } +} + +class DummyLock implements LockFactory.DistributedLock +{ + public volatile boolean closed = false; + + @Override + public void close() + { + closed = true; + } +}