diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java index b7b57fa9..17ce2f4a 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java @@ -19,14 +19,15 @@ import com.netflix.concurrency.limits.MetricRegistry; import com.netflix.concurrency.limits.MetricRegistry.SampleListener; import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; -import com.netflix.concurrency.limits.internal.Preconditions; -import com.netflix.concurrency.limits.limit.functions.Log10RootFunction; +import com.netflix.concurrency.limits.limit.functions.Log10RootIntFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.DoubleUnaryOperator; import java.util.function.Function; +import java.util.function.IntUnaryOperator; /** * Limiter based on TCP Vegas where the limit increases by alpha if the queue_use is small ({@literal <} alpha) @@ -41,7 +42,7 @@ public class VegasLimit extends AbstractLimit { private static final Logger LOG = LoggerFactory.getLogger(VegasLimit.class); - private static final Function LOG10 = Log10RootFunction.create(0); + private static final IntUnaryOperator LOG10 = Log10RootIntFunction.create(0); public static class Builder { private int initialLimit = 20; @@ -49,11 +50,11 @@ public static class Builder { private MetricRegistry registry = EmptyMetricRegistry.INSTANCE; private double smoothing = 1.0; - private Function alphaFunc = (limit) -> 3 * LOG10.apply(limit.intValue()); - private Function betaFunc = (limit) -> 6 * LOG10.apply(limit.intValue()); - private Function thresholdFunc = (limit) -> LOG10.apply(limit.intValue()); - private Function increaseFunc = (limit) -> limit + LOG10.apply(limit.intValue()); - private Function decreaseFunc = (limit) -> limit - LOG10.apply(limit.intValue()); + private IntUnaryOperator alphaFunc = (limit) -> 3 * LOG10.applyAsInt(limit); + private IntUnaryOperator betaFunc = (limit) -> 6 * LOG10.applyAsInt(limit); + private IntUnaryOperator thresholdFunc = LOG10; + private DoubleUnaryOperator increaseFunc = (limit) -> limit + LOG10.applyAsInt((int) limit); + private DoubleUnaryOperator decreaseFunc = (limit) -> limit - LOG10.applyAsInt((int) limit); private int probeMultiplier = 30; private Builder() { @@ -74,13 +75,31 @@ public Builder alpha(int alpha) { this.alphaFunc = (ignore) -> alpha; return this; } - + + /** + * @deprecated use {@link #thresholdFunction(IntUnaryOperator)} + */ + @Deprecated public Builder threshold(Function threshold) { + this.thresholdFunc = threshold::apply; + return this; + } + + public Builder thresholdFunction(IntUnaryOperator threshold) { this.thresholdFunc = threshold; return this; } - + + /** + * @deprecated use {@link #alphaFunction(IntUnaryOperator)} + */ + @Deprecated public Builder alpha(Function alpha) { + this.alphaFunc = alpha::apply; + return this; + } + + public Builder alphaFunction(IntUnaryOperator alpha) { this.alphaFunc = alpha; return this; } @@ -89,18 +108,45 @@ public Builder beta(int beta) { this.betaFunc = (ignore) -> beta; return this; } - + + /** + * @deprecated use {@link #betaFunction(IntUnaryOperator)} + */ + @Deprecated public Builder beta(Function beta) { + this.betaFunc = beta::apply; + return this; + } + + public Builder betaFunction(IntUnaryOperator beta) { this.betaFunc = beta; return this; } - + + /** + * @deprecated use {@link #increaseFunction(DoubleUnaryOperator)} + */ + @Deprecated public Builder increase(Function increase) { + this.increaseFunc = increase::apply; + return this; + } + + public Builder increaseFunction(DoubleUnaryOperator increase) { this.increaseFunc = increase; return this; } - + + /** + * @deprecated use {@link #decreaseFunction(DoubleUnaryOperator)} + */ + @Deprecated public Builder decrease(Function decrease) { + this.decreaseFunc = decrease::apply; + return this; + } + + public Builder decreaseFunction(DoubleUnaryOperator decrease) { this.decreaseFunc = decrease; return this; } @@ -164,11 +210,11 @@ public static VegasLimit newDefault() { private final int maxLimit; private final double smoothing; - private final Function alphaFunc; - private final Function betaFunc; - private final Function thresholdFunc; - private final Function increaseFunc; - private final Function decreaseFunc; + private final IntUnaryOperator alphaFunc; + private final IntUnaryOperator betaFunc; + private final IntUnaryOperator thresholdFunc; + private final DoubleUnaryOperator increaseFunc; + private final DoubleUnaryOperator decreaseFunc; private final SampleListener rttSampleListener; private final int probeMultiplier; private int probeCount = 0; @@ -201,69 +247,77 @@ private boolean shouldProbe() { @Override protected int _update(long startTime, long rtt, int inflight, boolean didDrop) { - Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt); + if (rtt <= 0) { + throw new IllegalArgumentException("rtt must be >0 but got " + rtt); + } probeCount++; if (shouldProbe()) { - LOG.debug("Probe MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0); + if (LOG.isDebugEnabled()) { + LOG.debug("Probe MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0); + } resetProbeJitter(); probeCount = 0; rtt_noload = rtt; - return (int)estimatedLimit; + return (int) estimatedLimit; } - + + long rtt_noload = this.rtt_noload; if (rtt_noload == 0 || rtt < rtt_noload) { - LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0); - rtt_noload = rtt; - return (int)estimatedLimit; + if (LOG.isDebugEnabled()) { + LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0); + } + this.rtt_noload = rtt; + return (int) estimatedLimit; } - + rttSampleListener.addLongSample(rtt_noload); - return updateEstimatedLimit(rtt, inflight, didDrop); + return updateEstimatedLimit(rtt, rtt_noload, inflight, didDrop); } - private int updateEstimatedLimit(long rtt, int inflight, boolean didDrop) { - final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt)); + private int updateEstimatedLimit(long rtt, long rtt_noload, int inflight, boolean didDrop) { + double estimatedLimit = this.estimatedLimit; + final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double) rtt_noload / rtt)); double newLimit; // Treat any drop (i.e timeout) as needing to reduce the limit if (didDrop) { - newLimit = decreaseFunc.apply(estimatedLimit); + newLimit = decreaseFunc.applyAsDouble(estimatedLimit); // Prevent upward drift if not close to the limit } else if (inflight * 2 < estimatedLimit) { - return (int)estimatedLimit; + return (int) estimatedLimit; } else { - int alpha = alphaFunc.apply((int)estimatedLimit); - int beta = betaFunc.apply((int)estimatedLimit); - int threshold = this.thresholdFunc.apply((int)estimatedLimit); + int alpha = alphaFunc.applyAsInt((int) estimatedLimit); + int beta = betaFunc.applyAsInt((int) estimatedLimit); + int threshold = thresholdFunc.applyAsInt((int) estimatedLimit); // Aggressive increase when no queuing if (queueSize <= threshold) { newLimit = estimatedLimit + beta; // Increase the limit if queue is still manageable } else if (queueSize < alpha) { - newLimit = increaseFunc.apply(estimatedLimit); + newLimit = increaseFunc.applyAsDouble(estimatedLimit); // Detecting latency so decrease } else if (queueSize > beta) { - newLimit = decreaseFunc.apply(estimatedLimit); + newLimit = decreaseFunc.applyAsDouble(estimatedLimit); // We're within he sweet spot so nothing to do } else { - return (int)estimatedLimit; + return (int) estimatedLimit; } } newLimit = Math.max(1, Math.min(maxLimit, newLimit)); newLimit = (1 - smoothing) * estimatedLimit + smoothing * newLimit; - if ((int)newLimit != (int)estimatedLimit && LOG.isDebugEnabled()) { + if ((int) newLimit != (int) estimatedLimit && LOG.isDebugEnabled()) { LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={}", - (int)newLimit, + (int) newLimit, TimeUnit.NANOSECONDS.toMicros(rtt_noload) / 1000.0, TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0, queueSize); } - estimatedLimit = newLimit; - return (int)estimatedLimit; + this.estimatedLimit = newLimit; + return (int) newLimit; } @Override diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/functions/Log10RootFunction.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/functions/Log10RootFunction.java index 78acbe84..4c1079ff 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/functions/Log10RootFunction.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/functions/Log10RootFunction.java @@ -19,30 +19,33 @@ import java.util.stream.IntStream; /** - * Function used by limiters to calculate thredsholds using log10 of the current limit. + * Function used by limiters to calculate thresholds using log10 of the current limit. * Here we pre-compute the log10 of numbers up to 1000 as an optimization. + * + * @deprecated use {@link Log10RootIntFunction} */ +@Deprecated public final class Log10RootFunction implements Function { static final int[] lookup = new int[1000]; - + static { IntStream.range(0, 1000).forEach(i -> lookup[i] = Math.max(1, (int)Math.log10(i))); } - + private static final Log10RootFunction INSTANCE = new Log10RootFunction(); - + /** * Create an instance of a function that returns : baseline + sqrt(limit) - * + * * @param baseline * @return */ public static Function create(int baseline) { return INSTANCE.andThen(t -> t + baseline); } - + @Override public Integer apply(Integer t) { return t < 1000 ? lookup[t] : (int)Math.log10(t); } -} +} \ No newline at end of file diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/functions/Log10RootIntFunction.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/functions/Log10RootIntFunction.java new file mode 100644 index 00000000..356cf270 --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/functions/Log10RootIntFunction.java @@ -0,0 +1,37 @@ +package com.netflix.concurrency.limits.limit.functions; + +import java.util.function.IntUnaryOperator; + +/** + * Function used by limiters to calculate thresholds using log10 of the current limit. + * Here we pre-compute the log10 of numbers up to 1000 as an optimization. + */ +public final class Log10RootIntFunction implements IntUnaryOperator { + + private Log10RootIntFunction() {} + + private static final int[] lookup = new int[1000]; + + static { + for (int i = 0; i < lookup.length; i++) { + lookup[i] = Math.max(1, (int) Math.log10(i)); + } + } + + private static final Log10RootIntFunction INSTANCE = new Log10RootIntFunction(); + + /** + * Create an instance of a function that returns : baseline + sqrt(limit) + * + * @param baseline + * @return + */ + public static IntUnaryOperator create(int baseline) { + return baseline == 0 ? INSTANCE : INSTANCE.andThen(t -> t + baseline); + } + + @Override + public int applyAsInt(int t) { + return t < 1000 ? lookup[t] : (int) Math.log10(t); + } +} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java index c32b6940..acb4d7d5 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java @@ -58,7 +58,7 @@ public void noChangeIfWithinThresholds() { @Test public void decreaseSmoothing() { VegasLimit limit = VegasLimit.newBuilder() - .decrease(current -> current / 2) + .decreaseFunction(current -> current / 2) .smoothing(0.5) .initialLimit(100) .maxConcurrency(200) @@ -77,10 +77,33 @@ public void decreaseSmoothing() { Assert.assertEquals(56, limit.getLimit()); } + @Test + public void decreaseSmoothingDeprecatedBuilderMethod() { + @SuppressWarnings("deprecation") + VegasLimit limit = VegasLimit.newBuilder() + .decrease(current -> current / 2) + .smoothing(0.5) + .initialLimit(100) + .maxConcurrency(200) + .build(); + + // Pick up first min-rtt + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 100, false); + Assert.assertEquals(100, limit.getLimit()); + + // First decrease + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false); + Assert.assertEquals(75, limit.getLimit()); + + // Second decrease + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false); + Assert.assertEquals(56, limit.getLimit()); + } + @Test public void decreaseWithoutSmoothing() { VegasLimit limit = VegasLimit.newBuilder() - .decrease(current -> current / 2) + .decreaseFunction(current -> current / 2) .initialLimit(100) .maxConcurrency(200) .build(); @@ -97,4 +120,26 @@ public void decreaseWithoutSmoothing() { limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false); Assert.assertEquals(25, limit.getLimit()); } + + @Test + public void decreaseWithoutSmoothingDeprecatedBuilderMethod() { + @SuppressWarnings("deprecation") + VegasLimit limit = VegasLimit.newBuilder() + .decrease(current -> current / 2) + .initialLimit(100) + .maxConcurrency(200) + .build(); + + // Pick up first min-rtt + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 100, false); + Assert.assertEquals(100, limit.getLimit()); + + // First decrease + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false); + Assert.assertEquals(50, limit.getLimit()); + + // Second decrease + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false); + Assert.assertEquals(25, limit.getLimit()); + } } diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/functions/Log10RootIntFunctionTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/functions/Log10RootIntFunctionTest.java new file mode 100644 index 00000000..cbcd04b8 --- /dev/null +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/functions/Log10RootIntFunctionTest.java @@ -0,0 +1,26 @@ +package com.netflix.concurrency.limits.limit.functions; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.function.IntUnaryOperator; + +public class Log10RootIntFunctionTest { + @Test + public void test0Index() { + IntUnaryOperator func = Log10RootIntFunction.create(0); + Assert.assertEquals(1, func.applyAsInt(0)); + } + + @Test + public void testInRange() { + IntUnaryOperator func = Log10RootIntFunction.create(0); + Assert.assertEquals(2, func.applyAsInt(100)); + } + + @Test + public void testOutofLookupRange() { + IntUnaryOperator func = Log10RootIntFunction.create(0); + Assert.assertEquals(4, func.applyAsInt(10000)); + } +} \ No newline at end of file