Skip to content

Commit

Permalink
Implement RepairScheduler, Schedule Manager and Related Things (#742)
Browse files Browse the repository at this point in the history
* Introduce RepairScheduler and SchedulerManager with Incremental Repair

* Fix PMD Violations and Rebase Change

* Removing test that requires lock to pass

* Fix Review Comments
  • Loading branch information
VictorCavichioli authored Oct 17, 2024
1 parent 56dd30a commit 1122325
Show file tree
Hide file tree
Showing 76 changed files with 7,769 additions and 66 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## Version 1.0.0 (Not yet Released)

* Investigate Creation of RepairScheduler and ScheduleManager #714
* Implement ScheduledJobQueue for Prioritized Job Management and Execution - Issue #740
* Implement RepairGroup Class for Managing and Executing Repair Tasks - Issue #738
* Create IncrementalRepairTask Class - Issue #736
* Implement ScheduledRepairJob, ScheduledJob and ScheduledTask for Automated Recurring Task Scheduling in Cassandra - Issue #737
* Create RepairTask Abstract Class to Handle Repair Operations - Issue #717
* Create ReplicationState and ReplicationStateImpl Class for Managing Token-to-Replicas Mapping - Issue #722
* Create a RepairHistory to Store Information on Repair Operations Performed by ecChronos Agent #730
* Generate Unique EcChronos ID #678
* Create RepairConfiguration class for repair configurations - Issue #716
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import com.ericsson.bss.cassandra.ecchronos.application.config.connection.ConnectionConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.GlobalRepairConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.rest.RestServerConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy.RunPolicyConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.scheduler.SchedulerConfig;
import com.fasterxml.jackson.annotation.JsonProperty;

public class Config
{
private ConnectionConfig myConnectionConfig = new ConnectionConfig();
private GlobalRepairConfig myRepairConfig = new GlobalRepairConfig();
private RunPolicyConfig myRunPolicyConfig = new RunPolicyConfig();
private SchedulerConfig mySchedulerConfig = new SchedulerConfig();
private RestServerConfig myRestServerConfig = new RestServerConfig();

Expand Down Expand Up @@ -68,6 +70,21 @@ public void setRepairConfig(final GlobalRepairConfig repairConfig)
}
}

@JsonProperty("run_policy")
public final RunPolicyConfig getRunPolicy()
{
return myRunPolicyConfig;
}

@JsonProperty("run_policy")
public final void setRunPolicyConfig(final RunPolicyConfig runPolicyConfig)
{
if (runPolicyConfig != null)
{
myRunPolicyConfig = runPolicyConfig;
}
}

@JsonProperty("scheduler")
public final SchedulerConfig getSchedulerConfig()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.connection;

import com.ericsson.bss.cassandra.ecchronos.application.config.repair.Interval;
import com.fasterxml.jackson.annotation.JsonProperty;

public class ConnectionConfig
{
private DistributedNativeConnection myCqlConnection = new DistributedNativeConnection();
private DistributedJmxConnection myJmxConnection = new DistributedJmxConnection();
private Interval myConnectionDelay = new Interval();

@JsonProperty("cql")
public final DistributedNativeConnection getCqlConnection()
Expand Down Expand Up @@ -58,26 +56,5 @@ public final String toString()
{
return String.format("Connection(cql=%s, jmx=%s)", myCqlConnection, myJmxConnection);
}
/**
* Sets the connectionDelay used to specify the time until the next connection.
*
* @param connectionDelay
* the local datacenter to set.
*/
@JsonProperty("connectionDelay")
public void setConnectionDelay(final Interval connectionDelay)
{
myConnectionDelay = connectionDelay;
}
/**
* Gets the connectionDelay used to specify the time until the next connection.
*
* @return the connectionDelay.
*/
@JsonProperty("connectionDelay")
public Interval getConnectionDelay()
{
return myConnectionDelay;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
package com.ericsson.bss.cassandra.ecchronos.application.config.connection;

import com.ericsson.bss.cassandra.ecchronos.application.config.Config;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.Interval;
import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.CertificateHandler;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.DefaultRepairConfigurationProvider;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.function.Supplier;

public class DistributedNativeConnection extends Connection<DistributedNativeConnectionProvider>
{
private AgentConnectionConfig myAgentConnectionConfig = new AgentConnectionConfig();
private Interval myConnectionDelay = new Interval();

public DistributedNativeConnection()
{
Expand All @@ -50,6 +53,28 @@ public final void setAgentConnectionConfig(final AgentConnectionConfig agentConn
myAgentConnectionConfig = agentConnectionConfig;
}

/**
* Sets the connectionDelay used to specify the time until the next connection.
*
* @param connectionDelay
* the local datacenter to set.
*/
@JsonProperty("connectionDelay")
public void setConnectionDelay(final Interval connectionDelay)
{
myConnectionDelay = connectionDelay;
}
/**
* Gets the connectionDelay used to specify the time until the next connection.
*
* @return the connectionDelay.
*/
@JsonProperty("connectionDelay")
public Interval getConnectionDelay()
{
return myConnectionDelay;
}

/**
* @return Class<?>[]
*/
Expand All @@ -60,7 +85,8 @@ protected Class<?>[] expectedConstructor()
{
Config.class,
Supplier.class,
CertificateHandler.class
CertificateHandler.class,
DefaultRepairConfigurationProvider.class,
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package com.ericsson.bss.cassandra.ecchronos.application.config.repair;

import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.converter.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.utils.converter.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy;

import com.fasterxml.jackson.annotation.JsonProperty;

public class RunPolicyConfig
{
private TimeBasedConfig myTimeBasedConfig = new TimeBasedConfig();

@JsonProperty("time_based")
public final TimeBasedConfig getTimeBasedConfig()
{
return myTimeBasedConfig;
}

@JsonProperty("time_based")
public final void setTimeBasedConfig(final TimeBasedConfig timeBasedConfig)
{
myTimeBasedConfig = timeBasedConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy;

import com.fasterxml.jackson.annotation.JsonProperty;

public class TimeBasedConfig
{
private String myKeyspaceName = "ecchronos";

@JsonProperty("keyspace")
public final String getKeyspaceName()
{
return myKeyspaceName;
}

@JsonProperty("keyspace")
public final void setKeyspaceName(final String keyspaceName)
{
myKeyspaceName = keyspaceName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Contains configurations related to run policy.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy;
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedNativeBuilder;
import com.ericsson.bss.cassandra.ecchronos.connection.impl.providers.DistributedNativeConnectionProviderImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.DefaultRepairConfigurationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -63,7 +64,8 @@ public class AgentNativeConnectionProvider implements DistributedNativeConnectio
public AgentNativeConnectionProvider(
final Config config,
final Supplier<Security.CqlSecurity> cqlSecuritySupplier,
final CertificateHandler certificateHandler
final CertificateHandler certificateHandler,
final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider
)
{
AgentConnectionConfig agentConnectionConfig = config.getConnectionConfig().getCqlConnection()
Expand All @@ -89,7 +91,9 @@ public AgentNativeConnectionProvider(
.withAgentType(agentConnectionConfig.getType())
.withLocalDatacenter(agentConnectionConfig.getLocalDatacenter())
.withAuthProvider(authProvider)
.withSslEngineFactory(sslEngineFactory);
.withSslEngineFactory(sslEngineFactory)
.withSchemaChangeListener(defaultRepairConfigurationProvider)
.withNodeStateListener(defaultRepairConfigurationProvider);
LOG.info("Preparing Agent Connection Config");
nativeConnectionBuilder = resolveAgentProviderBuilder(nativeConnectionBuilder, agentConnectionConfig);
LOG.info("Establishing Connection With Nodes");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.state.ReplicationStateImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl;
import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
Expand All @@ -42,6 +42,8 @@
import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.CertificateHandler;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.DefaultRepairConfigurationProvider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
Expand Down Expand Up @@ -141,6 +143,17 @@ public void addFormatters(final FormatterRegistry registry)
};
}

/**
* Provides a {@link DefaultRepairConfigurationProvider} bean.
*
* @return a {@link DefaultRepairConfigurationProvider} object.
*/
@Bean
public DefaultRepairConfigurationProvider defaultRepairConfigurationProvider()
{
return new DefaultRepairConfigurationProvider();
}

/**
* Configures the embedded web server factory with the host and port specified in the application configuration.
*
Expand Down Expand Up @@ -168,10 +181,11 @@ public ConfigurableServletWebServerFactory webServerFactory(final Config config)
*/
@Bean
public DistributedNativeConnectionProvider distributedNativeConnectionProvider(
final Config config
final Config config,
final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider
)
{
return getDistributedNativeConnection(config, cqlSecurity::get);
return getDistributedNativeConnection(config, cqlSecurity::get, defaultRepairConfigurationProvider);
}

/**
Expand Down Expand Up @@ -253,12 +267,17 @@ private Config getConfiguration() throws ConfigurationException

private DistributedNativeConnectionProvider getDistributedNativeConnection(
final Config config,
final Supplier<Security.CqlSecurity> securitySupplier
final Supplier<Security.CqlSecurity> securitySupplier,
final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider
)
{
Supplier<CqlTLSConfig> tlsSupplier = () -> securitySupplier.get().getCqlTlsConfig();
CertificateHandler certificateHandler = createCertificateHandler(tlsSupplier);
return new AgentNativeConnectionProvider(config, securitySupplier, certificateHandler);
return new AgentNativeConnectionProvider(
config,
securitySupplier,
certificateHandler,
defaultRepairConfigurationProvider);
}

private DistributedJmxConnectionProvider getDistributedJmxConnection(
Expand Down Expand Up @@ -299,7 +318,7 @@ private EccNodesSync getEccNodesSync(
final DistributedNativeConnectionProvider distributedNativeConnectionProvider
) throws UnknownHostException, EcChronosException, ConfigurationException
{
Interval connectionDelay = config().getConnectionConfig().getConnectionDelay();
Interval connectionDelay = config().getConnectionConfig().getCqlConnection().getConnectionDelay();
EccNodesSync myEccNodesSync = EccNodesSync.newBuilder()
.withInitialNodesList(distributedNativeConnectionProvider.getNodes())
.withSession(distributedNativeConnectionProvider.getCqlSession())
Expand Down
Loading

0 comments on commit 1122325

Please sign in to comment.