Skip to content

Commit

Permalink
Use IntUnaryOperator / DoubleUnaryOperator in VegasLimit
Browse files Browse the repository at this point in the history
Switch to IntUnaryOperator and DoubleUnaryOperator in VegasLimit to avoid unnecessary boxing.
Additionally, reduced repeated reads of volatile fields.
  • Loading branch information
kilink committed Oct 24, 2024
1 parent 7989055 commit a127eb4
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import com.netflix.concurrency.limits.MetricRegistry;
import com.netflix.concurrency.limits.MetricRegistry.SampleListener;
import com.netflix.concurrency.limits.internal.EmptyMetricRegistry;
import com.netflix.concurrency.limits.internal.Preconditions;
import com.netflix.concurrency.limits.limit.functions.Log10RootFunction;
import com.netflix.concurrency.limits.limit.functions.Log10RootIntFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;

/**
* Limiter based on TCP Vegas where the limit increases by alpha if the queue_use is small ({@literal <} alpha)
Expand All @@ -41,19 +42,19 @@
public class VegasLimit extends AbstractLimit {
private static final Logger LOG = LoggerFactory.getLogger(VegasLimit.class);

private static final Function<Integer, Integer> LOG10 = Log10RootFunction.create(0);
private static final IntUnaryOperator LOG10 = Log10RootIntFunction.create(0);

public static class Builder {
private int initialLimit = 20;
private int maxConcurrency = 1000;
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
private double smoothing = 1.0;

private Function<Integer, Integer> alphaFunc = (limit) -> 3 * LOG10.apply(limit.intValue());
private Function<Integer, Integer> betaFunc = (limit) -> 6 * LOG10.apply(limit.intValue());
private Function<Integer, Integer> thresholdFunc = (limit) -> LOG10.apply(limit.intValue());
private Function<Double, Double> increaseFunc = (limit) -> limit + LOG10.apply(limit.intValue());
private Function<Double, Double> decreaseFunc = (limit) -> limit - LOG10.apply(limit.intValue());
private IntUnaryOperator alphaFunc = (limit) -> 3 * LOG10.applyAsInt(limit);
private IntUnaryOperator betaFunc = (limit) -> 6 * LOG10.applyAsInt(limit);
private IntUnaryOperator thresholdFunc = LOG10;
private DoubleUnaryOperator increaseFunc = (limit) -> limit + LOG10.applyAsInt((int) limit);
private DoubleUnaryOperator decreaseFunc = (limit) -> limit - LOG10.applyAsInt((int) limit);
private int probeMultiplier = 30;

private Builder() {
Expand All @@ -74,13 +75,31 @@ public Builder alpha(int alpha) {
this.alphaFunc = (ignore) -> alpha;
return this;
}


/**
* @deprecated use {@link #thresholdFunction(IntUnaryOperator)}
*/
@Deprecated
public Builder threshold(Function<Integer, Integer> threshold) {
this.thresholdFunc = threshold::apply;
return this;
}

public Builder thresholdFunction(IntUnaryOperator threshold) {
this.thresholdFunc = threshold;
return this;
}


/**
* @deprecated use {@link #alphaFunction(IntUnaryOperator)}
*/
@Deprecated
public Builder alpha(Function<Integer, Integer> alpha) {
this.alphaFunc = alpha::apply;
return this;
}

public Builder alphaFunction(IntUnaryOperator alpha) {
this.alphaFunc = alpha;
return this;
}
Expand All @@ -89,18 +108,45 @@ public Builder beta(int beta) {
this.betaFunc = (ignore) -> beta;
return this;
}


/**
* @deprecated use {@link #betaFunction(IntUnaryOperator)}
*/
@Deprecated
public Builder beta(Function<Integer, Integer> beta) {
this.betaFunc = beta::apply;
return this;
}

public Builder betaFunction(IntUnaryOperator beta) {
this.betaFunc = beta;
return this;
}


/**
* @deprecated use {@link #increaseFunction(DoubleUnaryOperator)}
*/
@Deprecated
public Builder increase(Function<Double, Double> increase) {
this.increaseFunc = increase::apply;
return this;
}

public Builder increaseFunction(DoubleUnaryOperator increase) {
this.increaseFunc = increase;
return this;
}


/**
* @deprecated use {@link #decreaseFunction(DoubleUnaryOperator)}
*/
@Deprecated
public Builder decrease(Function<Double, Double> decrease) {
this.decreaseFunc = decrease::apply;
return this;
}

public Builder decreaseFunction(DoubleUnaryOperator decrease) {
this.decreaseFunc = decrease;
return this;
}
Expand Down Expand Up @@ -164,11 +210,11 @@ public static VegasLimit newDefault() {
private final int maxLimit;

private final double smoothing;
private final Function<Integer, Integer> alphaFunc;
private final Function<Integer, Integer> betaFunc;
private final Function<Integer, Integer> thresholdFunc;
private final Function<Double, Double> increaseFunc;
private final Function<Double, Double> decreaseFunc;
private final IntUnaryOperator alphaFunc;
private final IntUnaryOperator betaFunc;
private final IntUnaryOperator thresholdFunc;
private final DoubleUnaryOperator increaseFunc;
private final DoubleUnaryOperator decreaseFunc;
private final SampleListener rttSampleListener;
private final int probeMultiplier;
private int probeCount = 0;
Expand Down Expand Up @@ -201,69 +247,77 @@ private boolean shouldProbe() {

@Override
protected int _update(long startTime, long rtt, int inflight, boolean didDrop) {
Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt);
if (rtt <= 0) {
throw new IllegalArgumentException("rtt must be >0 but got " + rtt);
}

probeCount++;
if (shouldProbe()) {
LOG.debug("Probe MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
if (LOG.isDebugEnabled()) {
LOG.debug("Probe MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
}
resetProbeJitter();
probeCount = 0;
rtt_noload = rtt;
return (int)estimatedLimit;
return (int) estimatedLimit;
}


long rtt_noload = this.rtt_noload;
if (rtt_noload == 0 || rtt < rtt_noload) {
LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
rtt_noload = rtt;
return (int)estimatedLimit;
if (LOG.isDebugEnabled()) {
LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
}
this.rtt_noload = rtt;
return (int) estimatedLimit;
}

rttSampleListener.addLongSample(rtt_noload);

return updateEstimatedLimit(rtt, inflight, didDrop);
return updateEstimatedLimit(rtt, rtt_noload, inflight, didDrop);
}

private int updateEstimatedLimit(long rtt, int inflight, boolean didDrop) {
final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));
private int updateEstimatedLimit(long rtt, long rtt_noload, int inflight, boolean didDrop) {
double estimatedLimit = this.estimatedLimit;
final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double) rtt_noload / rtt));

double newLimit;
// Treat any drop (i.e timeout) as needing to reduce the limit
if (didDrop) {
newLimit = decreaseFunc.apply(estimatedLimit);
newLimit = decreaseFunc.applyAsDouble(estimatedLimit);
// Prevent upward drift if not close to the limit
} else if (inflight * 2 < estimatedLimit) {
return (int)estimatedLimit;
return (int) estimatedLimit;
} else {
int alpha = alphaFunc.apply((int)estimatedLimit);
int beta = betaFunc.apply((int)estimatedLimit);
int threshold = this.thresholdFunc.apply((int)estimatedLimit);
int alpha = alphaFunc.applyAsInt((int) estimatedLimit);
int beta = betaFunc.applyAsInt((int) estimatedLimit);
int threshold = thresholdFunc.applyAsInt((int) estimatedLimit);

// Aggressive increase when no queuing
if (queueSize <= threshold) {
newLimit = estimatedLimit + beta;
// Increase the limit if queue is still manageable
} else if (queueSize < alpha) {
newLimit = increaseFunc.apply(estimatedLimit);
newLimit = increaseFunc.applyAsDouble(estimatedLimit);
// Detecting latency so decrease
} else if (queueSize > beta) {
newLimit = decreaseFunc.apply(estimatedLimit);
newLimit = decreaseFunc.applyAsDouble(estimatedLimit);
// We're within he sweet spot so nothing to do
} else {
return (int)estimatedLimit;
return (int) estimatedLimit;
}
}

newLimit = Math.max(1, Math.min(maxLimit, newLimit));
newLimit = (1 - smoothing) * estimatedLimit + smoothing * newLimit;
if ((int)newLimit != (int)estimatedLimit && LOG.isDebugEnabled()) {
if ((int) newLimit != (int) estimatedLimit && LOG.isDebugEnabled()) {
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={}",
(int)newLimit,
(int) newLimit,
TimeUnit.NANOSECONDS.toMicros(rtt_noload) / 1000.0,
TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0,
queueSize);
}
estimatedLimit = newLimit;
return (int)estimatedLimit;
this.estimatedLimit = newLimit;
return (int) newLimit;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,33 @@
import java.util.stream.IntStream;

/**
* Function used by limiters to calculate thredsholds using log10 of the current limit.
* Function used by limiters to calculate thresholds using log10 of the current limit.
* Here we pre-compute the log10 of numbers up to 1000 as an optimization.
*
* @deprecated use {@link Log10RootIntFunction}
*/
@Deprecated
public final class Log10RootFunction implements Function<Integer, Integer> {
static final int[] lookup = new int[1000];

static {
IntStream.range(0, 1000).forEach(i -> lookup[i] = Math.max(1, (int)Math.log10(i)));
}

private static final Log10RootFunction INSTANCE = new Log10RootFunction();

/**
* Create an instance of a function that returns : baseline + sqrt(limit)
*
*
* @param baseline
* @return
*/
public static Function<Integer, Integer> create(int baseline) {
return INSTANCE.andThen(t -> t + baseline);
}

@Override
public Integer apply(Integer t) {
return t < 1000 ? lookup[t] : (int)Math.log10(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.netflix.concurrency.limits.limit.functions;

import java.util.function.IntUnaryOperator;

/**
* Function used by limiters to calculate thresholds using log10 of the current limit.
* Here we pre-compute the log10 of numbers up to 1000 as an optimization.
*/
public final class Log10RootIntFunction implements IntUnaryOperator {

private Log10RootIntFunction() {}

private static final int[] lookup = new int[1000];

static {
for (int i = 0; i < lookup.length; i++) {
lookup[i] = Math.max(1, (int) Math.log10(i));
}
}

private static final Log10RootIntFunction INSTANCE = new Log10RootIntFunction();

/**
* Create an instance of a function that returns : baseline + sqrt(limit)
*
* @param baseline
* @return
*/
public static IntUnaryOperator create(int baseline) {
return baseline == 0 ? INSTANCE : INSTANCE.andThen(t -> t + baseline);
}

@Override
public int applyAsInt(int t) {
return t < 1000 ? lookup[t] : (int) Math.log10(t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void noChangeIfWithinThresholds() {
@Test
public void decreaseSmoothing() {
VegasLimit limit = VegasLimit.newBuilder()
.decrease(current -> current / 2)
.decreaseFunction(current -> current / 2)
.smoothing(0.5)
.initialLimit(100)
.maxConcurrency(200)
Expand All @@ -77,10 +77,33 @@ public void decreaseSmoothing() {
Assert.assertEquals(56, limit.getLimit());
}

@Test
public void decreaseSmoothingDeprecatedBuilderMethod() {
@SuppressWarnings("deprecation")
VegasLimit limit = VegasLimit.newBuilder()
.decrease(current -> current / 2)
.smoothing(0.5)
.initialLimit(100)
.maxConcurrency(200)
.build();

// Pick up first min-rtt
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 100, false);
Assert.assertEquals(100, limit.getLimit());

// First decrease
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false);
Assert.assertEquals(75, limit.getLimit());

// Second decrease
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false);
Assert.assertEquals(56, limit.getLimit());
}

@Test
public void decreaseWithoutSmoothing() {
VegasLimit limit = VegasLimit.newBuilder()
.decrease(current -> current / 2)
.decreaseFunction(current -> current / 2)
.initialLimit(100)
.maxConcurrency(200)
.build();
Expand All @@ -97,4 +120,26 @@ public void decreaseWithoutSmoothing() {
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false);
Assert.assertEquals(25, limit.getLimit());
}

@Test
public void decreaseWithoutSmoothingDeprecatedBuilderMethod() {
@SuppressWarnings("deprecation")
VegasLimit limit = VegasLimit.newBuilder()
.decrease(current -> current / 2)
.initialLimit(100)
.maxConcurrency(200)
.build();

// Pick up first min-rtt
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 100, false);
Assert.assertEquals(100, limit.getLimit());

// First decrease
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false);
Assert.assertEquals(50, limit.getLimit());

// Second decrease
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false);
Assert.assertEquals(25, limit.getLimit());
}
}
Loading

0 comments on commit a127eb4

Please sign in to comment.