From e6444b4273afcda2be0a9a12d3ebd47fdef17e65 Mon Sep 17 00:00:00 2001 From: Lester Haynes Date: Thu, 10 Jun 2021 16:40:28 -0700 Subject: [PATCH] Add support for CONSTANT_QPS dark canary cluster strategy (#612) This implementation of the CONSTANT_QPS dark canary cluster strategy leverages a circular buffer and rate limiter to produce a guaranteed rate of traffic matching the configured value. --- CHANGELOG.md | 6 +- .../com/linkedin/d2/DarkClusterConfig.pdl | 11 +- .../linkedin/d2/DarkClusterStrategyName.pdl | 7 +- .../config/DarkClustersConverterTest.java | 3 - .../ClusterPropertiesSerializerTest.java | 9 +- .../impl/ConstantQpsDarkClusterStrategy.java | 174 +++++++++++++ .../impl/DarkClusterStrategyFactoryImpl.java | 51 +++- .../TestConstantQpsDarkClusterStrategy.java | 116 +++++++++ .../darkcluster/TestDarkClusterFilter.java | 12 +- .../TestDarkClusterStrategyFactory.java | 8 +- gradle.properties | 2 +- .../http/client/ConstantQpsRateLimiter.java | 101 ++++++++ .../http/client/EvictingCircularBuffer.java | 237 ++++++++++++++++++ .../http/client/SmoothRateLimiter.java | 124 ++++++--- .../client/ratelimiter/CallbackBuffer.java | 43 ++++ .../RateLimiterExecutionTracker.java | 56 +++++ .../ratelimiter/SimpleCallbackBuffer.java | 48 ++++ .../client/TestEvictingCircularBuffer.java | 200 +++++++++++++++ .../TestConstantQpsRateLimiter.java | 92 +++++++ .../linkedin/test/util/ClockedExecutor.java | 8 + 20 files changed, 1251 insertions(+), 57 deletions(-) create mode 100644 darkcluster/src/main/java/com/linkedin/darkcluster/impl/ConstantQpsDarkClusterStrategy.java create mode 100644 darkcluster/src/test/java/com/linkedin/darkcluster/TestConstantQpsDarkClusterStrategy.java create mode 100644 r2-core/src/main/java/com/linkedin/r2/transport/http/client/ConstantQpsRateLimiter.java create mode 100644 r2-core/src/main/java/com/linkedin/r2/transport/http/client/EvictingCircularBuffer.java create mode 100644 r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/CallbackBuffer.java create mode 100644 r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/RateLimiterExecutionTracker.java create mode 100644 r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/SimpleCallbackBuffer.java create mode 100644 r2-core/src/test/java/com/linkedin/r2/transport/http/client/TestEvictingCircularBuffer.java create mode 100644 r2-core/src/test/java/com/linkedin/r2/transport/http/client/ratelimiter/TestConstantQpsRateLimiter.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 338d8aeb47..a9b69a6be1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.19.1] - 2021-06-09 +- Add support for CONSTANT_QPS dark canary cluster strategy + ## [29.18.15] - 2021-06-02 - Fix race conditions in D2 cluster subsetting. Refactor subsetting cache to SubsettingState. @@ -4969,7 +4972,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.18.15...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.19.1...master +[29.19.1]: https://github.com/linkedin/rest.li/compare/v29.18.15...v29.19.1 [29.18.15]: https://github.com/linkedin/rest.li/compare/v29.18.14...v29.18.15 [29.18.14]: https://github.com/linkedin/rest.li/compare/v29.18.13...v29.18.14 [29.18.13]: https://github.com/linkedin/rest.li/compare/v29.18.12...v29.18.13 diff --git a/d2-schemas/src/main/pegasus/com/linkedin/d2/DarkClusterConfig.pdl b/d2-schemas/src/main/pegasus/com/linkedin/d2/DarkClusterConfig.pdl index 13abe1385c..bc50049c5d 100644 --- a/d2-schemas/src/main/pegasus/com/linkedin/d2/DarkClusterConfig.pdl +++ b/d2-schemas/src/main/pegasus/com/linkedin/d2/DarkClusterConfig.pdl @@ -11,14 +11,19 @@ record DarkClusterConfig { multiplier: float = 0.0 /** - * Desired query rate to be maintained to dark canaries. Measured in qps. + * Desired query rate to be maintained to the dark cluster per dark cluster host by the CONSTANT_QPS strategy. Measured in qps. */ dispatcherOutboundTargetRate: int = 0 /** - * Max rate dispatcher can send to dark canary. Measured in qps. Will act as upper bound to protect canaries in case of traffic spikes + * Number of requests to store in the circular buffer used for asynchronous dispatching by the CONSTANT_QPS strategy. */ - dispatcherOutboundMaxRate: int = 2147483647 + dispatcherMaxRequestsToBuffer: int = 1 + + /** + * Amount of time in seconds that a request is eligible for asynchronous dispatch once it is added to the circular buffer by the CONSTANT_QPS strategy. + */ + dispatcherBufferedRequestExpiryInSeconds: int = 1 /** * Prioritized order of dark cluster multiplier strategies. This is a list to support adding new strategies and having the strategy users diff --git a/d2-schemas/src/main/pegasus/com/linkedin/d2/DarkClusterStrategyName.pdl b/d2-schemas/src/main/pegasus/com/linkedin/d2/DarkClusterStrategyName.pdl index 6c2609f985..5e29c31c0c 100644 --- a/d2-schemas/src/main/pegasus/com/linkedin/d2/DarkClusterStrategyName.pdl +++ b/d2-schemas/src/main/pegasus/com/linkedin/d2/DarkClusterStrategyName.pdl @@ -16,7 +16,12 @@ enum DarkClusterStrategyName { RELATIVE_TRAFFIC /** - * This strategy will try to maintain a certain queries per second to the entire dark cluster. Configured with "dispatcherOutboundTargetRate". + * This strategy will maintain a certain number of queries per second to the entire dark cluster. It does so by adding every inbound + * request to a circular buffer which is consumed asynchronously by a rate-limited event loop. Configured with: + * "dispatcherOutboundTargetRate": the target rate which each dark cluster host should receive. + * "dispatcherMaxRequestsToBuffer": the number of requests to store in the circular buffer. + * "dispatcherBufferedRequestExpiryInSeconds": time in seconds that a request is eligible for dispatch + * once it is added to the circular buffer. */ CONSTANT_QPS diff --git a/d2/src/test/java/com/linkedin/d2/balancer/config/DarkClustersConverterTest.java b/d2/src/test/java/com/linkedin/d2/balancer/config/DarkClustersConverterTest.java index 391f6a01e6..1cf58a3339 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/config/DarkClustersConverterTest.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/config/DarkClustersConverterTest.java @@ -92,7 +92,6 @@ public void testDarkClustersConverterDefaults() DarkClusterConfig resultConfig = DarkClustersConverter.toConfig(DarkClustersConverter.toProperties(configMap)).get(DARK_CLUSTER_KEY); Assert.assertEquals(resultConfig.getMultiplier(), DARK_CLUSTER_DEFAULT_MULTIPLIER); Assert.assertEquals((int)resultConfig.getDispatcherOutboundTargetRate(), DARK_CLUSTER_DEFAULT_TARGET_RATE); - Assert.assertEquals((int)resultConfig.getDispatcherOutboundMaxRate(), DARK_CLUSTER_DEFAULT_MAX_RATE); Assert.assertEquals(resultConfig.getDarkClusterStrategyPrioritizedList().size(), 1, "default strategy list should be size 1"); Assert.assertFalse(resultConfig.hasTransportClientProperties(), "default shouldn't have transportProperties"); } @@ -177,5 +176,3 @@ public void testBadStrategies() Assert.assertEquals(strategyList.get(1), DarkClusterStrategyName.$UNKNOWN, "second strategy should be unknown"); } } - - diff --git a/d2/src/test/java/com/linkedin/d2/balancer/properties/ClusterPropertiesSerializerTest.java b/d2/src/test/java/com/linkedin/d2/balancer/properties/ClusterPropertiesSerializerTest.java index 1d1654743b..de317afb4d 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/properties/ClusterPropertiesSerializerTest.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/properties/ClusterPropertiesSerializerTest.java @@ -97,7 +97,8 @@ public void testDarkClusterJsonSerializer() throws PropertySerializationExceptio DarkClusterConfig darkCluster1 = new DarkClusterConfig() .setMultiplier(1.5f) - .setDispatcherOutboundMaxRate(123456) + .setDispatcherBufferedRequestExpiryInSeconds(10) + .setDispatcherMaxRequestsToBuffer(100) .setDispatcherOutboundTargetRate(50); DarkClusterConfigMap darkClusterConfigMap = new DarkClusterConfigMap(); darkClusterConfigMap.put(DARK_CLUSTER1_KEY, darkCluster1); @@ -113,12 +114,14 @@ public void test2DarkClusterJsonSerializer() throws PropertySerializationExcepti DarkClusterConfig darkCluster1 = new DarkClusterConfig() .setMultiplier(1.5f) - .setDispatcherOutboundMaxRate(123456) + .setDispatcherBufferedRequestExpiryInSeconds(10) + .setDispatcherMaxRequestsToBuffer(100) .setDispatcherOutboundTargetRate(50); DarkClusterConfigMap darkClusterConfigMap = new DarkClusterConfigMap(); darkClusterConfigMap.put(DARK_CLUSTER1_KEY, darkCluster1); DarkClusterConfig darkCluster2 = new DarkClusterConfig() - .setDispatcherOutboundMaxRate(200) + .setDispatcherBufferedRequestExpiryInSeconds(10) + .setDispatcherMaxRequestsToBuffer(100) .setDispatcherOutboundTargetRate(50) .setMultiplier(0); darkClusterConfigMap.put(DARK_CLUSTER2_KEY, darkCluster2); diff --git a/darkcluster/src/main/java/com/linkedin/darkcluster/impl/ConstantQpsDarkClusterStrategy.java b/darkcluster/src/main/java/com/linkedin/darkcluster/impl/ConstantQpsDarkClusterStrategy.java new file mode 100644 index 0000000000..4492ebcea1 --- /dev/null +++ b/darkcluster/src/main/java/com/linkedin/darkcluster/impl/ConstantQpsDarkClusterStrategy.java @@ -0,0 +1,174 @@ +/* + Copyright (c) 2021 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.darkcluster.impl; + +import com.linkedin.common.callback.Callback; +import com.linkedin.common.util.None; +import com.linkedin.r2.transport.http.client.ConstantQpsRateLimiter; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; + +import com.linkedin.common.util.Notifier; +import com.linkedin.d2.DarkClusterConfig; +import com.linkedin.d2.balancer.ServiceUnavailableException; +import com.linkedin.d2.balancer.util.ClusterInfoProvider; +import com.linkedin.darkcluster.api.BaseDarkClusterDispatcher; +import com.linkedin.darkcluster.api.DarkClusterStrategy; +import com.linkedin.r2.message.RequestContext; +import com.linkedin.r2.message.rest.RestRequest; + +/** + * ConstantQpsDarkClusterStrategy figures out how many dark requests to send. The high level goal of this strategy is to + * keep the incoming QPS per dark cluster host constant. + * + * It uses the {@link ClusterInfoProvider} to determine the number of instances in both the source and target cluster, + * and uses that to calculate the number of requests to send in order to make the QPS per dark cluster host constant and equal + * to a specified value, assuming all hosts in the source cluster send traffic. + * + * This strategy differs from the RELATIVE_TRAFFIC and IDENTICAL_TRAFFIC strategies in that requests are dispatched by a + * rate-limited event loop after being stored in a circular buffer. This provides a steady stream of outbound traffic that + * only duplicates requests when the inbound rate of traffic is less than the outbound rate. With the other strategies, + * requests are randomly selected based on a multiplier. With this strategy, all requests are submitted to the rate-limiter, + * which dispatches and evicts stored requests based on its configuration. + */ +public class ConstantQpsDarkClusterStrategy implements DarkClusterStrategy +{ + private final String _originalClusterName; + private final String _darkClusterName; + private final Integer _darkClusterPerHostQps; + private final BaseDarkClusterDispatcher _baseDarkClusterDispatcher; + private final Notifier _notifier; + private final ClusterInfoProvider _clusterInfoProvider; + private final ConstantQpsRateLimiter _rateLimiter; + + private static final long ONE_SECOND_PERIOD = TimeUnit.SECONDS.toMillis(1); + private static final int NUM_REQUESTS_TO_SEND_PER_RATE_LIMITER_CYCLE = 1; + + public ConstantQpsDarkClusterStrategy(@Nonnull String originalClusterName, @Nonnull String darkClusterName, + @Nonnull Integer darkClusterPerHostQps, @Nonnull BaseDarkClusterDispatcher baseDarkClusterDispatcher, + @Nonnull Notifier notifier, @Nonnull ClusterInfoProvider clusterInfoProvider, @Nonnull ConstantQpsRateLimiter rateLimiter) + { + _originalClusterName = originalClusterName; + _darkClusterName = darkClusterName; + _darkClusterPerHostQps = darkClusterPerHostQps; + _baseDarkClusterDispatcher = baseDarkClusterDispatcher; + _notifier = notifier; + _clusterInfoProvider = clusterInfoProvider; + _rateLimiter = rateLimiter; + } + + @Override + public boolean handleRequest(RestRequest originalRequest, RestRequest darkRequest, RequestContext requestContext) + { + float sendRate = getSendRate(); + // set burst in such a way that requests are dispatched evenly across the ONE_SECOND_PERIOD + int burst = (int) Math.ceil(sendRate / ONE_SECOND_PERIOD); + _rateLimiter.setRate(sendRate, ONE_SECOND_PERIOD, burst); + return addRequest(originalRequest, darkRequest, requestContext); + } + + /** + * We won't create this strategy if this config isn't valid for this strategy. For instance, we don't want to create + * the ConstantQpsDarkClusterStrategy if any of the configurables are zero, because we'd be doing pointless work on every getOrCreate. + * Instead if will go to the next strategy (or NoOpDarkClusterStrategy). + * + * This is a static method defined here because we don't want to instantiate a strategy to check this. It cannot be a + * method that is on the interface because static methods on an interface cannot be overridden by implementations. + * @param darkClusterConfig + * @return true if config is valid for this strategy + */ + public static boolean isValidConfig(DarkClusterConfig darkClusterConfig) + { + return darkClusterConfig.hasDispatcherOutboundTargetRate() && + darkClusterConfig.getDispatcherOutboundTargetRate() > 0 && + darkClusterConfig.hasDispatcherMaxRequestsToBuffer() && + darkClusterConfig.getDispatcherMaxRequestsToBuffer() > 0 && + darkClusterConfig.hasDispatcherBufferedRequestExpiryInSeconds() && + darkClusterConfig.getDispatcherBufferedRequestExpiryInSeconds() > 0; + } + + /** + * Provides the rate of requests to send per second from this host to the dark cluster. Result of this method call should + * be used to configure the ConstantQpsRateLimiter. + * + * It uses the {@link ClusterInfoProvider} to make the following calculation: + * + * RequestsPerSecond = ((# instances in dark cluster) * darkClusterPerHostQps) / (# instances in source cluster) + * + * For example, if there are 2 dark instances, and 10 instances in the source cluster, with a darkClusterPerHostQps of 50, we get: + * RequestsPerSecond = (2 * 50)/10 = 10. + * + * another example: + * 1 dark instance, 7 source instances, darkClusterPerHostQps = 75. + * RequestsPerSecond = (1 * 75)/7 = 10.71429. + * + * An uncommon but possible configuration: + * 10 dark instances, 1 source instance, darkClusterPerHostQps = 50. + * RequestsPerSecond = (10 * 50)/1 = 500. + * + * @return requests per second this host should dispatch + */ + private float getSendRate() + { + try + { + // Only support https for now. http support can be added later if truly needed, but would be non-ideal + // because potentially both dark and source would have to be configured. + int numDarkClusterInstances = _clusterInfoProvider.getHttpsClusterCount(_darkClusterName); + int numSourceClusterInstances = _clusterInfoProvider.getHttpsClusterCount(_originalClusterName); + if (numSourceClusterInstances != 0) + { + return (float) (numDarkClusterInstances * _darkClusterPerHostQps) / numSourceClusterInstances; + } + + return 0F; + } + catch (ServiceUnavailableException e) + { + _notifier.notify(() -> new RuntimeException( + "PEGA_0020 unable to compute strategy for source cluster: " + _originalClusterName + ", darkClusterName: " + _darkClusterName, e)); + // safe thing is to return 0 so dark traffic isn't sent. + return 0F; + } + } + + /** + * Wraps the provided request in a Callback and adds it to the rate-limiter for storage in its buffer. Once stored, + * the rate-limiter will begin including this request in the collection of requests it dispatches. Requests stored in + * the {@link ConstantQpsRateLimiter} will continue to be dispatched until overwritten by newer requests, or until their TTLs expire. + + * @return always returns true since callbacks can always be added to {@link ConstantQpsRateLimiter}; + */ + private boolean addRequest(RestRequest originalRequest, RestRequest darkRequest, RequestContext requestContext) + { + _rateLimiter.submit(new Callback() + { + @Override + public void onError(Throwable e) + { + // + } + + @Override + public void onSuccess(None result) + { + _baseDarkClusterDispatcher.sendRequest(originalRequest, darkRequest, requestContext, NUM_REQUESTS_TO_SEND_PER_RATE_LIMITER_CYCLE); + } + }); + return true; + } +} diff --git a/darkcluster/src/main/java/com/linkedin/darkcluster/impl/DarkClusterStrategyFactoryImpl.java b/darkcluster/src/main/java/com/linkedin/darkcluster/impl/DarkClusterStrategyFactoryImpl.java index d3a42e4de6..c415872c1e 100644 --- a/darkcluster/src/main/java/com/linkedin/darkcluster/impl/DarkClusterStrategyFactoryImpl.java +++ b/darkcluster/src/main/java/com/linkedin/darkcluster/impl/DarkClusterStrategyFactoryImpl.java @@ -17,6 +17,8 @@ package com.linkedin.darkcluster.impl; import com.linkedin.common.callback.Callback; +import com.linkedin.r2.transport.http.client.ConstantQpsRateLimiter; +import java.time.temporal.ChronoUnit; import java.util.Map; import java.util.Random; import java.util.Set; @@ -63,13 +65,15 @@ public class DarkClusterStrategyFactoryImpl implements DarkClusterStrategyFactor private final Random _random; private final LoadBalancerClusterListener _clusterListener; private final DarkClusterVerifierManager _verifierManager; + private final ConstantQpsRateLimiter _rateLimiter; public DarkClusterStrategyFactoryImpl(@Nonnull Facilities facilities, @Nonnull String sourceClusterName, @Nonnull DarkClusterDispatcher darkClusterDispatcher, @Nonnull Notifier notifier, @Nonnull Random random, - @Nonnull DarkClusterVerifierManager verifierManager) + @Nonnull DarkClusterVerifierManager verifierManager, + ConstantQpsRateLimiter rateLimiter) { _facilities = facilities; _sourceClusterName = sourceClusterName; @@ -78,9 +82,20 @@ public DarkClusterStrategyFactoryImpl(@Nonnull Facilities facilities, _random = random; _darkClusterDispatcher = darkClusterDispatcher; _verifierManager = verifierManager; + _rateLimiter = rateLimiter; _clusterListener = new DarkClusterListener(); } + public DarkClusterStrategyFactoryImpl(@Nonnull Facilities facilities, + @Nonnull String sourceClusterName, + @Nonnull DarkClusterDispatcher darkClusterDispatcher, + @Nonnull Notifier notifier, + @Nonnull Random random, + @Nonnull DarkClusterVerifierManager verifierManager) + { + this(facilities, sourceClusterName, darkClusterDispatcher, notifier, random, verifierManager, null); + } + @Override public void start() { @@ -128,10 +143,10 @@ private DarkClusterStrategy createStrategy(String darkClusterName, DarkClusterCo if (RelativeTrafficMultiplierDarkClusterStrategy.isValidConfig(darkClusterConfig)) { BaseDarkClusterDispatcher baseDarkClusterDispatcher = - new BaseDarkClusterDispatcherImpl(darkClusterName, _darkClusterDispatcher, _notifier, _verifierManager); - return new RelativeTrafficMultiplierDarkClusterStrategy(_sourceClusterName, darkClusterName, darkClusterConfig.getMultiplier(), - baseDarkClusterDispatcher, _notifier, _facilities.getClusterInfoProvider(), - _random); + new BaseDarkClusterDispatcherImpl(darkClusterName, _darkClusterDispatcher, _notifier, _verifierManager); + return new RelativeTrafficMultiplierDarkClusterStrategy(_sourceClusterName, darkClusterName, + darkClusterConfig.getMultiplier(), baseDarkClusterDispatcher, + _notifier, _facilities.getClusterInfoProvider(), _random); } break; case IDENTICAL_TRAFFIC: @@ -139,13 +154,28 @@ private DarkClusterStrategy createStrategy(String darkClusterName, DarkClusterCo { BaseDarkClusterDispatcher baseDarkClusterDispatcher = new BaseDarkClusterDispatcherImpl(darkClusterName, _darkClusterDispatcher, _notifier, _verifierManager); - return new IdenticalTrafficMultiplierDarkClusterStrategy(_sourceClusterName, darkClusterName, darkClusterConfig.getMultiplier(), - baseDarkClusterDispatcher, _notifier, _facilities.getClusterInfoProvider(), - _random); + return new IdenticalTrafficMultiplierDarkClusterStrategy(_sourceClusterName, darkClusterName, + darkClusterConfig.getMultiplier(), baseDarkClusterDispatcher, + _notifier, _facilities.getClusterInfoProvider(), _random); } break; case CONSTANT_QPS: - // the constant qps strategy is not yet implemented, continue to the next strategy if it exists + if (_rateLimiter == null) + { + LOG.error("Dark Cluster {} configured to use CONSTANT_QPS strategy, but no rate limiter provided during instantiation. " + + "No Dark Cluster strategy will be used!", darkClusterName); + break; + } + if (ConstantQpsDarkClusterStrategy.isValidConfig(darkClusterConfig)) + { + BaseDarkClusterDispatcher baseDarkClusterDispatcher = + new BaseDarkClusterDispatcherImpl(darkClusterName, _darkClusterDispatcher, _notifier, _verifierManager); + _rateLimiter.setBufferCapacity(darkClusterConfig.getDispatcherMaxRequestsToBuffer()); + _rateLimiter.setBufferTtl(darkClusterConfig.getDispatcherBufferedRequestExpiryInSeconds(), ChronoUnit.SECONDS); + return new ConstantQpsDarkClusterStrategy(_sourceClusterName, darkClusterName, + darkClusterConfig.getDispatcherOutboundTargetRate(), baseDarkClusterDispatcher, + _notifier, _facilities.getClusterInfoProvider(), _rateLimiter); + } break; default: break; @@ -171,7 +201,8 @@ public void onClusterAdded(String updatedClusterName) _facilities.getClusterInfoProvider().getDarkClusterConfigMap(_sourceClusterName, new Callback() { @Override - public void onError(Throwable e) { + public void onError(Throwable e) + { _notifier.notify(() -> new RuntimeException("PEGA_0019 unable to refresh DarkClusterConfigMap for source cluster: " + _sourceClusterName, e)); } diff --git a/darkcluster/src/test/java/com/linkedin/darkcluster/TestConstantQpsDarkClusterStrategy.java b/darkcluster/src/test/java/com/linkedin/darkcluster/TestConstantQpsDarkClusterStrategy.java new file mode 100644 index 0000000000..c116dd4972 --- /dev/null +++ b/darkcluster/src/test/java/com/linkedin/darkcluster/TestConstantQpsDarkClusterStrategy.java @@ -0,0 +1,116 @@ +/* + Copyright (c) 2021 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.darkcluster; + +import com.linkedin.darkcluster.impl.ConstantQpsDarkClusterStrategy; +import com.linkedin.r2.transport.http.client.ConstantQpsRateLimiter; +import com.linkedin.r2.transport.http.client.EvictingCircularBuffer; +import com.linkedin.test.util.ClockedExecutor; +import com.linkedin.util.clock.Clock; +import java.net.URI; + +import com.linkedin.darkcluster.api.DarkClusterDispatcher; +import com.linkedin.darkcluster.impl.BaseDarkClusterDispatcherImpl; +import com.linkedin.darkcluster.impl.DefaultDarkClusterDispatcher; +import com.linkedin.r2.message.RequestContext; +import com.linkedin.r2.message.rest.RestRequest; +import com.linkedin.r2.message.rest.RestRequestBuilder; + +import java.time.temporal.ChronoUnit; +import java.util.stream.IntStream; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class TestConstantQpsDarkClusterStrategy +{ + private static final String SOURCE_CLUSTER_NAME = "FooCluster"; + private static final String DARK_CLUSTER_NAME = "fooCluster-dark"; + private static final float ERR_PCT = 0.30f; // 5% + + private static final int TEST_CAPACITY = 5; + private static final int TEST_TTL = 5; + private static final ChronoUnit TEST_TTL_UNIT = ChronoUnit.SECONDS; + + @DataProvider + public Object[][] qpsKeys() + { + return new Object[][]{ + // numIterations, qps, numSourceInstances, numDarkInstances + {0, 0, 10, 10}, + {0, 100, 10, 10}, + {1000, 10, 10, 10}, + {1000, 30, 10, 10}, + {1000, 50, 10, 10}, + {1000, 100, 10, 10}, + {1000, 150, 10, 10}, + {100, 200, 10, 10}, + // now test typical case of differing qps with different instance sizes + {1000, 100, 10, 1}, + {1000, 90, 10, 1}, + {1000, 120, 10, 1}, + {1000, 100, 10, 2}, + {1000, 100, 40, 3}, + {1000, 200, 10, 1}, + {1000, 250, 10, 1}, + {1000, 400, 10, 1} + }; + } + + @Test(dataProvider = "qpsKeys") + public void testStrategy(int numIterations, int qps, int numSourceInstances, int numDarkInstances) + { + IntStream.of(1, 1000, 1000000).forEach(capacity -> + { + DarkClusterDispatcher darkClusterDispatcher = new DefaultDarkClusterDispatcher(new MockClient(false)); + BaseDarkClusterDispatcherImpl baseDispatcher = new BaseDarkClusterDispatcherImpl(DARK_CLUSTER_NAME, + darkClusterDispatcher, + new DoNothingNotifier(), + new CountingVerifierManager()); + MockClusterInfoProvider mockClusterInfoProvider = new MockClusterInfoProvider(); + mockClusterInfoProvider.putHttpsClusterCount(DARK_CLUSTER_NAME, numDarkInstances); + mockClusterInfoProvider.putHttpsClusterCount(SOURCE_CLUSTER_NAME, numSourceInstances); + ClockedExecutor executor = new ClockedExecutor(); + + EvictingCircularBuffer buffer = TestConstantQpsDarkClusterStrategy.getBuffer(executor); + ConstantQpsRateLimiter rateLimiter = + new ConstantQpsRateLimiter(executor, executor, executor, buffer); + rateLimiter.setBufferCapacity(capacity); + ConstantQpsDarkClusterStrategy strategy = new ConstantQpsDarkClusterStrategy(SOURCE_CLUSTER_NAME, + DARK_CLUSTER_NAME, + qps, + baseDispatcher, + new DoNothingNotifier(), + mockClusterInfoProvider, + rateLimiter); + for (int i=0; i < numIterations; i++) + { + RestRequest dummyRestRequest = new RestRequestBuilder(URI.create("foo")).build(); + strategy.handleRequest(dummyRestRequest, dummyRestRequest, new RequestContext()); + } + executor.runFor(1000); + int expectedCount = ((numIterations == 0 ? 0 : 1) * qps * numDarkInstances)/(numSourceInstances); + int actualCount = baseDispatcher.getRequestCount(); + Assert.assertEquals(actualCount, expectedCount, expectedCount * ERR_PCT, "count not within expected range"); + }); + } + + static EvictingCircularBuffer getBuffer(Clock clock) + { + return new EvictingCircularBuffer(TEST_CAPACITY, TEST_TTL, TEST_TTL_UNIT, clock); + } +} diff --git a/darkcluster/src/test/java/com/linkedin/darkcluster/TestDarkClusterFilter.java b/darkcluster/src/test/java/com/linkedin/darkcluster/TestDarkClusterFilter.java index 8bd93b7401..a2172711bc 100644 --- a/darkcluster/src/test/java/com/linkedin/darkcluster/TestDarkClusterFilter.java +++ b/darkcluster/src/test/java/com/linkedin/darkcluster/TestDarkClusterFilter.java @@ -16,7 +16,12 @@ package com.linkedin.darkcluster; +import com.linkedin.r2.transport.http.client.ConstantQpsRateLimiter; +import com.linkedin.r2.transport.http.client.EvictingCircularBuffer; +import com.linkedin.util.clock.Clock; +import com.linkedin.util.clock.SystemClock; import java.net.URI; +import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; @@ -66,6 +71,9 @@ public class TestDarkClusterFilter private ExecutorService _executorService = Executors.newSingleThreadExecutor(); private DarkClusterVerifier _darkClusterVerifier = new NoOpDarkClusterVerifier(); private DarkClusterVerifierManager _verifierManager = new DarkClusterVerifierManagerImpl(_darkClusterVerifier, _executorService); + Clock clock = SystemClock.instance(); + private ConstantQpsRateLimiter _rateLimiter = new ConstantQpsRateLimiter( + _scheduledExecutorService, _executorService, clock, new EvictingCircularBuffer(1, 1, ChronoUnit.SECONDS, clock)); private Random _random = new Random(); private DarkClusterFilter _darkClusterFilter; private DarkClusterStrategyFactory _darkClusterStrategyFactory; @@ -80,7 +88,7 @@ public void setup() _darkClusterStrategyFactory = new DarkClusterStrategyFactoryImpl(_facilities, SOURCE_CLUSTER_NAME, _darkClusterDispatcher, _notifier, _random, - _verifierManager); + _verifierManager, _rateLimiter); DarkClusterManager darkClusterManager = new DarkClusterManagerImpl(SOURCE_CLUSTER_NAME, _facilities, @@ -113,7 +121,7 @@ public void testDarkClusterAssemblyWithDarkCluster() _darkClusterStrategyFactory = new DarkClusterStrategyFactoryImpl(_facilities, SOURCE_CLUSTER_NAME, _darkClusterDispatcher, _notifier, _random, - _verifierManager); + _verifierManager, _rateLimiter); _darkClusterStrategyFactory.start(); DarkClusterManager darkClusterManager = new DarkClusterManagerImpl(SOURCE_CLUSTER_NAME, _facilities, diff --git a/darkcluster/src/test/java/com/linkedin/darkcluster/TestDarkClusterStrategyFactory.java b/darkcluster/src/test/java/com/linkedin/darkcluster/TestDarkClusterStrategyFactory.java index 395b804948..95300c17ee 100644 --- a/darkcluster/src/test/java/com/linkedin/darkcluster/TestDarkClusterStrategyFactory.java +++ b/darkcluster/src/test/java/com/linkedin/darkcluster/TestDarkClusterStrategyFactory.java @@ -16,6 +16,8 @@ package com.linkedin.darkcluster; +import com.linkedin.r2.transport.http.client.ConstantQpsRateLimiter; +import com.linkedin.test.util.ClockedExecutor; import java.net.URI; import java.util.Arrays; import java.util.Collections; @@ -59,6 +61,7 @@ public class TestDarkClusterStrategyFactory private static final int SEED = 2; private DarkClusterStrategyFactory _strategyFactory; private MockClusterInfoProvider _clusterInfoProvider; + private ConstantQpsRateLimiter _rateLimiter; @BeforeMethod public void setup() @@ -68,12 +71,15 @@ public void setup() DarkClusterConfig darkClusterConfigOld = createRelativeTrafficMultiplierConfig(0.5f); _clusterInfoProvider.addDarkClusterConfig(SOURCE_CLUSTER_NAME, PREEXISTING_DARK_CLUSTER_NAME, darkClusterConfigOld); DarkClusterDispatcher darkClusterDispatcher = new DefaultDarkClusterDispatcher(new MockClient(false)); + ClockedExecutor executor = new ClockedExecutor(); + _rateLimiter = new ConstantQpsRateLimiter(executor, executor, executor, TestConstantQpsDarkClusterStrategy.getBuffer(executor)); _strategyFactory = new DarkClusterStrategyFactoryImpl(facilities, SOURCE_CLUSTER_NAME, darkClusterDispatcher, new DoNothingNotifier(), new Random(SEED), - new CountingVerifierManager()); + new CountingVerifierManager(), + _rateLimiter); _strategyFactory.start(); } diff --git a/gradle.properties b/gradle.properties index b1e8fff5b2..b68b638c6a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.18.15 +version=29.19.1 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true diff --git a/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ConstantQpsRateLimiter.java b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ConstantQpsRateLimiter.java new file mode 100644 index 0000000000..62b432acf2 --- /dev/null +++ b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ConstantQpsRateLimiter.java @@ -0,0 +1,101 @@ +/* + Copyright (c) 2021 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.r2.transport.http.client; + +import com.linkedin.r2.transport.http.client.ratelimiter.RateLimiterExecutionTracker; +import com.linkedin.util.clock.Clock; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * A {@link SmoothRateLimiter} that never rejects new callbacks, and continues to execute callbacks as long as the underlying + * {@link EvictingCircularBuffer} has callbacks to supply. This rate-limiter should only be used in cases where the user + * demands a constant rate of callback execution, and it's not important that all callbacks are executed, or executed only once. + * + * Rest.li's original use case for this rate-limiter is to supply Dark Clusters with a steady stream of request volume for + * testing purposes under a given load. + */ +public class ConstantQpsRateLimiter extends SmoothRateLimiter +{ + private final EvictingCircularBuffer _evictingCircularBuffer; + + public ConstantQpsRateLimiter( + ScheduledExecutorService scheduler, Executor executor, Clock clock, EvictingCircularBuffer callbackBuffer) + { + super(scheduler, executor, clock, callbackBuffer, BufferOverflowMode.NONE, "ConstantQpsRateLimiter", new UnboundedRateLimiterExecutionTracker()); + _evictingCircularBuffer = callbackBuffer; + } + + /** + * Sets the underlying {@link EvictingCircularBuffer} size, which controls the maximum number of callbacks to store in memory concurrently. + * @param capacity + */ + public void setBufferCapacity(int capacity) + { + _evictingCircularBuffer.setCapacity(capacity); + } + + /** + * Sets the underlying {@link EvictingCircularBuffer} ttl, which controls how long a request can exist in the buffer + * until it is no longer available. + * @param ttl + * @param ttlUnit + */ + public void setBufferTtl(int ttl, ChronoUnit ttlUnit) + { + _evictingCircularBuffer.setTtl(ttl, ttlUnit); + } + + + private static class UnboundedRateLimiterExecutionTracker implements RateLimiterExecutionTracker + { + private final AtomicBoolean _paused = new AtomicBoolean(true); + + public int getPending() + { + return 1; + } + + public boolean getPausedAndIncrement() + { + return _paused.getAndSet(false); + } + + public boolean decrementAndGetPaused() + { + return _paused.get(); + } + + public void pauseExecution() + { + _paused.set(true); + } + + public boolean isPaused() + { + return _paused.get(); + } + + public int getMaxBuffered() + { + return Integer.MAX_VALUE; + } + } +} diff --git a/r2-core/src/main/java/com/linkedin/r2/transport/http/client/EvictingCircularBuffer.java b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/EvictingCircularBuffer.java new file mode 100644 index 0000000000..9da9099dee --- /dev/null +++ b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/EvictingCircularBuffer.java @@ -0,0 +1,237 @@ +/* + Copyright (c) 2021 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.r2.transport.http.client; + +import com.linkedin.common.callback.Callback; +import com.linkedin.common.util.None; +import com.linkedin.r2.transport.http.client.ratelimiter.CallbackBuffer; +import com.linkedin.util.clock.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +/** + * A CallbackBuffer specifically designed to feed an asynchronous event loop with a constant supply of unique Callbacks. + * + * EvictingCircularBuffer accomplishes a few key goals: + * - Must always accept submission of new Callbacks, replacing oldest Callbacks when buffer is exhausted + * - Must provide a unique Callback between subsequent get() calls + * - Must be able to honor get requests in excess of put requests, returning previously returned Callbacks is acceptable. + * - During periods of insufficient write throughput, must prune stale Callbacks, ultimately throwing NoSuchElementException + * to upstream callers when inbound write throughput has dropped to zero. + * - Must do the above with high performance, without adding meaningful latency to reader/writer threads. + * + * This class is thread-safe, achieving performance through granular locking of each element of the underlying circular buffer. + */ +public class EvictingCircularBuffer implements CallbackBuffer +{ + private Duration _ttl; + private final ArrayList> _callbacks = new ArrayList<>(); + private final ArrayList _ttlBuffer = new ArrayList<>(); + private final ArrayList _elementLocks = new ArrayList<>(); + private final AtomicInteger _readerPosition = new AtomicInteger(); + private final AtomicInteger _writerPosition = new AtomicInteger(); + private final Clock _clock; + + /** + * @param capacity initial value for the maximum number of Callbacks storable by this buffer + * @param ttl Amount of time a callback is eligible for being returned after being stored + * @param ttlUnit Unit of time for ttl value + * @param clock Clock instance used for calculating ttl expiry + */ + public EvictingCircularBuffer(int capacity, int ttl, ChronoUnit ttlUnit, Clock clock) + { + setCapacity(capacity); + setTtl(ttl, ttlUnit); + _clock = clock; + } + + /** + * Adds the supplied Callback to the internal circular buffer. If the buffer is full, the oldest Callback in the buffer + * will be overwritten. Calls to put always succeed, and there is no guarantee that Callbacks submitted through the put() + * method will be subsequently returned by the get() method. + * + * @param toAdd Callback that is to be possibly returned by later calls to get() + */ + public void put(Callback toAdd) + { + int writerPosition = getAndBumpWriterPosition(); + ReentrantReadWriteLock thisLock = _elementLocks.get(writerPosition); + thisLock.writeLock().lock(); + try + { + _callbacks.set(writerPosition, toAdd); + _ttlBuffer.set(writerPosition, Instant.ofEpochMilli(_clock.currentTimeMillis())); + } + finally + { + thisLock.writeLock().unlock(); + } + } + + /** + * Returns a Callback previously stored in the circular buffer through the put() method. Callbacks are generally returned + * in the order they were received, but in cases of get() throughput in excess of put() throughput, previously returned + * Callbacks will be sent. Oldest age of returned Callbacks is configurable through the ttl param in the constructor. + * + * Calls to get() will always succeed as long as calls to put() continue at a cadence within the ttl duration. When write + * throughput has dropped to zero, get() will eventually throw NoSuchElementException once the circular buffer has become + * fully pruned through expired ttl. + * @return Callback + * @throws NoSuchElementException if internal circular buffer is empty + */ + public Callback get() throws NoSuchElementException + { + for (int i = 0; i <= getCapacity(); i++) + { + int thisReaderPosition = getAndBumpReaderPosition(); + ReentrantReadWriteLock thisLock = _elementLocks.get(thisReaderPosition); + thisLock.readLock().lock(); + Callback callback; + Instant ttl; + try + { + callback = _callbacks.get(thisReaderPosition); + ttl = _ttlBuffer.get(thisReaderPosition); + } + finally + { + thisLock.readLock().unlock(); + } + + if (callback != null) + { + // check for expired ttl + if (Duration.between(ttl, Instant.ofEpochMilli(_clock.currentTimeMillis())).compareTo(_ttl) > 0) + { + thisLock.writeLock().lock(); + try + { + // after acquiring write lock at reader position, ensure the data at reader position is the same as when we read it + if (callback == _callbacks.get(thisReaderPosition)) + { + _callbacks.set(thisReaderPosition, null); + _ttlBuffer.set(thisReaderPosition, null); + } + } + finally + { + thisLock.writeLock().unlock(); + } + } + else + { + return callback; + } + } + } + throw new NoSuchElementException("buffer is empty"); + } + + /** + * @return the number of unique Callbacks this buffer can hold. + */ + int getCapacity() + { + return _callbacks.size(); + } + + /** + * Resizes the circular buffer, deleting the contents in the process. + * This method should not be called frequently, ideally only as part of a startup lifecycle, as it does heavy locking + * to ensure all reads and writes are drained coinciding with the resizing of the buffer. + * @param capacity + */ + void setCapacity(int capacity) + { + if (capacity < 1) + { + throw new IllegalArgumentException("capacity can't be less than 1"); + } + // acquire write lock for all elements in the buffer to prevent reads while the buffer is re-created, + // taking care to store them in a temporary location for releasing afterward. + ArrayList tempLocks = new ArrayList<>(); + _elementLocks.forEach(x -> + { + x.writeLock().lock(); + tempLocks.add(x); + }); + try + { + _callbacks.clear(); + _ttlBuffer.clear(); + _elementLocks.clear(); + // populate ArrayList with nulls to prevent changes to underlying data structure size during writes, + // also needed to compute reader and writer position through calls to size() + _ttlBuffer.addAll(Collections.nCopies(capacity, null)); + _callbacks.addAll(Collections.nCopies(capacity, null)); + for(int i = 0; i <= capacity; i++) + { + _elementLocks.add(new ReentrantReadWriteLock()); + } + } + finally + { + // these locks no longer exist in _elementLocks, but we need to release them in order to unblock + // pending reads. + tempLocks.forEach(x -> x.writeLock().unlock()); + } + } + + /** + * @return the currently configured TTL. + */ + Duration getTtl() + { + return _ttl; + } + + /** + * Sets the amount of time a Callback is eligible to be returned after it has been stored in the buffer. + * TTL is shared across all stored Callbacks for the sake of simplicity. + * @param ttl number value of amount of time + * @param ttlUnit unit of time for number value + */ + void setTtl(int ttl, ChronoUnit ttlUnit) + { + if (ttl < 1) + { + throw new IllegalArgumentException("ttl can't be less than 1"); + } + if (ttlUnit == null) + { + throw new IllegalArgumentException("ttlUnit can't be null."); + } + _ttl = Duration.of(ttl, ttlUnit); + } + + private int getAndBumpWriterPosition() + { + return (_writerPosition.getAndUpdate(x -> (x + 1) % _callbacks.size())); + } + + private int getAndBumpReaderPosition() + { + return (_readerPosition.getAndUpdate(x -> (x + 1) % _callbacks.size())); + } +} diff --git a/r2-core/src/main/java/com/linkedin/r2/transport/http/client/SmoothRateLimiter.java b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/SmoothRateLimiter.java index 1abe83032e..d4d0481f18 100644 --- a/r2-core/src/main/java/com/linkedin/r2/transport/http/client/SmoothRateLimiter.java +++ b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/SmoothRateLimiter.java @@ -18,10 +18,14 @@ import com.linkedin.common.callback.Callback; import com.linkedin.common.util.None; +import com.linkedin.r2.transport.http.client.ratelimiter.CallbackBuffer; +import com.linkedin.r2.transport.http.client.ratelimiter.RateLimiterExecutionTracker; +import com.linkedin.r2.transport.http.client.ratelimiter.SimpleCallbackBuffer; import com.linkedin.r2.transport.http.client.ratelimiter.Rate; import com.linkedin.util.ArgumentUtil; import com.linkedin.util.RateLimitedLogger; import com.linkedin.util.clock.Clock; +import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; @@ -52,10 +56,9 @@ public class SmoothRateLimiter implements AsyncRateLimiter private final String _rateLimiterName; private volatile Rate _rate = Rate.ZERO_VALUE; private final EventLoop _eventLoop; - private final int _maxBuffered; - private final Queue> _pendingCallbacks; + private final CallbackBuffer _pendingCallbacks; - private final AtomicInteger _pendingCount = new AtomicInteger(0); + private final RateLimiterExecutionTracker _executionTracker; private final AtomicReference _invocationError = new AtomicReference<>(null); private final static Long OVER_BUFFER_RATELIMITEDLOG_RATE_MS = 60000L; @@ -70,35 +73,46 @@ public enum BufferOverflowMode /** * Enqueue the request and run at least one to avoid the overflow */ - SCHEDULE_WITH_WARNING + SCHEDULE_WITH_WARNING, + /** + * Used for buffers that cannot overflow + */ + NONE + } + + public SmoothRateLimiter(ScheduledExecutorService scheduler, Executor executor, Clock clock, Queue> pendingCallbacks, + int maxBuffered, BufferOverflowMode bufferOverflowMode, String rateLimiterName) + { + this(scheduler, executor, clock, new SimpleCallbackBuffer(pendingCallbacks), bufferOverflowMode, rateLimiterName, new BoundedRateLimiterExecutionTracker(maxBuffered)); } /** * Constructs a new instance of {@link SmoothRateLimiter}. * The default rate is 0, no requests will be processed until the rate is changed * - * @param scheduler Scheduler used to execute the internal non-blocking event loop. MUST be single-threaded - * @param executor Executes the tasks for invoking #onSuccess and #onError (only during #callAll) - * @param clock Clock implementation that supports getting the current time accurate to milliseconds - * @param pendingCallbacks THREAD SAFE and NON-BLOCKING implementation of callback queue - * @param maxBuffered Maximum number of tasks kept in the queue before execution + * @param scheduler Scheduler used to execute the internal non-blocking event loop. MUST be single-threaded + * @param executor Executes the tasks for invoking #onSuccess and #onError (only during #callAll) + * @param clock Clock implementation that supports getting the current time accurate to milliseconds + * @param pendingCallbacks THREAD SAFE and NON-BLOCKING implementation of callback queue * @param bufferOverflowMode just what to do if the max buffer is reached. In many applications blindly - * dropping the request might not be backward compatible + * dropping the request might not be backward compatible + * @param rateLimiterName Name assigned for logging purposes + * @param executionTracker Adjusts the behavior of the rate limiter based on policies/state of RateLimiterExecutionTracker + */ - public SmoothRateLimiter(ScheduledExecutorService scheduler, Executor executor, Clock clock, Queue> pendingCallbacks, - int maxBuffered, BufferOverflowMode bufferOverflowMode, String rateLimiterName) + SmoothRateLimiter(ScheduledExecutorService scheduler, Executor executor, Clock clock, CallbackBuffer pendingCallbacks, + BufferOverflowMode bufferOverflowMode, String rateLimiterName, RateLimiterExecutionTracker executionTracker) { ArgumentUtil.ensureNotNull(scheduler, "scheduler"); ArgumentUtil.ensureNotNull(executor, "executor"); ArgumentUtil.ensureNotNull(clock, "clock"); - ArgumentUtil.checkArgument(maxBuffered >= 0, "maxBuffered"); _scheduler = scheduler; _executor = executor; _pendingCallbacks = pendingCallbacks; - _maxBuffered = maxBuffered; _bufferOverflowMode = bufferOverflowMode; _rateLimiterName = rateLimiterName; + _executionTracker = executionTracker; _eventLoop = new EventLoop(clock); _rateLimitedLoggerOverBuffer = new RateLimitedLogger(LOG, OVER_BUFFER_RATELIMITEDLOG_RATE_MS, clock); @@ -131,25 +145,26 @@ public void submit(Callback callback) throws RejectedExecutionException { ArgumentUtil.ensureNotNull(callback, "callback"); - if (_pendingCount.get() >= _maxBuffered) + if (_executionTracker.getPending() >= _executionTracker.getMaxBuffered()) { if (_bufferOverflowMode == BufferOverflowMode.DROP) { throw new RejectedExecutionException( - String.format("PEGA_2000: Cannot submit callback because the buffer is full at %d tasks for ratelimiter: %s", _maxBuffered, _rateLimiterName)); + String.format("PEGA_2000: Cannot submit callback because the buffer is full at %d tasks for ratelimiter: %s", + _executionTracker.getMaxBuffered(), _rateLimiterName)); } else { _rateLimitedLoggerOverBuffer.error(String.format( - "PEGA_2001: the buffer is full at %d tasks for ratelimiter: %s. Executing a request immediately to avoid overflowing and dropping the task.", _maxBuffered, - _rateLimiterName)); + "PEGA_2001: the buffer is full at %d tasks for ratelimiter: %s. Executing a request immediately to avoid overflowing and dropping the task.", + _executionTracker.getMaxBuffered(), _rateLimiterName)); } } - _pendingCallbacks.offer(callback); - if (_pendingCount.getAndIncrement() == 0) + _pendingCallbacks.put(callback); + if (_executionTracker.getPausedAndIncrement()) { - _scheduler.execute(_eventLoop::loop); + _scheduler.execute(_eventLoop::loop); } } @@ -189,7 +204,7 @@ public void cancelAll(Throwable throwable) @Override public int getPendingTasksCount(){ - return _pendingCount.get(); + return _executionTracker.getPending(); } /** @@ -244,16 +259,14 @@ public void loop() _permitsInTimeFrame = rate.getEvents(); } - // if all the tasks have been previously consumed, there is no need for continuing the loop - if (_pendingCount.get() == 0) + if (_executionTracker.isPaused()) { return; } - // the size of the pending cannot be ever greater than the maxBuffered. We prefer - // running above the limit then risking a leak - if (_pendingCount.get() > _maxBuffered) + if (_executionTracker.getPending() > _executionTracker.getMaxBuffered()) { + // We prefer running above the limit then risking a leak _permitAvailableCount++; } @@ -263,9 +276,13 @@ public void loop() Callback callback = null; try { - callback = _pendingCallbacks.poll(); + callback = _pendingCallbacks.get(); _executor.execute(new Task(callback, _invocationError.get())); } + catch (NoSuchElementException ex) + { + _executionTracker.pauseExecution(); + } catch (Throwable e) { // Invoke the callback#onError on the current thread as the last resort. Executing the callback on the @@ -282,7 +299,7 @@ public void loop() } finally { - if (_pendingCount.decrementAndGet() > 0) + if (!_executionTracker.decrementAndGetPaused()) { _scheduler.execute(this::loop); } @@ -299,14 +316,13 @@ public void loop() { _nextScheduled = nextRunAbsolute; - _scheduler.schedule(this::loop, nextRunRelativeTime, - TimeUnit.MILLISECONDS); + _scheduler.schedule(this::loop, nextRunRelativeTime, TimeUnit.MILLISECONDS); } } catch (Throwable throwable) { LOG.error("An unrecoverable exception occurred while scheduling the event loop causing the rate limiter" - + "to stop processing submitted tasks.", throwable); + + "to stop processing submitted tasks.", throwable); } } } @@ -350,4 +366,48 @@ public void run() } } } + + private static class BoundedRateLimiterExecutionTracker implements RateLimiterExecutionTracker + { + private final AtomicInteger _pendingCount = new AtomicInteger(0); + private final int _maxBuffered; + + public BoundedRateLimiterExecutionTracker(int maxBuffered) + { + ArgumentUtil.checkArgument(maxBuffered >= 0, "maxBuffered"); + + _maxBuffered = maxBuffered; + } + + public boolean getPausedAndIncrement() + { + return _pendingCount.getAndIncrement() == 0; + } + + public boolean decrementAndGetPaused() + { + return _pendingCount.updateAndGet(i -> i > 0 ? i - 1 : i) == 0; + } + + public boolean isPaused() + { + // if all the tasks have been previously consumed, there is no need for continuing execution + return _pendingCount.get() == 0; + } + + public void pauseExecution() + { + _pendingCount.set(0); + } + + public int getPending() + { + return _pendingCount.get(); + } + + public int getMaxBuffered() + { + return _maxBuffered; + } + } } diff --git a/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/CallbackBuffer.java b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/CallbackBuffer.java new file mode 100644 index 0000000000..1093993f5f --- /dev/null +++ b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/CallbackBuffer.java @@ -0,0 +1,43 @@ +/* + Copyright (c) 2021 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.r2.transport.http.client.ratelimiter; + +import com.linkedin.common.callback.Callback; +import com.linkedin.common.util.None; +import java.util.NoSuchElementException; + + +/** + * A lightweight queue-like interface specifically for Callbacks + */ +public interface CallbackBuffer +{ + + /** + * Buffers a Callback for later retrieval. + * @param callback + */ + void put(Callback callback); + + /** + * Provides a Callback previously stored through the put method. + * This interface makes no recommendation of ordering between put and get calls. + * @return Callback + * @throws NoSuchElementException if the CallbackBuffer is empty + */ + Callback get() throws NoSuchElementException; +} diff --git a/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/RateLimiterExecutionTracker.java b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/RateLimiterExecutionTracker.java new file mode 100644 index 0000000000..7f0e437707 --- /dev/null +++ b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/RateLimiterExecutionTracker.java @@ -0,0 +1,56 @@ +/* + Copyright (c) 2021 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.r2.transport.http.client.ratelimiter; + +/** + * Used by a RateLimiter to track execution of callbacks pending in its internal buffer. + */ +public interface RateLimiterExecutionTracker +{ + + /** + * Unpauses execution on RateLimiter if applicable. Increments the number of pending callbacks by 1. + * @return whether or not the RateLimiter was paused when method call happened. + */ + boolean getPausedAndIncrement(); + + /** + * Pauses execution on RateLimiter if applicable. Decrements the number of pending callbacks by 1. + * @return whether on not the RateLimiter was paused as a result of the call happening. + */ + boolean decrementAndGetPaused(); + + /** + * Pauses execution on the RateLimiter. + */ + void pauseExecution(); + + /** + * @return whether or not execution on the RateLimiter is currently paused. + */ + boolean isPaused(); + + /** + * @return outstanding number of callbacks pending to be executed in the RateLimiter + */ + int getPending(); + + /** + * @return maximum number of callbacks that can be stored in the RateLimiter + */ + int getMaxBuffered(); +} diff --git a/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/SimpleCallbackBuffer.java b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/SimpleCallbackBuffer.java new file mode 100644 index 0000000000..58389a602c --- /dev/null +++ b/r2-core/src/main/java/com/linkedin/r2/transport/http/client/ratelimiter/SimpleCallbackBuffer.java @@ -0,0 +1,48 @@ +/* + Copyright (c) 2021 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.r2.transport.http.client.ratelimiter; + +import com.linkedin.common.callback.Callback; +import com.linkedin.common.util.None; +import com.linkedin.util.ArgumentUtil; +import java.util.NoSuchElementException; +import java.util.Queue; + + +/** + * A simple CallbackBuffer implementation that delegates to the provided Queue + */ +public class SimpleCallbackBuffer implements CallbackBuffer +{ + private final Queue> _queue; + + public SimpleCallbackBuffer(Queue> queue) + { + ArgumentUtil.ensureNotNull(queue, "queue cannot be null"); + _queue = queue; + } + + public void put(Callback callback) + { + _queue.offer(callback); + } + + public Callback get() throws NoSuchElementException + { + return _queue.remove(); + } +} diff --git a/r2-core/src/test/java/com/linkedin/r2/transport/http/client/TestEvictingCircularBuffer.java b/r2-core/src/test/java/com/linkedin/r2/transport/http/client/TestEvictingCircularBuffer.java new file mode 100644 index 0000000000..262b899a40 --- /dev/null +++ b/r2-core/src/test/java/com/linkedin/r2/transport/http/client/TestEvictingCircularBuffer.java @@ -0,0 +1,200 @@ +/* + Copyright (c) 2021 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.r2.transport.http.client; + +import com.linkedin.common.callback.Callback; +import com.linkedin.common.callback.FutureCallback; +import com.linkedin.common.util.None; +import com.linkedin.util.clock.Clock; +import com.linkedin.util.clock.SettableClock; +import com.linkedin.util.clock.SystemClock; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.NoSuchElementException; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.stream.IntStream; +import org.junit.Assert; +import org.testng.annotations.Test; + + +public class TestEvictingCircularBuffer +{ + private static final int TEST_TIMEOUT = 3000; + private static final int TEST_CAPACITY = 5; + private static final int TEST_TTL = 5; + private static final ChronoUnit TEST_TTL_UNIT = ChronoUnit.SECONDS; + private static final SettableClock TEST_CLOCK = new SettableClock(); + + @Test(timeOut = TEST_TIMEOUT) + public void testGettersAfterInstantiateSimple() + { + EvictingCircularBuffer buffer = new EvictingCircularBuffer(TEST_CAPACITY, TEST_TTL, TEST_TTL_UNIT, SystemClock.instance()); + Assert.assertEquals(buffer.getCapacity(), TEST_CAPACITY); + Assert.assertEquals(buffer.getTtl().getSeconds(), TEST_TTL); + } + + @Test(timeOut = TEST_TIMEOUT) + public void testCreatePutGetRepeatInOrder() + { + Callback callback = new FutureCallback<>(); + Callback callbackAlso = new FutureCallback<>(); + EvictingCircularBuffer buffer = getBuffer(); + buffer.put(callback); + Assert.assertSame(buffer.get(), callback); + Assert.assertSame(buffer.get(), callback); + buffer.put(callbackAlso); + Assert.assertSame(buffer.get(), callbackAlso); + Assert.assertSame(buffer.get(), callback); + Assert.assertSame(buffer.get(), callbackAlso); + } + + @Test(timeOut = TEST_TIMEOUT) + public void testTtlPurge() + { + Callback callback = new FutureCallback<>(); + EvictingCircularBuffer buffer = getBuffer(); + buffer.put(callback); + Assert.assertSame(buffer.get(), callback); + TEST_CLOCK.addDuration(5001); + try + { + buffer.get(); + } + catch (NoSuchElementException ex) + { + // get + } + } + + @Test(timeOut = TEST_TIMEOUT) + public void testParallelPutGet() + { + CyclicBarrier floodgate = new CyclicBarrier(9); + Callback callback = new FutureCallback<>(); + EvictingCircularBuffer buffer = getBuffer(); + + buffer.put(callback); + + for (int i = 0; i < 4; i++) + { + new Thread(() -> { + try + { + floodgate.await(); + } + catch (InterruptedException | BrokenBarrierException ignored) {} + buffer.put(new FutureCallback<>()); + }).start(); + } + + for (int i = 0; i < 5; i++) + { + new Thread(() -> { + try + { + floodgate.await(); + } + catch (InterruptedException | BrokenBarrierException ignored) {} + buffer.get(); + }).start(); + } + + ArrayList> results = new ArrayList<>(); + IntStream.range(0, 5).forEach(x -> results.add(buffer.get())); + Assert.assertTrue(results.contains(callback)); + } + + @Test(timeOut = TEST_TIMEOUT) + public void testSetCapacityAfterCreate() + { + EvictingCircularBuffer buffer = getBuffer(); + buffer.put(new FutureCallback<>()); + buffer.setCapacity(9001); + try + { + buffer.get(); + } + catch (NoSuchElementException ex) + { + // buffer clears after resize by design + } + } + + @Test(timeOut = TEST_TIMEOUT) + public void testSetTtlAfterCreate() + { + EvictingCircularBuffer buffer = getBuffer(); + Callback callback = new FutureCallback<>(); + buffer.put(callback); + buffer.setTtl(9001, ChronoUnit.MILLIS); + TEST_CLOCK.addDuration(8000); + Assert.assertSame(buffer.get(), callback); + TEST_CLOCK.addDuration(1002); + try + { + buffer.get(); + } + catch (NoSuchElementException ex) + { + // expired ttl + } + } + + @Test(timeOut = TEST_TIMEOUT) + public void testIllegalTtlAndCapacityArguments() + { + EvictingCircularBuffer buffer = getBuffer(); + + try + { + buffer.setTtl(0, TEST_TTL_UNIT); + } + catch (IllegalArgumentException ex) + { + // TTL can't be less than 1. + } + + try + { + buffer.setTtl(1, null); + } + catch (IllegalArgumentException ex) + { + // TTL unit can't be null + } + + try + { + buffer.setCapacity(0); + } + catch (IllegalArgumentException ex) + { + // we can always do puts on EvictingCircularBuffer, so capacity should never be less than 1. + } + } + + public static EvictingCircularBuffer getBuffer() + { + return getBuffer(TEST_CLOCK); + } + + public static EvictingCircularBuffer getBuffer(Clock clock) + { + return new EvictingCircularBuffer(TEST_CAPACITY, TEST_TTL, TEST_TTL_UNIT, clock); + } +} diff --git a/r2-core/src/test/java/com/linkedin/r2/transport/http/client/ratelimiter/TestConstantQpsRateLimiter.java b/r2-core/src/test/java/com/linkedin/r2/transport/http/client/ratelimiter/TestConstantQpsRateLimiter.java new file mode 100644 index 0000000000..b6540466ee --- /dev/null +++ b/r2-core/src/test/java/com/linkedin/r2/transport/http/client/ratelimiter/TestConstantQpsRateLimiter.java @@ -0,0 +1,92 @@ +/* + Copyright (c) 2021 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.r2.transport.http.client.ratelimiter; + +import com.linkedin.common.callback.Callback; +import com.linkedin.common.util.None; +import com.linkedin.r2.transport.http.client.ConstantQpsRateLimiter; +import com.linkedin.r2.transport.http.client.TestEvictingCircularBuffer; +import com.linkedin.test.util.ClockedExecutor; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.testng.annotations.Test; + + +public class TestConstantQpsRateLimiter +{ + private static final int TEST_TIMEOUT = 3000; + private static final float TEST_QPS = 5; + private static final int ONE_SECOND = 1000; + private static final int TEST_NUM_CYCLES = 10; + private static final int UNLIMITED_BURST = Integer.MAX_VALUE; + + + @Test(timeOut = TEST_TIMEOUT) + public void submitOnceGetMany() + { + ClockedExecutor executor = new ClockedExecutor(); + ConstantQpsRateLimiter rateLimiter = + new ConstantQpsRateLimiter(executor, executor, executor, TestEvictingCircularBuffer.getBuffer(executor)); + + rateLimiter.setRate(TEST_QPS, ONE_SECOND, UNLIMITED_BURST); + rateLimiter.setBufferCapacity(1); + + TattlingCallback tattler = new TattlingCallback<>(); + rateLimiter.submit(tattler); + executor.runFor(ONE_SECOND * TEST_NUM_CYCLES); + Assert.assertTrue(tattler.getInteractCount() > 1); + } + + @Test(timeOut = TEST_TIMEOUT) + public void eventLoopStopsWhenTtlExpiresAllRequests() + { + ClockedExecutor executor = new ClockedExecutor(); + ConstantQpsRateLimiter rateLimiter = + new ConstantQpsRateLimiter(executor, executor, executor, TestEvictingCircularBuffer.getBuffer(executor)); + + rateLimiter.setRate(TEST_QPS, ONE_SECOND, UNLIMITED_BURST); + rateLimiter.setBufferTtl(999, ChronoUnit.MILLIS); + TattlingCallback tattler = new TattlingCallback<>(); + rateLimiter.submit(tattler); + executor.runFor(ONE_SECOND * TEST_NUM_CYCLES); + Assert.assertSame(tattler.getInteractCount(), (int) TEST_QPS); + long prevTaskCount = executor.getExecutedTaskCount(); + executor.runFor(ONE_SECOND * TEST_NUM_CYCLES); + // EventLoop continues by scheduling itself at the end. If executed task count remains the same, + // then EventLoop hasn't re-scheduled itself. + Assert.assertSame(executor.getExecutedTaskCount(), prevTaskCount); + } + + private static class TattlingCallback implements Callback + { + private AtomicInteger _interactCount = new AtomicInteger(); + + @Override + public void onError(Throwable e) {} + + @Override + public void onSuccess(T result) { + _interactCount.incrementAndGet(); + } + + public int getInteractCount() + { + return _interactCount.intValue(); + } + } +} diff --git a/test-util/src/main/java/com/linkedin/test/util/ClockedExecutor.java b/test-util/src/main/java/com/linkedin/test/util/ClockedExecutor.java index fa97e0230e..fbc862e5cb 100644 --- a/test-util/src/main/java/com/linkedin/test/util/ClockedExecutor.java +++ b/test-util/src/main/java/com/linkedin/test/util/ClockedExecutor.java @@ -13,6 +13,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import com.linkedin.util.clock.Clock; @@ -31,6 +32,7 @@ public class ClockedExecutor implements Clock, ScheduledExecutorService private volatile long _currentTimeMillis = 0L; private volatile Boolean _stopped = true; + private volatile long _taskCount = 0L; private PriorityBlockingQueue _taskList = new PriorityBlockingQueue<>(); public Future runFor(long duration) @@ -72,6 +74,7 @@ public Future runUntil(long untilTime) LOG.debug("Processing task " + task.toString() + " total {}, time {}", _taskList.size(), _currentTimeMillis); } task.run(); + _taskCount++; if (task.repeatCount() > 0 && !task.isCancelled() && !_stopped) { task.reschedule(_currentTimeMillis); @@ -82,6 +85,11 @@ public Future runUntil(long untilTime) return null; } + public long getExecutedTaskCount() + { + return _taskCount; + } + @Override public ScheduledFuture schedule(Runnable cmd, long delay, TimeUnit unit) {