From cf2d5903e3deadd947185ca32b33888ec30a9285 Mon Sep 17 00:00:00 2001 From: Eran Landau Date: Wed, 7 Feb 2018 13:58:03 -0800 Subject: [PATCH] Introduce tolerance for min rtt changes --- concurrency-limits-core/build.gradle | 1 + .../concurrency/limits/limit/VegasLimit.java | 34 +++++++- .../limits/limiter/DefaultLimiter.java | 79 ++++++++++++++++++- .../BlockingAdaptiveExecutorSimulation.java | 4 +- .../limits/limiter/BlockingLimiterTest.java | 2 +- concurrency-limits-grpc/build.gradle | 1 + .../grpc/client/GrpcClientLimiterBuilder.java | 2 +- .../grpc/server/GrpcServerLimiterBuilder.java | 2 +- ...ConcurrencyLimitClientInterceptorTest.java | 8 +- ...ConcurrencyLimitServerInterceptorTest.java | 8 +- .../limits/servlet/ServletLimiterBuilder.java | 2 +- .../spectator/SpectatorMetricRegistry.java | 7 +- 12 files changed, 128 insertions(+), 22 deletions(-) diff --git a/concurrency-limits-core/build.gradle b/concurrency-limits-core/build.gradle index 11bf0bbf..54baa587 100644 --- a/concurrency-limits-core/build.gradle +++ b/concurrency-limits-core/build.gradle @@ -8,4 +8,5 @@ dependencies { compile "org.slf4j:slf4j-api:1.7.+" testCompile 'junit:junit-dep:4.10' + testCompile "org.slf4j:slf4j-log4j12:1.7.+" } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java index 5b8c495e..5f5ebce0 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java @@ -28,6 +28,7 @@ public static class Builder { private int maxConcurrency = 1000; private int alpha = 3; private int beta = 6; + private double tolerance = 1.0; private double backoffRatio = 0.9; private MetricRegistry registry = EmptyMetricRegistry.INSTANCE; private boolean fastStartEnabled = false; @@ -62,6 +63,26 @@ public Builder metricRegistry(MetricRegistry registry) { return this; } + /** + * Tolerance multiple of the windowed RTT measurement when adjusting the limit down. + * A value of 1 means little tolerance for changes in window RTT and the sample is used as + * is to determine queuing. A value of 2 means that the window RTT may be double the + * absolute minimum before being considered + * @param tolerance + * @return + */ + public Builder tolerance(double tolerance) { + Preconditions.checkArgument(tolerance >= 1, "Tolerance must be >= 1"); + this.tolerance = tolerance; + return this; + } + + /** + * When enabled allows for exponential limit growth to quickly discover + * the limit at startup. + * @param fastStartEnabled + * @return Chainable builder + */ public Builder fastStartEnabled(boolean fastStartEnabled) { this.fastStartEnabled = fastStartEnabled; return this; @@ -96,6 +117,7 @@ public static VegasLimit newDefault() { */ private final int maxLimit; + private final double tolerance; private final int alpha; private final int beta; private final double backoffRatio; @@ -107,6 +129,7 @@ private VegasLimit(Builder builder) { this.beta = builder.beta; this.backoffRatio = builder.backoffRatio; this.fastStart = builder.fastStartEnabled; + this.tolerance = builder.tolerance; builder.registry.registerGauge(MetricIds.MIN_RTT_GUAGE_NAME, () -> rtt_noload); } @@ -115,7 +138,7 @@ public synchronized void update(long rtt) { Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt); if (rtt_noload == 0 || rtt < rtt_noload) { - LOG.info("MinRTT {}", rtt); + LOG.debug("New MinRTT {}", rtt); rtt_noload = rtt; } @@ -124,7 +147,8 @@ public synchronized void update(long rtt) { fastStart = false; } else { int newLimit = estimatedLimit; - int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt)); + long adjusted_rtt = rtt < (rtt_noload * tolerance) ? rtt_noload : rtt; + int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / adjusted_rtt)); if (queueSize <= alpha) { if (fastStart) { newLimit = Math.min(maxLimit, estimatedLimit * 2); @@ -138,8 +162,12 @@ public synchronized void update(long rtt) { newLimit = Math.max(1, Math.min(maxLimit, newLimit)); if (newLimit != estimatedLimit) { - LOG.info("Limit {}", estimatedLimit); estimatedLimit = newLimit; + LOG.debug("New limit={} minRtt={} μs winRtt={} μs queueSize={}", + estimatedLimit, + TimeUnit.NANOSECONDS.toMicros(rtt_noload), + TimeUnit.NANOSECONDS.toMicros(rtt), + queueSize); } } } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java index 3419ff7f..d0e8fbe3 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java @@ -10,6 +10,7 @@ import com.netflix.concurrency.limits.Strategy; import com.netflix.concurrency.limits.Strategy.Token; import com.netflix.concurrency.limits.internal.Preconditions; +import com.netflix.concurrency.limits.limit.VegasLimit; /** * {@link Limiter} that combines a plugable limit algorithm and enforcement strategy to @@ -19,13 +20,14 @@ public final class DefaultLimiter implements Limiter { private final Supplier nanoClock = System::nanoTime; - private final long MIN_WINDOW_SIZE = TimeUnit.MILLISECONDS.toNanos(200); + private final static long DEFAULT_MIN_WINDOW_TIME = TimeUnit.MILLISECONDS.toNanos(200); + private final static int DEFAULT_WINDOW_SIZE = 100; /** * Ideal RTT when no queuing occurs. For simplicity we assume the lowest latency * ever observed is the ideal RTT. */ - private volatile long RTT_noload = TimeUnit.MILLISECONDS.toNanos(100); + private volatile long RTT_noload = Long.MAX_VALUE; /** * Smallest observed RTT during the sampling window. @@ -51,16 +53,78 @@ public final class DefaultLimiter implements Limiter { */ private final Limit limit; + /** + * Strategy for enforcing the limit + */ private final Strategy strategy; + /** + * Minimum window size in nanonseconds for sampling a new minRtt + */ + private final long minWindowTime; + + /** + * Sampling window size in multiple of the measured minRtt + */ + private final int windowSize; + + public static class Builder { + private Limit limit = VegasLimit.newDefault(); + private long minWindowTime = DEFAULT_MIN_WINDOW_TIME; + private int windowSize = DEFAULT_WINDOW_SIZE; + + public Builder limit(Limit limit) { + Preconditions.checkArgument(limit != null, "Algorithm may not be null"); + this.limit = limit; + return this; + } + + public Builder minWindowTime(long minWindowTime, TimeUnit units) { + Preconditions.checkArgument(minWindowTime >= units.toMillis(100), "minWindowTime must be >= 100 ms"); + this.minWindowTime = units.toNanos(minWindowTime); + return this; + } + + public Builder windowSize(int windowSize) { + Preconditions.checkArgument(windowSize >= 10, "Window size must be >= 10"); + this.windowSize = windowSize; + return this; + } + + public DefaultLimiter build(Strategy strategy) { + Preconditions.checkArgument(strategy != null, "Strategy may not be null"); + return new DefaultLimiter(this, strategy); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * @deprecated Use {@link DefaultLimiter#newBuilder} + * @param limit + * @param strategy + */ + @Deprecated public DefaultLimiter(Limit limit, Strategy strategy) { Preconditions.checkArgument(limit != null, "Algorithm may not be null"); Preconditions.checkArgument(strategy != null, "Strategy may not be null"); this.limit = limit; this.strategy = strategy; + this.windowSize = DEFAULT_WINDOW_SIZE; + this.minWindowTime = DEFAULT_MIN_WINDOW_TIME; strategy.setLimit(limit.getLimit()); } + private DefaultLimiter(Builder builder, Strategy strategy) { + this.limit = builder.limit; + this.minWindowTime = builder.minWindowTime; + this.windowSize = builder.windowSize; + this.strategy = strategy; + strategy.setLimit(limit.getLimit()); + } + @Override public Optional acquire(final ContextT context) { final long startTime = nanoClock.get(); @@ -90,7 +154,7 @@ public void onSuccess() { } long updateTime = nextUpdateTime.get(); - if (endTime >= updateTime && nextUpdateTime.compareAndSet(updateTime, endTime + Math.max(MIN_WINDOW_SIZE, RTT_noload * 20))) { + if (endTime >= updateTime && nextUpdateTime.compareAndSet(updateTime, endTime + Math.max(minWindowTime, RTT_noload * windowSize))) { if (!isAppLimited && current != Integer.MAX_VALUE && RTT_candidate.compareAndSet(current, Integer.MAX_VALUE)) { limit.update(current); strategy.setLimit(limit.getLimit()); @@ -116,10 +180,17 @@ public void onDropped() { protected int getLimit() { return limit.getLimit(); } + + /** + * @return Return the minimum observed RTT time or 0 if none found yet + */ + protected long getMinRtt() { + return RTT_noload == Long.MAX_VALUE ? 0 : RTT_noload; + } @Override public String toString() { - return "DefaultLimiter [RTT_noload=" + TimeUnit.NANOSECONDS.toMillis(RTT_noload) + return "DefaultLimiter [RTT_noload=" + TimeUnit.NANOSECONDS.toMillis(getMinRtt()) + ", RTT_candidate=" + TimeUnit.NANOSECONDS.toMillis(RTT_candidate.get()) + ", isAppLimited=" + isAppLimited + ", " + limit diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/executor/BlockingAdaptiveExecutorSimulation.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/executor/BlockingAdaptiveExecutorSimulation.java index 4b5d0f15..618ff769 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/executor/BlockingAdaptiveExecutorSimulation.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/executor/BlockingAdaptiveExecutorSimulation.java @@ -21,7 +21,7 @@ public class BlockingAdaptiveExecutorSimulation { @Test public void test() { - DefaultLimiter limiter = new DefaultLimiter<>(AIMDLimit.newBuilder().initialLimit(10).build(), new SimpleStrategy<>()); + DefaultLimiter limiter = DefaultLimiter.newBuilder().limit(AIMDLimit.newBuilder().initialLimit(10).build()).build(new SimpleStrategy<>()); Executor executor = new BlockingAdaptiveExecutor(limiter); run(10000, 20, executor, randomLatency(50, 150)); @@ -29,7 +29,7 @@ public void test() { @Test public void testVegas() { - DefaultLimiter limiter = new DefaultLimiter<>(VegasLimit.newBuilder().initialLimit(100).build(), new SimpleStrategy<>()); + DefaultLimiter limiter = DefaultLimiter.newBuilder().limit(VegasLimit.newBuilder().initialLimit(100).build()).build(new SimpleStrategy<>()); Executor executor = new BlockingAdaptiveExecutor(limiter); run(10000, 50, executor, randomLatency(50, 150)); } diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java index 38d87ec3..16d07e6d 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java @@ -12,7 +12,7 @@ public class BlockingLimiterTest { @Test public void test() { SettableLimit limit = SettableLimit.startingAt(10); - BlockingLimiter limiter = BlockingLimiter.wrap(new DefaultLimiter<>(limit, new SimpleStrategy())); + BlockingLimiter limiter = BlockingLimiter.wrap(DefaultLimiter.newBuilder().limit(limit).build(new SimpleStrategy<>())); LinkedList listeners = new LinkedList<>(); for (int i = 0; i < 10; i++) { diff --git a/concurrency-limits-grpc/build.gradle b/concurrency-limits-grpc/build.gradle index 89152bb6..6d05b795 100644 --- a/concurrency-limits-grpc/build.gradle +++ b/concurrency-limits-grpc/build.gradle @@ -14,4 +14,5 @@ dependencies { testCompile "io.grpc:grpc-netty:1.9.0" testCompile "io.grpc:grpc-stub:1.9.0" testCompile "junit:junit-dep:4.10" + testCompile "org.slf4j:slf4j-log4j12:1.7.+" } diff --git a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java index f7584385..aa326d1d 100644 --- a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java +++ b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java @@ -41,7 +41,7 @@ protected GrpcClientLimiterBuilder self() { } public Limiter build() { - Limiter limiter = new DefaultLimiter<>(limit, getFinalStrategy()); + Limiter limiter = DefaultLimiter.newBuilder().limit(limit).build(getFinalStrategy()); if (blockOnLimit) { limiter = BlockingLimiter.wrap(limiter); } diff --git a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java index 375e7d66..b6e0c2d9 100644 --- a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java +++ b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java @@ -52,6 +52,6 @@ protected GrpcServerLimiterBuilder self() { } public Limiter build() { - return new DefaultLimiter<>(limit, getFinalStrategy()); + return DefaultLimiter.newBuilder().limit(limit).build(getFinalStrategy()); } } diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java index 460456ca..30fbbab6 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java @@ -24,8 +24,12 @@ import org.junit.Test; public class ConcurrencyLimitClientInterceptorTest { - private static final MethodDescriptor METHOD_DESCRIPTOR = MethodDescriptor.create( - MethodType.UNARY, "service/method", StringMarshaller.INSTANCE, StringMarshaller.INSTANCE); + private static final MethodDescriptor METHOD_DESCRIPTOR = MethodDescriptor.newBuilder() + .setType(MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(StringMarshaller.INSTANCE) + .setResponseMarshaller(StringMarshaller.INSTANCE) + .build(); @Test @Ignore diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java index 4077248c..c45daf65 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java @@ -38,8 +38,12 @@ import org.junit.Test; public class ConcurrencyLimitServerInterceptorTest { - private static final MethodDescriptor METHOD_DESCRIPTOR = MethodDescriptor.create(MethodType.UNARY, - "service/method", StringMarshaller.INSTANCE, StringMarshaller.INSTANCE); + private static final MethodDescriptor METHOD_DESCRIPTOR = MethodDescriptor.newBuilder() + .setType(MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(StringMarshaller.INSTANCE) + .setResponseMarshaller(StringMarshaller.INSTANCE) + .build(); private static final Key ID_HEADER = Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER); diff --git a/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java b/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java index 6a37aefd..fd42c8ee 100644 --- a/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java +++ b/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java @@ -88,6 +88,6 @@ protected ServletLimiterBuilder self() { } public Limiter build() { - return new DefaultLimiter<>(limit, getFinalStrategy()); + return DefaultLimiter.newBuilder().limit(limit).build(getFinalStrategy()); } } diff --git a/concurrency-limits-spectator/src/main/java/com/netflix/concurrency/limits/spectator/SpectatorMetricRegistry.java b/concurrency-limits-spectator/src/main/java/com/netflix/concurrency/limits/spectator/SpectatorMetricRegistry.java index 56ad4b25..e12be212 100644 --- a/concurrency-limits-spectator/src/main/java/com/netflix/concurrency/limits/spectator/SpectatorMetricRegistry.java +++ b/concurrency-limits-spectator/src/main/java/com/netflix/concurrency/limits/spectator/SpectatorMetricRegistry.java @@ -1,16 +1,13 @@ package com.netflix.concurrency.limits.spectator; +import java.util.function.Supplier; + import com.netflix.concurrency.limits.MetricRegistry; import com.netflix.spectator.api.DistributionSummary; import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.patterns.PolledMeter; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public final class SpectatorMetricRegistry implements MetricRegistry { private final Registry registry; private final Id baseId;