Skip to content

Commit

Permalink
ISPN-16670 Retry server failed on client after timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns committed Oct 28, 2024
1 parent 708b6aa commit 1f57c03
Show file tree
Hide file tree
Showing 19 changed files with 214 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ private void actualStart() {
executorFactory = Util.getInstance(configuration.asyncExecutorFactory().factoryClass());
}
asyncExecutorService = executorFactory.getExecutor(configuration.asyncExecutorFactory().properties());
channelFactory.start(configuration, marshaller, asyncExecutorService,
channelFactory.start(marshaller, asyncExecutorService,
listenerNotifier, marshallerRegistry);
counterManager.start(channelFactory, configuration, listenerNotifier);

Expand Down Expand Up @@ -418,7 +418,7 @@ private static void registerDefaultSchemas(SerializationContext ctx, String... c
}

public ChannelFactory createChannelFactory() {
return new ChannelFactory(new CodecHolder(configuration.version().getCodec()));
return new ChannelFactory(configuration, new CodecHolder(configuration.version().getCodec()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public ConfigurationBuilder dnsResolverNegativeTTL(int negativeTTL) {
return builder.dnsResolverNegativeTTL(negativeTTL);
}

@Override
public ConfigurationBuilder serverFailureTimeout(int timeoutInMilliseconds) {
return builder.serverFailureTimeout(timeoutInMilliseconds);
}

@Override
public ConfigurationBuilder forceReturnValues(boolean forceReturnValues) {
return builder.forceReturnValues(forceReturnValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.infinispan.client.hotrod.impl.ConfigurationProperties.REQUEST_BALANCING_STRATEGY;
import static org.infinispan.client.hotrod.impl.ConfigurationProperties.SASL_MECHANISM;
import static org.infinispan.client.hotrod.impl.ConfigurationProperties.SASL_PROPERTIES_PREFIX;
import static org.infinispan.client.hotrod.impl.ConfigurationProperties.SERVER_FAILURE_TIMEOUT;
import static org.infinispan.client.hotrod.impl.ConfigurationProperties.SERVER_LIST;
import static org.infinispan.client.hotrod.impl.ConfigurationProperties.SNI_HOST_NAME;
import static org.infinispan.client.hotrod.impl.ConfigurationProperties.SO_TIMEOUT;
Expand Down Expand Up @@ -119,6 +120,7 @@ public class Configuration {
private final int dnsResolverMinTTL;
private final int dnsResolverMaxTTL;
private final int dnsResolverNegativeTTL;
private final int serverFailureTimeout;

public Configuration(ExecutorFactoryConfiguration asyncExecutorFactory, Supplier<FailoverRequestBalancingStrategy> balancingStrategyFactory, ClassLoader classLoader,
ClientIntelligence clientIntelligence, ConnectionPoolConfiguration connectionPool, int connectionTimeout, Class<? extends ConsistentHash>[] consistentHashImpl,
Expand All @@ -131,7 +133,8 @@ public Configuration(ExecutorFactoryConfiguration asyncExecutorFactory, Supplier
TransactionConfiguration transaction, StatisticsConfiguration statistics, Features features,
List<SerializationContextInitializer> contextInitializers,
Map<String, RemoteCacheConfiguration> remoteCaches,
TransportFactory transportFactory, boolean tracingPropagationEnabled) {
TransportFactory transportFactory, boolean tracingPropagationEnabled,
int serverFailureTimeout) {
this.asyncExecutorFactory = asyncExecutorFactory;
this.balancingStrategyFactory = balancingStrategyFactory;
this.maxRetries = maxRetries;
Expand Down Expand Up @@ -166,6 +169,7 @@ public Configuration(ExecutorFactoryConfiguration asyncExecutorFactory, Supplier
this.remoteCaches = remoteCaches;
this.transportFactory = transportFactory;
this.tracingPropagationEnabled = tracingPropagationEnabled;
this.serverFailureTimeout = serverFailureTimeout;
}

public ExecutorFactoryConfiguration asyncExecutorFactory() {
Expand Down Expand Up @@ -384,6 +388,15 @@ public boolean tracingPropagationEnabled() {
return tracingPropagationEnabled;
}

/**
* Controls how long a server is marked as failed in milliseconds.
* Default is 30_000 milliseconds or 30 seconds.
* @return time in milliseconds
*/
public int serverFailureTimeout() {
return serverFailureTimeout;
}

@Override
public String toString() {
return "Configuration [asyncExecutorFactory=" + asyncExecutorFactory + ", balancingStrategyFactory=()->" + balancingStrategyFactory.get()
Expand All @@ -398,6 +411,7 @@ public String toString() {
+ ", remoteCaches= " + remoteCaches
+ ", transaction=" + transaction
+ ", statistics=" + statistics
+ ", serverFailureTimeout=" + serverFailureTimeout
+ "]";
}

Expand Down Expand Up @@ -432,6 +446,7 @@ public Properties properties() {
properties.setProperty(VALUE_SIZE_ESTIMATE, valueSizeEstimate());
properties.setProperty(MAX_RETRIES, maxRetries());
properties.setProperty(STATISTICS, statistics().enabled());
properties.setProperty(SERVER_FAILURE_TIMEOUT, serverFailureTimeout());

properties.setProperty(DNS_RESOLVER_MIN_TTL, dnsResolverMinTTL);
properties.setProperty(DNS_RESOLVER_MAX_TTL, dnsResolverMaxTTL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class ConfigurationBuilder implements ConfigurationChildBuilder, Builder<
private boolean tcpKeepAlive = false;
private int valueSizeEstimate = ConfigurationProperties.DEFAULT_VALUE_SIZE;
private int maxRetries = ConfigurationProperties.DEFAULT_MAX_RETRIES;
private int serverFailureTimeout = ConfigurationProperties.DEFAULT_SERVER_FAILURE_TIMEOUT;
private final NearCacheConfigurationBuilder nearCache;
private final List<String> allowListRegExs = new ArrayList<>();
private int batchSize = ConfigurationProperties.DEFAULT_BATCH_SIZE;
Expand Down Expand Up @@ -357,6 +358,12 @@ public ConfigurationBuilder maxRetries(int maxRetries) {
return this;
}

@Override
public ConfigurationBuilder serverFailureTimeout(int timeoutInMilliseconds) {
this.serverFailureTimeout = timeoutInMilliseconds;
return this;
}

@Override
public ConfigurationBuilder addJavaSerialAllowList(String... regEx) {
this.allowListRegExs.addAll(Arrays.asList(regEx));
Expand Down Expand Up @@ -485,6 +492,9 @@ public ConfigurationBuilder withProperties(Properties properties) {
if (typed.containsKey(ConfigurationProperties.MAX_RETRIES)) {
this.maxRetries(typed.getIntProperty(ConfigurationProperties.MAX_RETRIES, maxRetries, true));
}
if (typed.containsKey(ConfigurationProperties.SERVER_FAILURE_TIMEOUT)) {
this.serverFailureTimeout(typed.getIntProperty(ConfigurationProperties.SERVER_FAILURE_TIMEOUT, serverFailureTimeout, true));
}
if (typed.containsKey(ConfigurationProperties.DNS_RESOLVER_MIN_TTL)) {
this.dnsResolverMinTTL(typed.getIntProperty(ConfigurationProperties.DNS_RESOLVER_MIN_TTL, dnsResolverMinTTL, true));
}
Expand Down Expand Up @@ -626,7 +636,7 @@ public Configuration create() {
forceReturnValues, keySizeEstimate, buildMarshaller, buildMarshallerClass, protocolVersion, servers, socketTimeout,
security.create(), tcpNoDelay, tcpKeepAlive, valueSizeEstimate, maxRetries, nearCache.create(),
serverClusterConfigs, allowListRegExs, batchSize, transaction.create(), statistics.create(), features,
contextInitializers, remoteCaches, transportFactory, tracingPropagationEnabled);
contextInitializers, remoteCaches, transportFactory, tracingPropagationEnabled, serverFailureTimeout);
}

// Method that handles default marshaller - needed as a placeholder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,17 @@ public interface ConfigurationChildBuilder {
*/
ConfigurationBuilder maxRetries(int maxRetries);

/**
* The time for a failed server to be cleared allowing for it to attempt to reconnect at a later point.
* <p>
* If the value is less than or equal to 0 it will be disabled, meaning a failed server will not be reconnected to
* until all configured servers have failed or a topology update ({@link ClientIntelligence#TOPOLOGY_AWARE} and
* {@link ClientIntelligence#HASH_DISTRIBUTION_AWARE} only)
* @param timeoutInMilliseconds the timeout to attempt to clear a failed server in milliseconds
* @return this builder
*/
ConfigurationBuilder serverFailureTimeout(int timeoutInMilliseconds);

/**
* List of regular expressions for classes that can be deserialized using standard Java deserialization
* when reading data that might have been stored with a different endpoint, e.g. REST.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@
* <td>The {@link org.infinispan.client.hotrod.configuration.ConfigurationBuilder#connectionTimeout(int) timeout} for connections</td>
* </tr>
* <tr>
* <td><b>infinispan.client.hotrod.server_failure_timeout</b></td>
* <td>Integer</td>
* <td>30000</td>
* <td>The {@link org.infinispan.client.hotrod.configuration.ConfigurationBuilder#serverFailureTimeout(int) timeout} for a failed server when it is retried</td>
* </tr>
* <tr>
* <td><b>infinispan.client.hotrod.max_retries</b></td>
* <td>Integer</td>
* <td>2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class ConfigurationProperties {
public static final String CONNECT_TIMEOUT = ICH + "connect_timeout";
public static final String PROTOCOL_VERSION = ICH + "protocol_version";
public static final String TRANSPORT_FACTORY = ICH + "transport_factory";
public static final String SERVER_FAILURE_TIMEOUT = ICH + "server_failure_timeout";
// Encryption properties
public static final String USE_SSL = ICH + "use_ssl";
public static final String KEY_STORE_FILE_NAME = ICH + "key_store_file_name";
Expand Down Expand Up @@ -155,6 +156,7 @@ public class ConfigurationProperties {
public static final int DEFAULT_MAX_WAIT = -1;
public static final int DEFAULT_MIN_IDLE = -1;
public static final boolean DEFAULT_TRACING_PROPAGATION_ENABLED = true;
public static final int DEFAULT_SERVER_FAILURE_TIMEOUT = 30_000;

private final TypedProperties props;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -62,6 +63,8 @@
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.commons.util.SslContextFactory;

import com.github.benmanes.caffeine.cache.Caffeine;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -94,7 +97,6 @@ public class ChannelFactory {
private EventLoopGroup eventLoopGroup;
private ExecutorService executorService;
private OperationsFactory operationsFactory;
private Configuration configuration;
private int maxRetries;
private Marshaller marshaller;
private ClientListenerNotifier listenerNotifier;
Expand All @@ -110,23 +112,30 @@ public class ChannelFactory {
private CompletableFuture<Void> clusterSwitchStage;
// Servers for which the last connection attempt failed and which have no established connections
@GuardedBy("lock")
private final Set<SocketAddress> failedServers = new HashSet<>();
private final Set<SocketAddress> failedServers;
private final Configuration configuration;
private final CodecHolder codecHolder;
private AddressResolverGroup<?> dnsResolver;
private SslContext sslContext;
private FileWatcher watcher;

public ChannelFactory(CodecHolder codecHolder) {
public ChannelFactory(Configuration configuration, CodecHolder codecHolder) {
this.configuration = configuration;
this.codecHolder = codecHolder;

this.failedServers = configuration.serverFailureTimeout() > 0 ?
Collections.newSetFromMap(Caffeine.newBuilder()
.expireAfterWrite(configuration.serverFailureTimeout(), TimeUnit.MILLISECONDS)
.<SocketAddress, Boolean>build().asMap())
: new HashSet<>();
}

public void start(Configuration configuration, Marshaller marshaller, ExecutorService executorService,
public void start(Marshaller marshaller, ExecutorService executorService,
ClientListenerNotifier listenerNotifier, MarshallerRegistry marshallerRegistry) {
this.marshallerRegistry = marshallerRegistry;
lock.writeLock().lock();
try {
this.marshaller = marshaller;
this.configuration = configuration;
this.executorService = executorService;
this.listenerNotifier = listenerNotifier;
int asyncThreads = maxAsyncThreads(executorService, configuration);
Expand Down Expand Up @@ -881,6 +890,10 @@ public Configuration getConfiguration() {
return configuration;
}

public Set<SocketAddress> getFailedServers() {
return failedServers;
}

public long getRetries() {
return totalRetries.longValue();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.infinispan.client.hotrod;

import static org.infinispan.client.hotrod.test.HotRodClientTestingUtil.withRemoteCacheManager;
import static org.infinispan.server.hotrod.test.HotRodTestingUtil.hotRodCacheConfiguration;
import static org.testng.AssertJUnit.assertFalse;

import org.infinispan.client.hotrod.configuration.ClientIntelligence;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.client.hotrod.test.InternalRemoteCacheManager;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.client.hotrod.test.RemoteCacheManagerCallable;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "client.hotrod.BasicClientIntelligenceTest")
public class BasicClientIntelligenceTest extends MultiHotRodServersTest {
private final ConfigurationBuilder builder = hotRodCacheConfiguration(
getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false));
@Override
protected void createCacheManagers() throws Throwable {
createHotRodServersWithoutClients(2, builder);
}

public void testOneServerDiedAndComesBack() {
int initialPort = server(1).getPort();

org.infinispan.client.hotrod.configuration.ConfigurationBuilder clientBuilder =
new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
// Retry after every half second to make test faster
clientBuilder.serverFailureTimeout(500);
clientBuilder.addServers(HotRodClientTestingUtil.getServersString(server(0), server(1)));
clientBuilder.clientIntelligence(ClientIntelligence.BASIC);

withRemoteCacheManager(new RemoteCacheManagerCallable(
new InternalRemoteCacheManager(clientBuilder.build())) {
@Override
public void call() {
RemoteCache<Object, Object> cache = rcm.getCache();
ChannelFactory cf = rcm.getChannelFactory();
assertFalse(cache.containsKey("k"));
killServer(1);
eventuallyEquals(1, () -> {
assertFalse(cache.containsKey("k"));
return cf.getFailedServers().size();
});

addHotRodServer(builder, initialPort);

eventuallyEquals(0, () -> {
assertFalse(cache.containsKey("k"));
return cf.getFailedServers().size();
});
}
});
}

public void testBasicHasSingleServerThatDied() {
int initialPort = server(1).getPort();

org.infinispan.client.hotrod.configuration.ConfigurationBuilder clientBuilder =
new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
clientBuilder.serverFailureTimeout(500);
clientBuilder.addServers(HotRodClientTestingUtil.getServersString(server(1)));
clientBuilder.clientIntelligence(ClientIntelligence.BASIC);

withRemoteCacheManager(new RemoteCacheManagerCallable(
new InternalRemoteCacheManager(clientBuilder.build())) {
@Override
public void call() {
RemoteCache<Object, Object> cache = rcm.getCache();
ChannelFactory cf = rcm.getChannelFactory();
assertFalse(cache.containsKey("k"));
killServer(1);
for (int i = 0; i < 10; i++) {
Exceptions.expectException(TransportException.class, () -> cache.containsKey("k"));
}
eventuallyEquals(1, () -> cf.getFailedServers().size());

addHotRodServer(builder, initialPort);

eventuallyEquals(0, () -> {
assertFalse(cache.containsKey("k"));
return cf.getFailedServers().size();
});
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private static class ControlledChannelFactory extends ChannelFactory {
private BiConsumer<SocketAddress, ChannelOperation> onFetch;

public ControlledChannelFactory(Configuration cfg) {
super(new CodecHolder(cfg.version().getCodec()));
super(cfg, new CodecHolder(cfg.version().getCodec()));
}

public void useOnFetch(BiConsumer<SocketAddress, ChannelOperation> onFetch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ private static class CustomChannelFactory extends ChannelFactory {
private Supplier<Boolean> executeInstead;

public CustomChannelFactory(Configuration cfg) {
super(new CodecHolder(cfg.version().getCodec()));
super(cfg, new CodecHolder(cfg.version().getCodec()));
this.configuration = cfg;
this.executeInstead = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

import java.net.SocketAddress;

import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.impl.protocol.CodecHolder;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.handler.codec.FixedLengthFrameDecoder;

public class TestChannelFactory extends ChannelFactory {
public TestChannelFactory(CodecHolder codecHolder) {
super(codecHolder);
public TestChannelFactory(Configuration configuration, CodecHolder codecHolder) {
super(configuration, codecHolder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public ChannelFactory getChannelFactory() {
@Override
public ChannelFactory createChannelFactory() {
if (customChannelFactory != null) return customChannelFactory;
if (testReplay) return new TestChannelFactory(new CodecHolder(getConfiguration().version().getCodec()));
if (testReplay) return new TestChannelFactory(getConfiguration(), new CodecHolder(getConfiguration().version().getCodec()));
return super.createChannelFactory();
}
}
Loading

0 comments on commit 1f57c03

Please sign in to comment.