Skip to content

Commit

Permalink
Merge pull request #14 from Netflix/feature/tolerance
Browse files Browse the repository at this point in the history
Introduce tolerance for min rtt changes
  • Loading branch information
elandau authored Feb 7, 2018
2 parents a9c61a3 + cf2d590 commit dfb6ba7
Show file tree
Hide file tree
Showing 12 changed files with 128 additions and 22 deletions.
1 change: 1 addition & 0 deletions concurrency-limits-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ dependencies {
compile "org.slf4j:slf4j-api:1.7.+"

testCompile 'junit:junit-dep:4.10'
testCompile "org.slf4j:slf4j-log4j12:1.7.+"
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static class Builder {
private int maxConcurrency = 1000;
private int alpha = 3;
private int beta = 6;
private double tolerance = 1.0;
private double backoffRatio = 0.9;
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
private boolean fastStartEnabled = false;
Expand Down Expand Up @@ -62,6 +63,26 @@ public Builder metricRegistry(MetricRegistry registry) {
return this;
}

/**
* Tolerance multiple of the windowed RTT measurement when adjusting the limit down.
* A value of 1 means little tolerance for changes in window RTT and the sample is used as
* is to determine queuing. A value of 2 means that the window RTT may be double the
* absolute minimum before being considered
* @param tolerance
* @return
*/
public Builder tolerance(double tolerance) {
Preconditions.checkArgument(tolerance >= 1, "Tolerance must be >= 1");
this.tolerance = tolerance;
return this;
}

/**
* When enabled allows for exponential limit growth to quickly discover
* the limit at startup.
* @param fastStartEnabled
* @return Chainable builder
*/
public Builder fastStartEnabled(boolean fastStartEnabled) {
this.fastStartEnabled = fastStartEnabled;
return this;
Expand Down Expand Up @@ -96,6 +117,7 @@ public static VegasLimit newDefault() {
*/
private final int maxLimit;

private final double tolerance;
private final int alpha;
private final int beta;
private final double backoffRatio;
Expand All @@ -107,6 +129,7 @@ private VegasLimit(Builder builder) {
this.beta = builder.beta;
this.backoffRatio = builder.backoffRatio;
this.fastStart = builder.fastStartEnabled;
this.tolerance = builder.tolerance;
builder.registry.registerGauge(MetricIds.MIN_RTT_GUAGE_NAME, () -> rtt_noload);
}

Expand All @@ -115,7 +138,7 @@ public synchronized void update(long rtt) {
Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt);

if (rtt_noload == 0 || rtt < rtt_noload) {
LOG.info("MinRTT {}", rtt);
LOG.debug("New MinRTT {}", rtt);
rtt_noload = rtt;
}

Expand All @@ -124,7 +147,8 @@ public synchronized void update(long rtt) {
fastStart = false;
} else {
int newLimit = estimatedLimit;
int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));
long adjusted_rtt = rtt < (rtt_noload * tolerance) ? rtt_noload : rtt;
int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / adjusted_rtt));
if (queueSize <= alpha) {
if (fastStart) {
newLimit = Math.min(maxLimit, estimatedLimit * 2);
Expand All @@ -138,8 +162,12 @@ public synchronized void update(long rtt) {

newLimit = Math.max(1, Math.min(maxLimit, newLimit));
if (newLimit != estimatedLimit) {
LOG.info("Limit {}", estimatedLimit);
estimatedLimit = newLimit;
LOG.debug("New limit={} minRtt={} μs winRtt={} μs queueSize={}",
estimatedLimit,
TimeUnit.NANOSECONDS.toMicros(rtt_noload),
TimeUnit.NANOSECONDS.toMicros(rtt),
queueSize);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.netflix.concurrency.limits.Strategy;
import com.netflix.concurrency.limits.Strategy.Token;
import com.netflix.concurrency.limits.internal.Preconditions;
import com.netflix.concurrency.limits.limit.VegasLimit;

/**
* {@link Limiter} that combines a plugable limit algorithm and enforcement strategy to
Expand All @@ -19,13 +20,14 @@
public final class DefaultLimiter<ContextT> implements Limiter<ContextT> {
private final Supplier<Long> nanoClock = System::nanoTime;

private final long MIN_WINDOW_SIZE = TimeUnit.MILLISECONDS.toNanos(200);
private final static long DEFAULT_MIN_WINDOW_TIME = TimeUnit.MILLISECONDS.toNanos(200);
private final static int DEFAULT_WINDOW_SIZE = 100;

/**
* Ideal RTT when no queuing occurs. For simplicity we assume the lowest latency
* ever observed is the ideal RTT.
*/
private volatile long RTT_noload = TimeUnit.MILLISECONDS.toNanos(100);
private volatile long RTT_noload = Long.MAX_VALUE;

/**
* Smallest observed RTT during the sampling window.
Expand All @@ -51,16 +53,78 @@ public final class DefaultLimiter<ContextT> implements Limiter<ContextT> {
*/
private final Limit limit;

/**
* Strategy for enforcing the limit
*/
private final Strategy<ContextT> strategy;

/**
* Minimum window size in nanonseconds for sampling a new minRtt
*/
private final long minWindowTime;

/**
* Sampling window size in multiple of the measured minRtt
*/
private final int windowSize;

public static class Builder {
private Limit limit = VegasLimit.newDefault();
private long minWindowTime = DEFAULT_MIN_WINDOW_TIME;
private int windowSize = DEFAULT_WINDOW_SIZE;

public Builder limit(Limit limit) {
Preconditions.checkArgument(limit != null, "Algorithm may not be null");
this.limit = limit;
return this;
}

public Builder minWindowTime(long minWindowTime, TimeUnit units) {
Preconditions.checkArgument(minWindowTime >= units.toMillis(100), "minWindowTime must be >= 100 ms");
this.minWindowTime = units.toNanos(minWindowTime);
return this;
}

public Builder windowSize(int windowSize) {
Preconditions.checkArgument(windowSize >= 10, "Window size must be >= 10");
this.windowSize = windowSize;
return this;
}

public <ContextT> DefaultLimiter<ContextT> build(Strategy<ContextT> strategy) {
Preconditions.checkArgument(strategy != null, "Strategy may not be null");
return new DefaultLimiter<ContextT>(this, strategy);
}
}

public static Builder newBuilder() {
return new Builder();
}

/**
* @deprecated Use {@link DefaultLimiter#newBuilder}
* @param limit
* @param strategy
*/
@Deprecated
public DefaultLimiter(Limit limit, Strategy<ContextT> strategy) {
Preconditions.checkArgument(limit != null, "Algorithm may not be null");
Preconditions.checkArgument(strategy != null, "Strategy may not be null");
this.limit = limit;
this.strategy = strategy;
this.windowSize = DEFAULT_WINDOW_SIZE;
this.minWindowTime = DEFAULT_MIN_WINDOW_TIME;
strategy.setLimit(limit.getLimit());
}

private DefaultLimiter(Builder builder, Strategy<ContextT> strategy) {
this.limit = builder.limit;
this.minWindowTime = builder.minWindowTime;
this.windowSize = builder.windowSize;
this.strategy = strategy;
strategy.setLimit(limit.getLimit());
}

@Override
public Optional<Listener> acquire(final ContextT context) {
final long startTime = nanoClock.get();
Expand Down Expand Up @@ -90,7 +154,7 @@ public void onSuccess() {
}

long updateTime = nextUpdateTime.get();
if (endTime >= updateTime && nextUpdateTime.compareAndSet(updateTime, endTime + Math.max(MIN_WINDOW_SIZE, RTT_noload * 20))) {
if (endTime >= updateTime && nextUpdateTime.compareAndSet(updateTime, endTime + Math.max(minWindowTime, RTT_noload * windowSize))) {
if (!isAppLimited && current != Integer.MAX_VALUE && RTT_candidate.compareAndSet(current, Integer.MAX_VALUE)) {
limit.update(current);
strategy.setLimit(limit.getLimit());
Expand All @@ -116,10 +180,17 @@ public void onDropped() {
protected int getLimit() {
return limit.getLimit();
}

/**
* @return Return the minimum observed RTT time or 0 if none found yet
*/
protected long getMinRtt() {
return RTT_noload == Long.MAX_VALUE ? 0 : RTT_noload;
}

@Override
public String toString() {
return "DefaultLimiter [RTT_noload=" + TimeUnit.NANOSECONDS.toMillis(RTT_noload)
return "DefaultLimiter [RTT_noload=" + TimeUnit.NANOSECONDS.toMillis(getMinRtt())
+ ", RTT_candidate=" + TimeUnit.NANOSECONDS.toMillis(RTT_candidate.get())
+ ", isAppLimited=" + isAppLimited
+ ", " + limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
public class BlockingAdaptiveExecutorSimulation {
@Test
public void test() {
DefaultLimiter<Void> limiter = new DefaultLimiter<>(AIMDLimit.newBuilder().initialLimit(10).build(), new SimpleStrategy<>());
DefaultLimiter<Void> limiter = DefaultLimiter.newBuilder().limit(AIMDLimit.newBuilder().initialLimit(10).build()).build(new SimpleStrategy<>());
Executor executor = new BlockingAdaptiveExecutor(limiter);

run(10000, 20, executor, randomLatency(50, 150));
}

@Test
public void testVegas() {
DefaultLimiter<Void> limiter = new DefaultLimiter<>(VegasLimit.newBuilder().initialLimit(100).build(), new SimpleStrategy<>());
DefaultLimiter<Void> limiter = DefaultLimiter.newBuilder().limit(VegasLimit.newBuilder().initialLimit(100).build()).build(new SimpleStrategy<>());
Executor executor = new BlockingAdaptiveExecutor(limiter);
run(10000, 50, executor, randomLatency(50, 150));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class BlockingLimiterTest {
@Test
public void test() {
SettableLimit limit = SettableLimit.startingAt(10);
BlockingLimiter<Void> limiter = BlockingLimiter.wrap(new DefaultLimiter<>(limit, new SimpleStrategy()));
BlockingLimiter<Void> limiter = BlockingLimiter.wrap(DefaultLimiter.newBuilder().limit(limit).build(new SimpleStrategy<>()));

LinkedList<Limiter.Listener> listeners = new LinkedList<>();
for (int i = 0; i < 10; i++) {
Expand Down
1 change: 1 addition & 0 deletions concurrency-limits-grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ dependencies {
testCompile "io.grpc:grpc-netty:1.9.0"
testCompile "io.grpc:grpc-stub:1.9.0"
testCompile "junit:junit-dep:4.10"
testCompile "org.slf4j:slf4j-log4j12:1.7.+"
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected GrpcClientLimiterBuilder self() {
}

public Limiter<GrpcClientRequestContext> build() {
Limiter<GrpcClientRequestContext> limiter = new DefaultLimiter<>(limit, getFinalStrategy());
Limiter<GrpcClientRequestContext> limiter = DefaultLimiter.newBuilder().limit(limit).build(getFinalStrategy());
if (blockOnLimit) {
limiter = BlockingLimiter.wrap(limiter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ protected GrpcServerLimiterBuilder self() {
}

public Limiter<GrpcServerRequestContext> build() {
return new DefaultLimiter<>(limit, getFinalStrategy());
return DefaultLimiter.newBuilder().limit(limit).build(getFinalStrategy());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import org.junit.Test;

public class ConcurrencyLimitClientInterceptorTest {
private static final MethodDescriptor<String, String> METHOD_DESCRIPTOR = MethodDescriptor.create(
MethodType.UNARY, "service/method", StringMarshaller.INSTANCE, StringMarshaller.INSTANCE);
private static final MethodDescriptor<String, String> METHOD_DESCRIPTOR = MethodDescriptor.<String, String>newBuilder()
.setType(MethodType.UNARY)
.setFullMethodName("service/method")
.setRequestMarshaller(StringMarshaller.INSTANCE)
.setResponseMarshaller(StringMarshaller.INSTANCE)
.build();

@Test
@Ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@
import org.junit.Test;

public class ConcurrencyLimitServerInterceptorTest {
private static final MethodDescriptor<String, String> METHOD_DESCRIPTOR = MethodDescriptor.create(MethodType.UNARY,
"service/method", StringMarshaller.INSTANCE, StringMarshaller.INSTANCE);
private static final MethodDescriptor<String, String> METHOD_DESCRIPTOR = MethodDescriptor.<String, String>newBuilder()
.setType(MethodType.UNARY)
.setFullMethodName("service/method")
.setRequestMarshaller(StringMarshaller.INSTANCE)
.setResponseMarshaller(StringMarshaller.INSTANCE)
.build();

private static final Key<String> ID_HEADER = Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ protected ServletLimiterBuilder self() {
}

public Limiter<HttpServletRequest> build() {
return new DefaultLimiter<>(limit, getFinalStrategy());
return DefaultLimiter.newBuilder().limit(limit).build(getFinalStrategy());
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package com.netflix.concurrency.limits.spectator;

import java.util.function.Supplier;

import com.netflix.concurrency.limits.MetricRegistry;
import com.netflix.spectator.api.DistributionSummary;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.patterns.PolledMeter;

import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SpectatorMetricRegistry implements MetricRegistry {
private final Registry registry;
private final Id baseId;
Expand Down

0 comments on commit dfb6ba7

Please sign in to comment.