Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

D2 client side implementation of server-reported health #617

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
D2 client side implementation of server-reported health
Ruxin committed May 7, 2021

Verified

This commit was signed with the committer’s verified signature.
zajca Martin Zajíc
commit d7b009c53071d6dbb2dbc682a84803213a6439bc
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import com.linkedin.d2.balancer.strategies.framework.LatencyCorrelation;
import com.linkedin.d2.balancer.strategies.framework.LoadBalancerStrategyTestRunner;
import com.linkedin.d2.balancer.strategies.framework.LoadBalancerStrategyTestRunnerBuilder;
import com.linkedin.d2.balancer.strategies.framework.ServerLoadScoreCorrelation;
import com.linkedin.d2.loadBalancerStrategyType;
import java.net.URI;
import java.util.ArrayList;
@@ -334,6 +335,39 @@ public Object[][] isFastRecovery()
};
}

@Test(dataProvider = "clusterSizeAndQps")
public void testLoadDifference(int clusterSize, int numRequestsPerInterval)
{
  // Run a full simulation in which the runner is built by buildRelativeRunnerWithDifferentLoad
  // for the given cluster size and request count, and block until it finishes.
  LoadBalancerStrategyTestRunner runner =
      buildRelativeRunnerWithDifferentLoad(clusterSize, numRequestsPerInterval);
  runner.runWait();

  // Load recorded per interval for the host at URI index 0; printed for manual inspection.
  List<Integer> firstHostLoadPerInterval =
      runner.getLoadHistory().get(runner.getUri(0));
  System.out.println(firstHostLoadPerInterval);

  /*
   * Sample load histories recorded by the author (presumably one line per data provider row,
   * in order -- TODO confirm):
   * [19, 18, 15, 12, 8, 7, 0, 0, 1, 2, 0, 2, 1, 2, 3, 2, 6, 7, 3, 7, 8, 9, 7, 9, 7, 19, 21, 18, 17, 14, 17]
   * [2, 1, 0, 1, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
   * [1, 2, 0, 1, 1, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
   * [21, 22, 15, 18, 7, 6, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 3, 3, 4, 3, 9, 9, 10, 9, 12, 12, 11, 10, 11, 9, 17]
   *
   * Conclusion: when QPS is high, the host recovers faster.
   * Reaction time is similar in all cases: it takes around 6 intervals for the load to drop to 0.
   */
}

@DataProvider(name = "clusterSizeAndQps")
public Object[][] clusterSizeAndQps()
{
  // Each row is {clusterSize, numRequestsPerInterval}, matching the parameters of the
  // test methods that consume this provider.
  Object[] smallClusterHighQpsPerHost = {5, 100};
  Object[] smallClusterLowQpsPerHost = {5, 5};
  Object[] bigClusterLowQpsPerHost = {50, 100};
  Object[] bigClusterHighQpsPerHost = {50, 1000};

  return new Object[][]
      {
          smallClusterHighQpsPerHost,
          smallClusterLowQpsPerHost,
          bigClusterLowQpsPerHost,
          bigClusterHighQpsPerHost
      };
}

private LoadBalancerStrategyTestRunner buildDefaultRunnerWithConstantBadHost(int numHosts, long badHostLatency,
double relativeLatencyHighThresholdFactor)
{
@@ -474,4 +508,38 @@ private LoadBalancerStrategyTestRunner buildRelativeRunnerWithRandomLatencyInRan
.setRelativeLoadBalancerStrategies(relativeStrategyProperties)
.build();
}

/**
 * Builds a relative-strategy test runner in which the first host reports an elevated
 * server load score for a while and every other host reports the request count it received.
 *
 * @param numHosts number of hosts passed to the runner builder
 * @param numRequestsPerInterval constant request count configured on the runner builder
 * @return a runner configured for 30 intervals with server-reported load enabled
 */
private LoadBalancerStrategyTestRunner buildRelativeRunnerWithDifferentLoad(int numHosts, int numRequestsPerInterval)
{
  int overloadedServerLoad = 200;
  long leftLimit = 0L;
  long rightLimit = 400L;
  // Width of the random component added on top of the 400 base latency value.
  long latencyJitterRange = rightLimit - leftLimit;

  List<LatencyCorrelation> perHostLatency = new ArrayList<>();
  List<ServerLoadScoreCorrelation> perHostLoadScore = new ArrayList<>();

  // The first entry reports the overloaded score while the interval index is 0 through 10
  // (inclusive), and the received request count afterwards. It is always added, independent
  // of numHosts.
  perHostLoadScore.add((requestsPerInterval, intervalIndex) ->
      intervalIndex >= 0 && intervalIndex <= 10 ? overloadedServerLoad : requestsPerInterval
  );

  for (int host = 0; host < numHosts; host++)
  {
    // Every host gets the same latency model: 400 plus a random value in [0, 400).
    perHostLatency.add((requestsPerInterval, intervalIndex) ->
        400L + (long) (Math.random() * latencyJitterRange));

    // Hosts after the first simply report the request count as their load score.
    if (host > 0)
    {
      perHostLoadScore.add((requestsPerInterval, intervalIndex) -> requestsPerInterval);
    }
  }

  LoadBalancerStrategyTestRunnerBuilder runnerBuilder =
      new LoadBalancerStrategyTestRunnerBuilder(loadBalancerStrategyType.RELATIVE, DEFAULT_SERVICE_NAME, numHosts);
  return runnerBuilder
      .enableServerReportedLoad(true)
      .setConstantRequestCount(numRequestsPerInterval)
      .setNumIntervals(30)
      .setDynamicLatency(perHostLatency)
      .setDynamicServerLoadScore(perHostLoadScore)
      .build();
}
}
Original file line number Diff line number Diff line change
@@ -48,6 +48,18 @@ record D2RelativeStrategyProperties {
*/
lowErrorRate: optional double

/**
* If the latest server reported load score is above this specified factor of the cluster average,
* we will decrease the health score by downStep.
*/
relativeLoadHighThresholdFactor: optional double

/**
* If the latest server reported load score is under this specified factor of the cluster average,
* we will increase the health score by upStep.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can have more discussion on the up/downStep model vs. P2C. Sticky routing is currently best effort: it's not guaranteed to reach the same host when the pointsMap changes. I still think that 5s may be too long for server-reported load to be effective.

*/
relativeLoadLowThresholdFactor: optional double

/**
* The health score for a server will not be calculated unless the number of calls to it in the interval
* meets or exceeds the minimum call count.
Original file line number Diff line number Diff line change
@@ -175,7 +175,8 @@ public D2Client build()
_config.d2JmxManagerPrefix,
_config.zookeeperReadWindowMs,
_config.enableRelativeLoadBalancer,
_config.deterministicSubsettingMetadataProvider);
_config.deterministicSubsettingMetadataProvider,
_config.enableServerReportedLoad);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
new ZKFSLoadBalancerWithFacilitiesFactory() :
@@ -572,7 +573,7 @@ private Map<String, LoadBalancerStrategyFactory<?>> createDefaultLoadBalancerStr
{
final RelativeLoadBalancerStrategyFactory relativeLoadBalancerStrategyFactory = new RelativeLoadBalancerStrategyFactory(
_config._executorService, _config.healthCheckOperations, Collections.emptyList(), _config.eventEmitter,
SystemClock.instance());
SystemClock.instance(), _config.enableServerReportedLoad);
loadBalancerStrategyFactories.putIfAbsent(RelativeLoadBalancerStrategy.RELATIVE_LOAD_BALANCER_STRATEGY_NAME,
relativeLoadBalancerStrategyFactory);
}
8 changes: 6 additions & 2 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
Original file line number Diff line number Diff line change
@@ -44,6 +44,8 @@

public class D2ClientConfig
{
public static final int DEFAULT_RETRY_LIMIT = 3;

String zkHosts = null;
long zkSessionTimeoutInMs = 3600000L;
long zkStartupTimeoutInMs = 10000L;
@@ -105,7 +107,7 @@ public class D2ClientConfig
String d2JmxManagerPrefix = "UnknownPrefix";
boolean enableRelativeLoadBalancer = false;
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider = null;
public static final int DEFAULT_RETRY_LIMIT = 3;
boolean enableServerReportedLoad = false;

public D2ClientConfig()
{
@@ -161,7 +163,8 @@ public D2ClientConfig()
String d2JmxManagerPrefix,
int zookeeperReadWindowMs,
boolean enableRelativeLoadBalancer,
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider)
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider,
boolean enableServerReportedLoad)
{
this.zkHosts = zkHosts;
this.zkSessionTimeoutInMs = zkSessionTimeoutInMs;
@@ -214,5 +217,6 @@ public D2ClientConfig()
this.zookeeperReadWindowMs = zookeeperReadWindowMs;
this.enableRelativeLoadBalancer = enableRelativeLoadBalancer;
this.deterministicSubsettingMetadataProvider = deterministicSubsettingMetadataProvider;
this.enableServerReportedLoad = enableServerReportedLoad;
}
}
Original file line number Diff line number Diff line change
@@ -192,11 +192,11 @@ public void onResponse(TransportResponse<RestResponse> response)
if (response.hasError())
{
Throwable throwable = response.getError();
handleError(_callCompletion, throwable);
handleError(_callCompletion, throwable, response.getWireAttributes());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to parse server load from wireAttributes when enableServerReportedLoad is set to false.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 3 places that we can potentially control the behavior on the client side:

  1. TrackerClient - whether to pass wire attributes
  2. CallTracker - whether to update the server-reported load in each interval
  3. load balancer strategy - whether to take the server-reported load into load balancing decision.

When I designed this, I also debated where I should put the control. In the end, I put only one control in the load balancer strategy, so that the control logic is all in one place.

Now TrackerClient and CallTracker just update the server-reported load whenever it's ready, and in the load balancing strategy there are 2 cases we need to watch out for:

  1. The server enables reporting but the client does not, in which case we respect the client-side config flag.
  2. The server has not enabled reporting yet but the client has, which means the reported load will always be -1. In either case, we make the RLB not consider the server-reported load.

There is definitely a way to add a control in TrackerClient, where we can always report -1 as the score. However, there will be more code we need to clean up later if we add a control here. What do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just changed this logic in my latest commit. I guarded the code with the config in both TrackerClient and the hash ring selector logic.

}
else
{
_callCompletion.endCall();
_callCompletion.endCall(response.getWireAttributes());
}

_wrappedCallback.onResponse(response);
@@ -227,7 +227,7 @@ public void onResponse(TransportResponse<StreamResponse> response)
if (response.hasError())
{
Throwable throwable = response.getError();
handleError(_callCompletion, throwable);
handleError(_callCompletion, throwable, response.getWireAttributes());
}
else
{
@@ -259,13 +259,13 @@ public void onDataAvailable(ByteString data)
@Override
public void onDone()
{
_callCompletion.endCall();
_callCompletion.endCall(response.getWireAttributes());
}

@Override
public void onError(Throwable e)
{
handleError(_callCompletion, e);
handleError(_callCompletion, e, response.getWireAttributes());
}
};
entityStream.addObserver(observer);
@@ -275,35 +275,35 @@ public void onError(Throwable e)
}
}

private void handleError(CallCompletion callCompletion, Throwable throwable)
private void handleError(CallCompletion callCompletion, Throwable throwable, Map<String, String> wireAttributes)
{
if (isServerError(throwable))
{
callCompletion.endCallWithError(ErrorType.SERVER_ERROR);
callCompletion.endCallWithError(ErrorType.SERVER_ERROR, wireAttributes);
}
else if (throwable instanceof RemoteInvocationException)
{
Throwable originalThrowable = LoadBalancerUtil.findOriginalThrowable(throwable);
if (originalThrowable instanceof ConnectException)
{
callCompletion.endCallWithError(ErrorType.CONNECT_EXCEPTION);
callCompletion.endCallWithError(ErrorType.CONNECT_EXCEPTION, wireAttributes);
}
else if (originalThrowable instanceof ClosedChannelException)
{
callCompletion.endCallWithError(ErrorType.CLOSED_CHANNEL_EXCEPTION);
callCompletion.endCallWithError(ErrorType.CLOSED_CHANNEL_EXCEPTION, wireAttributes);
}
else if (originalThrowable instanceof TimeoutException)
{
callCompletion.endCallWithError(ErrorType.TIMEOUT_EXCEPTION);
callCompletion.endCallWithError(ErrorType.TIMEOUT_EXCEPTION, wireAttributes);
}
else
{
callCompletion.endCallWithError(ErrorType.REMOTE_INVOCATION_EXCEPTION);
callCompletion.endCallWithError(ErrorType.REMOTE_INVOCATION_EXCEPTION, wireAttributes);
}
}
else
{
callCompletion.endCallWithError();
callCompletion.endCallWithError(wireAttributes);
}
}

Original file line number Diff line number Diff line change
@@ -71,20 +71,26 @@ public class RelativeLoadBalancerStrategyFactory implements LoadBalancerStrategy
// Default ring properties
public static final int DEFAULT_POINTS_PER_WEIGHT = 100;

public static final double DEFAULT_RELATIVE_LOAD_HIGH_THRESHOLD_FACTOR = 2.0;
public static final double DEFAULT_RELATIVE_LOAD_LOW_THRESHOLD_FACTOR = 1.2;

private final ScheduledExecutorService _executorService;
private final HealthCheckOperations _healthCheckOperations;
private final List<PartitionStateUpdateListener.Factory<PartitionState>> _stateListenerFactories;
private final EventEmitter _eventEmitter;
private final Clock _clock;
private final boolean _enableServerReportedLoad;

public RelativeLoadBalancerStrategyFactory(ScheduledExecutorService executorService, HealthCheckOperations healthCheckOperations,
List<PartitionStateUpdateListener.Factory<PartitionState>> stateListenerFactories, EventEmitter eventEmitter, Clock clock)
List<PartitionStateUpdateListener.Factory<PartitionState>> stateListenerFactories, EventEmitter eventEmitter, Clock clock,
boolean enableServerReportedLoad)
{
_executorService = executorService;
_healthCheckOperations = healthCheckOperations;
_stateListenerFactories = stateListenerFactories;
_eventEmitter = (eventEmitter == null) ? new NoopEventEmitter() : eventEmitter;
_clock = clock;
_enableServerReportedLoad = enableServerReportedLoad;
}


@@ -112,7 +118,7 @@ private StateUpdater getRelativeStateUpdater(D2RelativeStrategyProperties relati
{
listenerFactories.addAll(_stateListenerFactories);
}
return new StateUpdater(relativeStrategyProperties, quarantineManager, _executorService, listenerFactories, serviceName);
return new StateUpdater(relativeStrategyProperties, quarantineManager, _executorService, listenerFactories, serviceName, _enableServerReportedLoad);
}

private ClientSelector getClientSelector(D2RelativeStrategyProperties relativeStrategyProperties)
@@ -163,6 +169,8 @@ static D2RelativeStrategyProperties putDefaultValues(D2RelativeStrategyPropertie
properties.setErrorStatusFilter(getOrDefault(properties.getErrorStatusFilter(), DEFAULT_ERROR_STATUS_FILTER));
properties.setEmittingIntervalMs(getOrDefault(properties.getEmittingIntervalMs(), DEFAULT_EMITTING_INTERVAL_MS));
properties.setEnableFastRecovery(getOrDefault(properties.isEnableFastRecovery(), DEFAULT_ENABLE_FAST_RECOVERY));
properties.setRelativeLoadHighThresholdFactor(getOrDefault(properties.getRelativeLoadHighThresholdFactor(), DEFAULT_RELATIVE_LOAD_HIGH_THRESHOLD_FACTOR));
properties.setRelativeLoadLowThresholdFactor(getOrDefault(properties.getRelativeLoadLowThresholdFactor(), DEFAULT_RELATIVE_LOAD_LOW_THRESHOLD_FACTOR));

D2QuarantineProperties quarantineProperties = properties.hasQuarantineProperties()
? properties.getQuarantineProperties() : new D2QuarantineProperties();
Loading