Skip to content

Commit

Permalink
Introduce NodeResolver and ReplicationState (#723)
Browse files Browse the repository at this point in the history
* Introduce NodeResolver and ReplicationState

* Fix PMD Violation
  • Loading branch information
VictorCavichioli authored Sep 20, 2024
1 parent b5178cc commit 5abdc18
Show file tree
Hide file tree
Showing 28 changed files with 2,317 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Version 1.0.0 (Not yet Release)

* Implement ReplicationStateImpl to Manage and Cache Token Range to Replica Mappings - Issue #719
* Implement NodeResolverImpl to Resolve Nodes by IP Address and UUID - Issue #718
* Specify Interval for Next Connection - Issue #674
* Retry Policy for Jmx Connection - Issue #700
* Update Architecture and Tests Documentations to Add the Agent Features and The cassandra-test-image - Issue #707
Expand Down
12 changes: 12 additions & 0 deletions application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
<artifactId>core.impl</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Spring boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@
*/
package com.ericsson.bss.cassandra.ecchronos.application.spring;

import com.datastax.oss.driver.api.core.CqlSession;
import com.ericsson.bss.cassandra.ecchronos.application.config.Interval;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.CqlTLSConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.ReloadingCertificateHandler;
import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.state.ReplicationStateImpl;
import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
import com.ericsson.bss.cassandra.ecchronos.data.exceptions.EcChronosException;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;

Expand Down Expand Up @@ -220,6 +225,22 @@ public RetrySchedulerService retrySchedulerService(final Config config,
return new RetrySchedulerService(eccNodesSync, config, jmxConnectionProvider, nativeConnectionProvider);
}

@Bean
public NodeResolver nodeResolver(final DistributedNativeConnectionProvider distributedNativeConnectionProvider)
{
CqlSession session = distributedNativeConnectionProvider.getCqlSession();
return new NodeResolverImpl(session);
}

@Bean
public ReplicationState replicationState(
final DistributedNativeConnectionProvider distributedNativeConnectionProvider,
final NodeResolver nodeResolver)
{
CqlSession session = distributedNativeConnectionProvider.getCqlSession();
return new ReplicationStateImpl(nodeResolver, session);
}

private Security getSecurityConfig() throws ConfigurationException
{
return ConfigurationHelper.DEFAULT_INSTANCE.getConfiguration(SECURITY_FILE, Security.class);
Expand Down
102 changes: 102 additions & 0 deletions core.impl/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2024 Telefonaktiebolaget LM Ericsson
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
<artifactId>agent</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>

<artifactId>core.impl</artifactId>

<dependencies>
<!-- Internal -->
<dependency>
<groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
<artifactId>connection</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
</dependency>

<!-- Cassandra driver -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<!-- Test -->
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.core.impl.metadata;

import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
import java.net.InetAddress;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;

public class NodeResolverImpl implements NodeResolver
{
private final ConcurrentMap<InetAddress, DriverNode> addressToNodeMap = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, DriverNode> idToNodeMap = new ConcurrentHashMap<>();

private final CqlSession session;

public NodeResolverImpl(final CqlSession aSession)
{
this.session = aSession;
}

@Override
public final Optional<DriverNode> fromIp(final InetAddress inetAddress)
{
DriverNode node = addressToNodeMap.get(inetAddress);

if (node == null)
{
node = addressToNodeMap.computeIfAbsent(inetAddress, address -> lookup(inetAddress));
}
else if (!inetAddress.equals(node.getPublicAddress()))
{
// IP mapping is wrong, we should remove the old entry and retry
addressToNodeMap.remove(inetAddress, node);
return fromIp(inetAddress);
}

return Optional.ofNullable(node);
}

@Override
public final Optional<DriverNode> fromUUID(final UUID nodeId)
{
return Optional.ofNullable(resolve(nodeId));
}

private DriverNode resolve(final UUID nodeId)
{
DriverNode node = idToNodeMap.get(nodeId);
if (node == null)
{
node = idToNodeMap.computeIfAbsent(nodeId, this::lookup);
}

return node;
}

private DriverNode lookup(final UUID nodeId)
{
Metadata metadata = session.getMetadata();
for (Node node : metadata.getNodes().values())
{
if (node.getHostId().equals(nodeId))
{
return new DriverNode(node);
}
}
return null;
}

private DriverNode lookup(final InetAddress inetAddress)
{
Metadata metadata = session.getMetadata();
for (Node node : metadata.getNodes().values())
{
if (node.getBroadcastAddress().get().getAddress().equals(inetAddress))
{
return resolve(node.getHostId());
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Contains the implementations and resources for mapping node metadata.
*/
package com.ericsson.bss.cassandra.ecchronos.core.impl.metadata;
Loading

0 comments on commit 5abdc18

Please sign in to comment.