Skip to content

Commit

Permalink
Support failing out requests to clusters (#777)
Browse files Browse the repository at this point in the history
Co-authored-by: John Stewart <[email protected]>
Co-authored-by: Chris Stufflebeam <[email protected]>
  • Loading branch information
3 people authored Apr 28, 2022
1 parent 78c2dc0 commit f82cebe
Show file tree
Hide file tree
Showing 33 changed files with 1,495 additions and 32 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.33.4] - 2022-04-26
- Support failout redirection in D2 client.

## [29.33.3] - 2022-04-25
- Add end-to-end integration tests for D2 client.

Expand Down Expand Up @@ -5221,7 +5224,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.33.3...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.33.4...master
[29.33.4]: https://github.com/linkedin/rest.li/compare/v29.33.3...v29.33.4
[29.33.3]: https://github.com/linkedin/rest.li/compare/v29.33.2...v29.33.3
[29.33.2]: https://github.com/linkedin/rest.li/compare/v29.33.1...v29.33.2
[29.33.1]: https://github.com/linkedin/rest.li/compare/v29.33.0...v29.33.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.ReflectionException;

import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -109,11 +111,6 @@ public void setup() throws Exception
@BeforeMethod
public void init() throws Exception
{
// Bring up all echo servers
if (_echoServers != null)
{
stopAllEchoServers(_echoServers);
}
startEchoServers(NUMBER_OF_HOSTS);
assertAllEchoServersRunning(_echoServers);
assertAllEchoServersRegistered(_cli.getZKClient(), _zkUriString, _echoServers);
Expand All @@ -129,6 +126,16 @@ public void init() throws Exception
callback.get(5, TimeUnit.SECONDS);
}

/**
 * Runs after every test method and stops all servers held in {@code _echoServers}.
 * Does nothing when the list was never created.
 */
@AfterMethod
public void teardown() throws Exception
{
  if (_echoServers == null)
  {
    // Nothing was started, so there is nothing to stop.
    return;
  }
  stopAllEchoServers(_echoServers);
}

/**
* Given that all the downstream hosts in the cluster are healthy and have a uniform weight,
* the requests sending from the clients should result in an even distribution. The total call count
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.linkedin.d2.loadbalancer.failout;

import com.linkedin.d2.balancer.LoadBalancerState;
import com.linkedin.d2.balancer.clusterfailout.FailoutConfig;
import com.linkedin.d2.balancer.clusterfailout.FailoutConfigProvider;
import com.linkedin.d2.balancer.clusterfailout.FailoutConfigProviderFactory;
import com.linkedin.d2.balancer.clusterfailout.ZKFailoutConfigProvider;
import com.linkedin.d2.balancer.properties.FailoutProperties;

import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Test-only {@link FailoutConfigProviderFactory} that lazily builds a single
 * {@link MockFailoutConfigProvider} and hands the same instance back on every later call.
 */
public class MockFailoutConfigProviderFactory implements FailoutConfigProviderFactory
{
  // Created by the first create() call and reused afterwards.
  private FailoutConfigProvider _failoutConfigProvider;

  /**
   * Returns the cached provider, constructing it from {@code loadBalancerState} on first use.
   * Because the instance is cached, the argument of any subsequent call is ignored.
   *
   * @param loadBalancerState state passed to the provider's constructor on the first call
   * @return the single provider instance owned by this factory
   */
  @Override
  public FailoutConfigProvider create(LoadBalancerState loadBalancerState)
  {
    if (_failoutConfigProvider == null)
    {
      _failoutConfigProvider = new MockFailoutConfigProvider(loadBalancerState);
    }
    return _failoutConfigProvider;
  }

  /**
   * Provider whose failout config only signals presence: it is never failed out and has no peers.
   */
  public static class MockFailoutConfigProvider extends ZKFailoutConfigProvider
  {

    public MockFailoutConfigProvider(LoadBalancerState loadBalancerState)
    {
      super(loadBalancerState);
    }

    /**
     * Maps failout properties to a stub config.
     *
     * @param clusterName name of the cluster the properties belong to (unused by this mock)
     * @param failoutProperties failout properties of the cluster, or {@code null} when there are none
     * @return {@code null} when {@code failoutProperties} is {@code null}; otherwise a config that
     *         reports "not failed out" and an empty set of peer clusters
     */
    @Nullable
    @Override
    public FailoutConfig createFailoutConfig(@Nonnull String clusterName, @Nullable FailoutProperties failoutProperties)
    {
      if (failoutProperties == null)
      {
        return null;
      }
      return new FailoutConfig()
      {
        @Override
        public boolean isFailedOut()
        {
          return false;
        }

        @Override
        public Set<String> getPeerClusters()
        {
          // An empty set instead of null, so callers can iterate the peers without a null check.
          return java.util.Collections.emptySet();
        }
      };
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package com.linkedin.d2.loadbalancer.failout;

import com.linkedin.common.callback.FutureCallback;
import com.linkedin.common.util.None;
import com.linkedin.d2.D2BaseTest;
import com.linkedin.d2.balancer.LoadBalancerState;
import com.linkedin.d2.balancer.clients.DynamicClient;
import com.linkedin.d2.balancer.clusterfailout.FailoutConfigProviderFactory;
import com.linkedin.d2.balancer.properties.ClusterProperties;
import com.linkedin.d2.balancer.properties.ClusterPropertiesJsonSerializer;
import com.linkedin.d2.balancer.properties.ClusterStoreProperties;
import com.linkedin.d2.balancer.properties.FailoutProperties;
import com.linkedin.d2.balancer.simple.SimpleLoadBalancer;
import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState;
import com.linkedin.d2.balancer.util.LoadBalancerClientCli;
import com.linkedin.d2.balancer.util.LoadBalancerEchoServer;
import com.linkedin.d2.discovery.stores.zk.ZKServer;
import com.linkedin.d2.discovery.stores.zk.ZKTestUtil;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestRequestBuilder;
import com.linkedin.r2.util.NamedThreadFactory;
import com.linkedin.test.util.retry.ThreeRetries;

import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

/**
 * Integration test checking that a {@link SimpleLoadBalancer} built with a
 * {@link FailoutConfigProviderFactory} exposes a failout config once failout properties are
 * written to the cluster's ZooKeeper node, and drops it again once they are removed.
 */
public class TestLoadBalancerWithFailout extends D2BaseTest
{
  private static final String D2_CONFIG_FILE = "d2_config_example.json";
  private static final String ZK_HOST = "127.0.0.1";
  private static final int ECHO_SERVER_PORT_START = 2861;
  private static final int NUMBER_OF_HOSTS = 2;
  // Cluster that the echo servers join and whose properties the test rewrites.
  private static final String CLUSTER_NAME = "cluster-1";

  private LoadBalancerClientCli _cli;
  private List<LoadBalancerEchoServer> _echoServers;
  private int _zkPort;
  private String _zkUriString;

  private SimpleLoadBalancerState _state;
  private SimpleLoadBalancer _loadBalancer;
  private FailoutConfigProviderFactory _failoutConfigProviderFactory;

  /**
   * Starts a ZooKeeper server, registers the clusters/services from {@link #D2_CONFIG_FILE}
   * (when the resource is found on the classpath) and creates the CLI used by the tests.
   */
  @BeforeTest
  public void setup()
      throws Exception
  {
    // Start ZK Server
    ZKServer zkServer = ZKTestUtil.startZKServer();
    _zkPort = zkServer.getPort();
    String zkHosts = ZK_HOST + ":" + _zkPort;
    _zkUriString = "zk://" + zkHosts;

    // Register D2 clusters/services
    URL d2Config = getClass().getClassLoader().getResource(D2_CONFIG_FILE);
    if (d2Config != null)
    {
      LoadBalancerClientCli.runDiscovery(zkHosts, "/d2", new File(d2Config.toURI()));
    }

    // Set up SimpleLoadBalancerState and D2 Client
    _cli = new LoadBalancerClientCli(zkHosts, "/d2");
  }

  /**
   * Starts the echo servers, builds and starts a load balancer that uses the mock failout
   * provider factory, and sends one request so that the load balancer state is populated.
   */
  @BeforeMethod
  public void init()
      throws Exception
  {
    startEchoServers(NUMBER_OF_HOSTS);
    assertAllEchoServersRunning(_echoServers);
    assertAllEchoServersRegistered(_cli.getZKClient(), _zkUriString, _echoServers);

    _failoutConfigProviderFactory = new MockFailoutConfigProviderFactory();

    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("D2 PropertyEventExecutor"));
    _state = LoadBalancerClientCli.createSimpleLoadBalancerState(_cli.getZKClient(), _zkUriString, "/d2", executor);
    _loadBalancer = new SimpleLoadBalancer(_state, 5, TimeUnit.SECONDS, executor, _failoutConfigProviderFactory);

    // Start the load balancer
    FutureCallback<None> callback = new FutureCallback<>();
    _loadBalancer.start(callback);
    callback.get(5, TimeUnit.SECONDS);

    _loadBalancer.listenToCluster(CLUSTER_NAME, true, new LoadBalancerState.NullStateListenerCallback());

    DynamicClient client = new DynamicClient(_loadBalancer, null);
    URI uri = URI.create("d2://" + "service-1_1");

    // Use one request to trigger load balancer state update
    RestRequest trigger = new RestRequestBuilder(uri).build();
    try
    {
      client.restRequest(trigger, new RequestContext()).get();
    }
    catch (InterruptedException e)
    {
      // Restore the interrupt flag so the test runner can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting for the trigger request to " + uri, e);
    }
    catch (ExecutionException e)
    {
      // Keep the cause: the request itself failed, which is not an interruption.
      throw new RuntimeException("Trigger request to " + uri + " failed", e);
    }
  }

  /**
   * Stops the echo servers started by {@link #init()}, if any.
   */
  @AfterMethod
  public void teardown()
      throws Exception
  {
    if (_echoServers != null)
    {
      stopAllEchoServers(_echoServers);
    }
  }

  /**
   * Writes cluster properties with, then without, failout properties and checks that the load
   * balancer's failout config appears and disappears accordingly.
   */
  @Test
  public void testFailout()
      throws ExecutionException, InterruptedException, TimeoutException
  {
    assertNull(_loadBalancer.getFailoutConfig(CLUSTER_NAME), "No failout config should exist");

    final ClusterProperties originalProperties = _state.getClusterProperties(CLUSTER_NAME).getProperty();
    // Inserts dummy failout config
    final ClusterStoreProperties propertiesWithFailout =
        new ClusterStoreProperties(originalProperties, null, null, new FailoutProperties(Collections.emptyList(), Collections.emptyList()));

    writeClusterProperties(propertiesWithFailout);

    waitForFailoutPropertyUpdate(true);
    assertNotNull(_loadBalancer.getFailoutConfig(CLUSTER_NAME));

    // Removes the failout config
    final ClusterStoreProperties propertiesWithoutFailout = new ClusterStoreProperties(originalProperties, null, null, null);
    writeClusterProperties(propertiesWithoutFailout);

    waitForFailoutPropertyUpdate(false);
    assertNull(_loadBalancer.getFailoutConfig(CLUSTER_NAME));
  }

  /**
   * Serializes {@code properties} and writes them to the cluster's ZooKeeper node, waiting up to
   * 5 seconds for the write callback to complete.
   *
   * @param properties cluster properties to store, with or without failout properties
   */
  private void writeClusterProperties(ClusterStoreProperties properties)
      throws InterruptedException, ExecutionException, TimeoutException
  {
    FutureCallback<None> callback = new FutureCallback<>();
    _cli.getZKClient().setDataUnsafe("/d2/clusters/" + CLUSTER_NAME, new ClusterPropertiesJsonSerializer().toBytes(properties), callback);
    callback.get(5, TimeUnit.SECONDS);
  }

  /**
   * Polls the load balancer until the presence of a failout config matches the expectation.
   * Returns silently when the wait runs out; the caller's assertion then reports the mismatch.
   *
   * @param shouldHaveFailoutProperties whether a failout config is expected to be present
   */
  private void waitForFailoutPropertyUpdate(boolean shouldHaveFailoutProperties)
      throws InterruptedException
  {
    // Wait up to 3 seconds for the subscriber to pick up the change.
    for (int i = 0; i < 30; i++)
    {
      final boolean hasFailoutProperties = _loadBalancer.getFailoutConfig(CLUSTER_NAME) != null;
      if (hasFailoutProperties == shouldHaveFailoutProperties)
      {
        return;
      }
      Thread.sleep(100);
    }
  }

  /**
   * Starts {@code numHosts} echo servers on consecutive ports beginning at
   * {@link #ECHO_SERVER_PORT_START} and stores them in {@code _echoServers}.
   */
  private void startEchoServers(int numHosts)
      throws Exception
  {
    _echoServers = new ArrayList<>();

    for (int i = 0; i < numHosts; i++)
    {
      _echoServers.add(
          startEchoServer(ZK_HOST, _zkPort, ECHO_SERVER_HOST, ECHO_SERVER_PORT_START + i, CLUSTER_NAME, null, true, "service-1_1", "service-1_2",
              "service-1_3"));
    }
  }
}
39 changes: 38 additions & 1 deletion d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import com.linkedin.common.util.None;
import com.linkedin.d2.backuprequests.BackupRequestsStrategyStatsConsumer;
import com.linkedin.d2.balancer.clients.BackupRequestsClient;
import com.linkedin.d2.balancer.clients.FailoutClient;
import com.linkedin.d2.balancer.clients.FailoutRedirectStrategy;
import com.linkedin.d2.balancer.clients.DynamicClient;
import com.linkedin.d2.balancer.clients.RequestTimeoutClient;
import com.linkedin.d2.balancer.clients.RetryClient;
import com.linkedin.d2.balancer.clusterfailout.FailoutConfigProviderFactory;
import com.linkedin.d2.balancer.event.EventEmitter;
import com.linkedin.d2.balancer.simple.SslSessionValidatorFactory;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy;
Expand Down Expand Up @@ -178,7 +181,10 @@ public D2Client build()
_config.zookeeperReadWindowMs,
_config.enableRelativeLoadBalancer,
_config.deterministicSubsettingMetadataProvider,
_config.canaryDistributionProvider);
_config.canaryDistributionProvider,
_config.enableClusterFailout,
_config.failoutConfigProviderFactory,
_config.failoutRedirectStrategy);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
new ZKFSLoadBalancerWithFacilitiesFactory() :
Expand Down Expand Up @@ -223,6 +229,19 @@ else if (_config.restRetryEnabled || _config.streamRetryEnabled)
_config.restRetryEnabled, _config.streamRetryEnabled);
}

if (_config.enableClusterFailout)
{
if (_config.failoutRedirectStrategy == null)
{
LOG.warn("A URI rewrite strategy is required for failout.");
}
else
{
LOG.info("Enabling D2Client failout support");
d2Client = new FailoutClient(d2Client, loadBalancer, _config.failoutRedirectStrategy);
}
}

// If we created default transport client factories, we need to shut them down when d2Client
// is being shut down.
if (_config.clientFactories != transportClientFactories)
Expand Down Expand Up @@ -547,6 +566,24 @@ public D2ClientBuilder setCanaryDistributionProvider(CanaryDistributionProvider
return this;
}

/**
 * Turns cluster failout support on or off for the client being built.
 * When the flag is true, this builder wraps the client in a {@code FailoutClient}, provided a
 * {@code FailoutRedirectStrategy} has also been set; if none is set it logs a warning and
 * leaves the client unwrapped.
 *
 * @param enableClusterFailout whether failout support should be enabled
 * @return this builder, for call chaining
 */
public D2ClientBuilder setEnableClusterFailout(boolean enableClusterFailout)
{
_config.enableClusterFailout = enableClusterFailout;
return this;
}

/**
 * Stores the factory for failout config providers in the builder configuration.
 *
 * @param failoutConfigProviderFactory factory to record in the configuration
 * @return this builder, for call chaining
 */
public D2ClientBuilder setFailoutConfigProviderFactory(FailoutConfigProviderFactory failoutConfigProviderFactory)
{
_config.failoutConfigProviderFactory = failoutConfigProviderFactory;
return this;
}

/**
 * Stores the redirect strategy that is handed to the {@code FailoutClient} when cluster failout
 * is enabled. If failout is enabled and no strategy has been set, the builder logs a warning and
 * does not create the failout wrapper.
 *
 * @param failoutRedirectStrategy strategy to record in the configuration
 * @return this builder, for call chaining
 */
public D2ClientBuilder setFailoutRedirectStrategy(FailoutRedirectStrategy failoutRedirectStrategy)
{
_config.failoutRedirectStrategy = failoutRedirectStrategy;
return this;
}

private Map<String, TransportClientFactory> createDefaultTransportClientFactories()
{
final Map<String, TransportClientFactory> clientFactories = new HashMap<>();
Expand Down
Loading

0 comments on commit f82cebe

Please sign in to comment.