Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement RepairScheduler, Schedule Manager and Related Things #742

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## Version 1.0.0 (Not yet Released)

* Investigate Creation of RepairScheduler and ScheduleManager #714
* Implement ScheduledJobQueue for Prioritized Job Management and Execution - Issue #740
* Implement RepairGroup Class for Managing and Executing Repair Tasks - Issue #738
* Create IncrementalRepairTask Class - Issue #736
* Implement ScheduledRepairJob, ScheduledJob and ScheduledTask for Automated Recurring Task Scheduling in Cassandra - Issue #737
* Create RepairTask Abstract Class to Handle Repair Operations - Issue #717
* Create ReplicationState and ReplicationStateImpl Class for Managing Token-to-Replicas Mapping - Issue #722
* Create a RepairHistory to Store Information on Repair Operations Performed by ecChronos Agent #730
* Generate Unique EcChronos ID #678
* Create RepairConfiguration class for repair configurations - Issue #716
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import com.ericsson.bss.cassandra.ecchronos.application.config.connection.ConnectionConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.GlobalRepairConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.rest.RestServerConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy.RunPolicyConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.scheduler.SchedulerConfig;
import com.fasterxml.jackson.annotation.JsonProperty;

public class Config
{
private ConnectionConfig myConnectionConfig = new ConnectionConfig();
private GlobalRepairConfig myRepairConfig = new GlobalRepairConfig();
private RunPolicyConfig myRunPolicyConfig = new RunPolicyConfig();
private SchedulerConfig mySchedulerConfig = new SchedulerConfig();
private RestServerConfig myRestServerConfig = new RestServerConfig();

Expand Down Expand Up @@ -68,6 +70,21 @@ public void setRepairConfig(final GlobalRepairConfig repairConfig)
}
}

@JsonProperty("run_policy")
public final RunPolicyConfig getRunPolicy()
{
return myRunPolicyConfig;
}

@JsonProperty("run_policy")
public final void setRunPolicyConfig(final RunPolicyConfig runPolicyConfig)
{
if (runPolicyConfig != null)
{
myRunPolicyConfig = runPolicyConfig;
}
}

@JsonProperty("scheduler")
public final SchedulerConfig getSchedulerConfig()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.connection;

import com.ericsson.bss.cassandra.ecchronos.application.config.repair.Interval;
import com.fasterxml.jackson.annotation.JsonProperty;

public class ConnectionConfig
{
private DistributedNativeConnection myCqlConnection = new DistributedNativeConnection();
private DistributedJmxConnection myJmxConnection = new DistributedJmxConnection();
private Interval myConnectionDelay = new Interval();

@JsonProperty("cql")
public final DistributedNativeConnection getCqlConnection()
Expand Down Expand Up @@ -58,26 +56,5 @@ public final String toString()
{
return String.format("Connection(cql=%s, jmx=%s)", myCqlConnection, myJmxConnection);
}
/**
* Sets the connectionDelay used to specify the time until the next connection.
*
* @param connectionDelay
* the local datacenter to set.
*/
@JsonProperty("connectionDelay")
public void setConnectionDelay(final Interval connectionDelay)
{
myConnectionDelay = connectionDelay;
}
/**
* Gets the connectionDelay used to specify the time until the next connection.
*
* @return the connectionDelay.
*/
@JsonProperty("connectionDelay")
public Interval getConnectionDelay()
{
return myConnectionDelay;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
package com.ericsson.bss.cassandra.ecchronos.application.config.connection;

import com.ericsson.bss.cassandra.ecchronos.application.config.Config;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.Interval;
import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.CertificateHandler;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.DefaultRepairConfigurationProvider;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.function.Supplier;

public class DistributedNativeConnection extends Connection<DistributedNativeConnectionProvider>
{
private AgentConnectionConfig myAgentConnectionConfig = new AgentConnectionConfig();
private Interval myConnectionDelay = new Interval();

public DistributedNativeConnection()
{
Expand All @@ -50,6 +53,28 @@ public final void setAgentConnectionConfig(final AgentConnectionConfig agentConn
myAgentConnectionConfig = agentConnectionConfig;
}

/**
* Sets the connectionDelay used to specify the time until the next connection.
*
* @param connectionDelay
* the local datacenter to set.
*/
@JsonProperty("connectionDelay")
public void setConnectionDelay(final Interval connectionDelay)
{
myConnectionDelay = connectionDelay;
}
/**
* Gets the connectionDelay used to specify the time until the next connection.
*
* @return the connectionDelay.
*/
@JsonProperty("connectionDelay")
public Interval getConnectionDelay()
{
return myConnectionDelay;
}

/**
* @return Class<?>[]
*/
Expand All @@ -60,7 +85,8 @@ protected Class<?>[] expectedConstructor()
{
Config.class,
Supplier.class,
CertificateHandler.class
CertificateHandler.class,
DefaultRepairConfigurationProvider.class,
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package com.ericsson.bss.cassandra.ecchronos.application.config.repair;

import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.converter.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.utils.converter.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy;

import com.fasterxml.jackson.annotation.JsonProperty;

public class RunPolicyConfig
{
private TimeBasedConfig myTimeBasedConfig = new TimeBasedConfig();

@JsonProperty("time_based")
public final TimeBasedConfig getTimeBasedConfig()
{
return myTimeBasedConfig;
}

@JsonProperty("time_based")
public final void setTimeBasedConfig(final TimeBasedConfig timeBasedConfig)
{
myTimeBasedConfig = timeBasedConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy;

import com.fasterxml.jackson.annotation.JsonProperty;

public class TimeBasedConfig
{
private String myKeyspaceName = "ecchronos";

@JsonProperty("keyspace")
public final String getKeyspaceName()
{
return myKeyspaceName;
}

@JsonProperty("keyspace")
public final void setKeyspaceName(final String keyspaceName)
{
myKeyspaceName = keyspaceName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Contains configurations related to run policy.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy;
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedNativeBuilder;
import com.ericsson.bss.cassandra.ecchronos.connection.impl.providers.DistributedNativeConnectionProviderImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.DefaultRepairConfigurationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -63,7 +64,8 @@ public class AgentNativeConnectionProvider implements DistributedNativeConnectio
public AgentNativeConnectionProvider(
final Config config,
final Supplier<Security.CqlSecurity> cqlSecuritySupplier,
final CertificateHandler certificateHandler
final CertificateHandler certificateHandler,
final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider
)
{
AgentConnectionConfig agentConnectionConfig = config.getConnectionConfig().getCqlConnection()
Expand All @@ -89,7 +91,9 @@ public AgentNativeConnectionProvider(
.withAgentType(agentConnectionConfig.getType())
.withLocalDatacenter(agentConnectionConfig.getLocalDatacenter())
.withAuthProvider(authProvider)
.withSslEngineFactory(sslEngineFactory);
.withSslEngineFactory(sslEngineFactory)
.withSchemaChangeListener(defaultRepairConfigurationProvider)
.withNodeStateListener(defaultRepairConfigurationProvider);
LOG.info("Preparing Agent Connection Config");
nativeConnectionBuilder = resolveAgentProviderBuilder(nativeConnectionBuilder, agentConnectionConfig);
LOG.info("Establishing Connection With Nodes");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.state.ReplicationStateImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl;
import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
Expand All @@ -42,6 +42,8 @@
import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.CertificateHandler;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.DefaultRepairConfigurationProvider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
Expand Down Expand Up @@ -141,6 +143,17 @@ public void addFormatters(final FormatterRegistry registry)
};
}

/**
* Provides a {@link DefaultRepairConfigurationProvider} bean.
*
* @return a {@link DefaultRepairConfigurationProvider} object.
*/
@Bean
public DefaultRepairConfigurationProvider defaultRepairConfigurationProvider()
{
return new DefaultRepairConfigurationProvider();
}

/**
* Configures the embedded web server factory with the host and port specified in the application configuration.
*
Expand Down Expand Up @@ -168,10 +181,11 @@ public ConfigurableServletWebServerFactory webServerFactory(final Config config)
*/
@Bean
public DistributedNativeConnectionProvider distributedNativeConnectionProvider(
final Config config
final Config config,
final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider
)
{
return getDistributedNativeConnection(config, cqlSecurity::get);
return getDistributedNativeConnection(config, cqlSecurity::get, defaultRepairConfigurationProvider);
}

/**
Expand Down Expand Up @@ -253,12 +267,17 @@ private Config getConfiguration() throws ConfigurationException

private DistributedNativeConnectionProvider getDistributedNativeConnection(
final Config config,
final Supplier<Security.CqlSecurity> securitySupplier
final Supplier<Security.CqlSecurity> securitySupplier,
final DefaultRepairConfigurationProvider defaultRepairConfigurationProvider
)
{
Supplier<CqlTLSConfig> tlsSupplier = () -> securitySupplier.get().getCqlTlsConfig();
CertificateHandler certificateHandler = createCertificateHandler(tlsSupplier);
return new AgentNativeConnectionProvider(config, securitySupplier, certificateHandler);
return new AgentNativeConnectionProvider(
config,
securitySupplier,
certificateHandler,
defaultRepairConfigurationProvider);
}

private DistributedJmxConnectionProvider getDistributedJmxConnection(
Expand Down Expand Up @@ -299,7 +318,7 @@ private EccNodesSync getEccNodesSync(
final DistributedNativeConnectionProvider distributedNativeConnectionProvider
) throws UnknownHostException, EcChronosException, ConfigurationException
{
Interval connectionDelay = config().getConnectionConfig().getConnectionDelay();
Interval connectionDelay = config().getConnectionConfig().getCqlConnection().getConnectionDelay();
EccNodesSync myEccNodesSync = EccNodesSync.newBuilder()
.withInitialNodesList(distributedNativeConnectionProvider.getNodes())
.withSession(distributedNativeConnectionProvider.getCqlSession())
Expand Down
Loading