Skip to content

Commit

Permalink
Add support for CONSTANT_QPS dark canary cluster strategy (#612)
Browse files Browse the repository at this point in the history
This implementation of the CONSTANT_QPS dark canary cluster strategy leverages a
circular buffer and rate limiter to produce a guaranteed rate of traffic matching
the configured value.
  • Loading branch information
lesterhaynes authored Jun 10, 2021
1 parent 0105db4 commit e6444b4
Show file tree
Hide file tree
Showing 20 changed files with 1,251 additions and 57 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.19.1] - 2021-06-09
- Add support for CONSTANT_QPS dark canary cluster strategy

## [29.18.15] - 2021-06-02
- Fix race conditions in D2 cluster subsetting. Refactor subsetting cache to SubsettingState.

Expand Down Expand Up @@ -4969,7 +4972,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.18.15...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.19.1...master
[29.19.1]: https://github.com/linkedin/rest.li/compare/v29.18.15...v29.19.1
[29.18.15]: https://github.com/linkedin/rest.li/compare/v29.18.14...v29.18.15
[29.18.14]: https://github.com/linkedin/rest.li/compare/v29.18.13...v29.18.14
[29.18.13]: https://github.com/linkedin/rest.li/compare/v29.18.12...v29.18.13
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ record DarkClusterConfig {
multiplier: float = 0.0

/**
* Desired query rate to be maintained to dark canaries. Measured in qps.
* Desired query rate to be maintained to the dark cluster per dark cluster host by the CONSTANT_QPS strategy. Measured in qps.
*/
dispatcherOutboundTargetRate: int = 0

/**
* Max rate dispatcher can send to dark canary. Measured in qps. Will act as upper bound to protect canaries in case of traffic spikes
* Number of requests to store in the circular buffer used for asynchronous dispatching by the CONSTANT_QPS strategy.
*/
dispatcherOutboundMaxRate: int = 2147483647
dispatcherMaxRequestsToBuffer: int = 1

/**
* Amount of time in seconds that a request is eligible for asynchronous dispatch once it is added to the circular buffer by the CONSTANT_QPS strategy.
*/
dispatcherBufferedRequestExpiryInSeconds: int = 1

/**
* Prioritized order of dark cluster multiplier strategies. This is a list to support adding new strategies and having the strategy users
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ enum DarkClusterStrategyName {
RELATIVE_TRAFFIC

/**
* This strategy will try to maintain a certain queries per second to the entire dark cluster. Configured with "dispatcherOutboundTargetRate".
* This strategy will maintain a certain number of queries per second to the entire dark cluster. It does so by adding every inbound
* request to a circular buffer which is consumed asynchronously by a rate-limited event loop. Configured with:
* "dispatcherOutboundTargetRate": the target rate which each dark cluster host should receive.
* "dispatcherMaxRequestsToBuffer": the number of requests to store in the circular buffer.
* "dispatcherBufferedRequestExpiryInSeconds": time in seconds that a request is eligible for dispatch
* once it is added to the circular buffer.
*/
CONSTANT_QPS

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public void testDarkClustersConverterDefaults()
DarkClusterConfig resultConfig = DarkClustersConverter.toConfig(DarkClustersConverter.toProperties(configMap)).get(DARK_CLUSTER_KEY);
Assert.assertEquals(resultConfig.getMultiplier(), DARK_CLUSTER_DEFAULT_MULTIPLIER);
Assert.assertEquals((int)resultConfig.getDispatcherOutboundTargetRate(), DARK_CLUSTER_DEFAULT_TARGET_RATE);
Assert.assertEquals((int)resultConfig.getDispatcherOutboundMaxRate(), DARK_CLUSTER_DEFAULT_MAX_RATE);
Assert.assertEquals(resultConfig.getDarkClusterStrategyPrioritizedList().size(), 1, "default strategy list should be size 1");
Assert.assertFalse(resultConfig.hasTransportClientProperties(), "default shouldn't have transportProperties");
}
Expand Down Expand Up @@ -177,5 +176,3 @@ public void testBadStrategies()
Assert.assertEquals(strategyList.get(1), DarkClusterStrategyName.$UNKNOWN, "second strategy should be unknown");
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public void testDarkClusterJsonSerializer() throws PropertySerializationExceptio

DarkClusterConfig darkCluster1 = new DarkClusterConfig()
.setMultiplier(1.5f)
.setDispatcherOutboundMaxRate(123456)
.setDispatcherBufferedRequestExpiryInSeconds(10)
.setDispatcherMaxRequestsToBuffer(100)
.setDispatcherOutboundTargetRate(50);
DarkClusterConfigMap darkClusterConfigMap = new DarkClusterConfigMap();
darkClusterConfigMap.put(DARK_CLUSTER1_KEY, darkCluster1);
Expand All @@ -113,12 +114,14 @@ public void test2DarkClusterJsonSerializer() throws PropertySerializationExcepti

DarkClusterConfig darkCluster1 = new DarkClusterConfig()
.setMultiplier(1.5f)
.setDispatcherOutboundMaxRate(123456)
.setDispatcherBufferedRequestExpiryInSeconds(10)
.setDispatcherMaxRequestsToBuffer(100)
.setDispatcherOutboundTargetRate(50);
DarkClusterConfigMap darkClusterConfigMap = new DarkClusterConfigMap();
darkClusterConfigMap.put(DARK_CLUSTER1_KEY, darkCluster1);
DarkClusterConfig darkCluster2 = new DarkClusterConfig()
.setDispatcherOutboundMaxRate(200)
.setDispatcherBufferedRequestExpiryInSeconds(10)
.setDispatcherMaxRequestsToBuffer(100)
.setDispatcherOutboundTargetRate(50)
.setMultiplier(0);
darkClusterConfigMap.put(DARK_CLUSTER2_KEY, darkCluster2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
Copyright (c) 2021 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.linkedin.darkcluster.impl;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.r2.transport.http.client.ConstantQpsRateLimiter;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

import com.linkedin.common.util.Notifier;
import com.linkedin.d2.DarkClusterConfig;
import com.linkedin.d2.balancer.ServiceUnavailableException;
import com.linkedin.d2.balancer.util.ClusterInfoProvider;
import com.linkedin.darkcluster.api.BaseDarkClusterDispatcher;
import com.linkedin.darkcluster.api.DarkClusterStrategy;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.rest.RestRequest;

/**
* ConstantQpsDarkClusterStrategy figures out how many dark requests to send. The high level goal of this strategy is to
* keep the incoming QPS per dark cluster host constant.
*
* It uses the {@link ClusterInfoProvider} to determine the number of instances in both the source and target cluster,
* and uses that to calculate the number of requests to send in order to make the QPS per dark cluster host constant and equal
* to a specified value, assuming all hosts in the source cluster send traffic.
*
* This strategy differs from the RELATIVE_TRAFFIC and IDENTICAL_TRAFFIC strategies in that requests are dispatched by a
* rate-limited event loop after being stored in a circular buffer. This provides a steady stream of outbound traffic that
* only duplicates requests when the inbound rate of traffic is less than the outbound rate. With the other strategies,
* requests are randomly selected based on a multiplier. With this strategy, all requests are submitted to the rate-limiter,
* which dispatches and evicts stored requests based on its configuration.
*/
public class ConstantQpsDarkClusterStrategy implements DarkClusterStrategy
{
private final String _originalClusterName;
private final String _darkClusterName;
private final Integer _darkClusterPerHostQps;
private final BaseDarkClusterDispatcher _baseDarkClusterDispatcher;
private final Notifier _notifier;
private final ClusterInfoProvider _clusterInfoProvider;
private final ConstantQpsRateLimiter _rateLimiter;

private static final long ONE_SECOND_PERIOD = TimeUnit.SECONDS.toMillis(1);
private static final int NUM_REQUESTS_TO_SEND_PER_RATE_LIMITER_CYCLE = 1;

public ConstantQpsDarkClusterStrategy(@Nonnull String originalClusterName, @Nonnull String darkClusterName,
@Nonnull Integer darkClusterPerHostQps, @Nonnull BaseDarkClusterDispatcher baseDarkClusterDispatcher,
@Nonnull Notifier notifier, @Nonnull ClusterInfoProvider clusterInfoProvider, @Nonnull ConstantQpsRateLimiter rateLimiter)
{
_originalClusterName = originalClusterName;
_darkClusterName = darkClusterName;
_darkClusterPerHostQps = darkClusterPerHostQps;
_baseDarkClusterDispatcher = baseDarkClusterDispatcher;
_notifier = notifier;
_clusterInfoProvider = clusterInfoProvider;
_rateLimiter = rateLimiter;
}

@Override
public boolean handleRequest(RestRequest originalRequest, RestRequest darkRequest, RequestContext requestContext)
{
float sendRate = getSendRate();
// set burst in such a way that requests are dispatched evenly across the ONE_SECOND_PERIOD
int burst = (int) Math.ceil(sendRate / ONE_SECOND_PERIOD);
_rateLimiter.setRate(sendRate, ONE_SECOND_PERIOD, burst);
return addRequest(originalRequest, darkRequest, requestContext);
}

/**
* We won't create this strategy if this config isn't valid for this strategy. For instance, we don't want to create
* the ConstantQpsDarkClusterStrategy if any of the configurables are zero, because we'd be doing pointless work on every getOrCreate.
* Instead if will go to the next strategy (or NoOpDarkClusterStrategy).
*
* This is a static method defined here because we don't want to instantiate a strategy to check this. It cannot be a
* method that is on the interface because static methods on an interface cannot be overridden by implementations.
* @param darkClusterConfig
* @return true if config is valid for this strategy
*/
public static boolean isValidConfig(DarkClusterConfig darkClusterConfig)
{
return darkClusterConfig.hasDispatcherOutboundTargetRate() &&
darkClusterConfig.getDispatcherOutboundTargetRate() > 0 &&
darkClusterConfig.hasDispatcherMaxRequestsToBuffer() &&
darkClusterConfig.getDispatcherMaxRequestsToBuffer() > 0 &&
darkClusterConfig.hasDispatcherBufferedRequestExpiryInSeconds() &&
darkClusterConfig.getDispatcherBufferedRequestExpiryInSeconds() > 0;
}

/**
* Provides the rate of requests to send per second from this host to the dark cluster. Result of this method call should
* be used to configure the ConstantQpsRateLimiter.
*
* It uses the {@link ClusterInfoProvider} to make the following calculation:
*
* RequestsPerSecond = ((# instances in dark cluster) * darkClusterPerHostQps) / (# instances in source cluster)
*
* For example, if there are 2 dark instances, and 10 instances in the source cluster, with a darkClusterPerHostQps of 50, we get:
* RequestsPerSecond = (2 * 50)/10 = 10.
*
* another example:
* 1 dark instance, 7 source instances, darkClusterPerHostQps = 75.
* RequestsPerSecond = (1 * 75)/7 = 10.71429.
*
* An uncommon but possible configuration:
* 10 dark instances, 1 source instance, darkClusterPerHostQps = 50.
* RequestsPerSecond = (10 * 50)/1 = 500.
*
* @return requests per second this host should dispatch
*/
private float getSendRate()
{
try
{
// Only support https for now. http support can be added later if truly needed, but would be non-ideal
// because potentially both dark and source would have to be configured.
int numDarkClusterInstances = _clusterInfoProvider.getHttpsClusterCount(_darkClusterName);
int numSourceClusterInstances = _clusterInfoProvider.getHttpsClusterCount(_originalClusterName);
if (numSourceClusterInstances != 0)
{
return (float) (numDarkClusterInstances * _darkClusterPerHostQps) / numSourceClusterInstances;
}

return 0F;
}
catch (ServiceUnavailableException e)
{
_notifier.notify(() -> new RuntimeException(
"PEGA_0020 unable to compute strategy for source cluster: " + _originalClusterName + ", darkClusterName: " + _darkClusterName, e));
// safe thing is to return 0 so dark traffic isn't sent.
return 0F;
}
}

/**
* Wraps the provided request in a Callback and adds it to the rate-limiter for storage in its buffer. Once stored,
* the rate-limiter will begin including this request in the collection of requests it dispatches. Requests stored in
* the {@link ConstantQpsRateLimiter} will continue to be dispatched until overwritten by newer requests, or until their TTLs expire.
* @return always returns true since callbacks can always be added to {@link ConstantQpsRateLimiter};
*/
private boolean addRequest(RestRequest originalRequest, RestRequest darkRequest, RequestContext requestContext)
{
_rateLimiter.submit(new Callback<None>()
{
@Override
public void onError(Throwable e)
{
//
}

@Override
public void onSuccess(None result)
{
_baseDarkClusterDispatcher.sendRequest(originalRequest, darkRequest, requestContext, NUM_REQUESTS_TO_SEND_PER_RATE_LIMITER_CYCLE);
}
});
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.linkedin.darkcluster.impl;

import com.linkedin.common.callback.Callback;
import com.linkedin.r2.transport.http.client.ConstantQpsRateLimiter;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -63,13 +65,15 @@ public class DarkClusterStrategyFactoryImpl implements DarkClusterStrategyFactor
private final Random _random;
private final LoadBalancerClusterListener _clusterListener;
private final DarkClusterVerifierManager _verifierManager;
private final ConstantQpsRateLimiter _rateLimiter;

public DarkClusterStrategyFactoryImpl(@Nonnull Facilities facilities,
@Nonnull String sourceClusterName,
@Nonnull DarkClusterDispatcher darkClusterDispatcher,
@Nonnull Notifier notifier,
@Nonnull Random random,
@Nonnull DarkClusterVerifierManager verifierManager)
@Nonnull DarkClusterVerifierManager verifierManager,
ConstantQpsRateLimiter rateLimiter)
{
_facilities = facilities;
_sourceClusterName = sourceClusterName;
Expand All @@ -78,9 +82,20 @@ public DarkClusterStrategyFactoryImpl(@Nonnull Facilities facilities,
_random = random;
_darkClusterDispatcher = darkClusterDispatcher;
_verifierManager = verifierManager;
_rateLimiter = rateLimiter;
_clusterListener = new DarkClusterListener();
}

public DarkClusterStrategyFactoryImpl(@Nonnull Facilities facilities,
@Nonnull String sourceClusterName,
@Nonnull DarkClusterDispatcher darkClusterDispatcher,
@Nonnull Notifier notifier,
@Nonnull Random random,
@Nonnull DarkClusterVerifierManager verifierManager)
{
this(facilities, sourceClusterName, darkClusterDispatcher, notifier, random, verifierManager, null);
}

@Override
public void start()
{
Expand Down Expand Up @@ -128,24 +143,39 @@ private DarkClusterStrategy createStrategy(String darkClusterName, DarkClusterCo
if (RelativeTrafficMultiplierDarkClusterStrategy.isValidConfig(darkClusterConfig))
{
BaseDarkClusterDispatcher baseDarkClusterDispatcher =
new BaseDarkClusterDispatcherImpl(darkClusterName, _darkClusterDispatcher, _notifier, _verifierManager);
return new RelativeTrafficMultiplierDarkClusterStrategy(_sourceClusterName, darkClusterName, darkClusterConfig.getMultiplier(),
baseDarkClusterDispatcher, _notifier, _facilities.getClusterInfoProvider(),
_random);
new BaseDarkClusterDispatcherImpl(darkClusterName, _darkClusterDispatcher, _notifier, _verifierManager);
return new RelativeTrafficMultiplierDarkClusterStrategy(_sourceClusterName, darkClusterName,
darkClusterConfig.getMultiplier(), baseDarkClusterDispatcher,
_notifier, _facilities.getClusterInfoProvider(), _random);
}
break;
case IDENTICAL_TRAFFIC:
if (IdenticalTrafficMultiplierDarkClusterStrategy.isValidConfig(darkClusterConfig))
{
BaseDarkClusterDispatcher baseDarkClusterDispatcher =
new BaseDarkClusterDispatcherImpl(darkClusterName, _darkClusterDispatcher, _notifier, _verifierManager);
return new IdenticalTrafficMultiplierDarkClusterStrategy(_sourceClusterName, darkClusterName, darkClusterConfig.getMultiplier(),
baseDarkClusterDispatcher, _notifier, _facilities.getClusterInfoProvider(),
_random);
return new IdenticalTrafficMultiplierDarkClusterStrategy(_sourceClusterName, darkClusterName,
darkClusterConfig.getMultiplier(), baseDarkClusterDispatcher,
_notifier, _facilities.getClusterInfoProvider(), _random);
}
break;
case CONSTANT_QPS:
// the constant qps strategy is not yet implemented, continue to the next strategy if it exists
if (_rateLimiter == null)
{
LOG.error("Dark Cluster {} configured to use CONSTANT_QPS strategy, but no rate limiter provided during instantiation. "
+ "No Dark Cluster strategy will be used!", darkClusterName);
break;
}
if (ConstantQpsDarkClusterStrategy.isValidConfig(darkClusterConfig))
{
BaseDarkClusterDispatcher baseDarkClusterDispatcher =
new BaseDarkClusterDispatcherImpl(darkClusterName, _darkClusterDispatcher, _notifier, _verifierManager);
_rateLimiter.setBufferCapacity(darkClusterConfig.getDispatcherMaxRequestsToBuffer());
_rateLimiter.setBufferTtl(darkClusterConfig.getDispatcherBufferedRequestExpiryInSeconds(), ChronoUnit.SECONDS);
return new ConstantQpsDarkClusterStrategy(_sourceClusterName, darkClusterName,
darkClusterConfig.getDispatcherOutboundTargetRate(), baseDarkClusterDispatcher,
_notifier, _facilities.getClusterInfoProvider(), _rateLimiter);
}
break;
default:
break;
Expand All @@ -171,7 +201,8 @@ public void onClusterAdded(String updatedClusterName)
_facilities.getClusterInfoProvider().getDarkClusterConfigMap(_sourceClusterName, new Callback<DarkClusterConfigMap>()
{
@Override
public void onError(Throwable e) {
public void onError(Throwable e)
{
_notifier.notify(() -> new RuntimeException("PEGA_0019 unable to refresh DarkClusterConfigMap for source cluster: "
+ _sourceClusterName, e));
}
Expand Down
Loading

0 comments on commit e6444b4

Please sign in to comment.