Skip to content

Commit

Permalink
Merge pull request #36 from elandau/bugfix/vegas_smoothing
Browse files Browse the repository at this point in the history
Fix Vegas smoothing and set default to 1.0
  • Loading branch information
elandau authored Apr 5, 2018
2 parents 04a21c7 + 8e11d8e commit 39486b6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public static class Builder {
private int initialLimit = 20;
private int maxConcurrency = 1000;
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
private double smoothing = 0.2;
private double smoothing = 1.0;

private Function<Integer, Integer> alpha = (limit) -> 3;
private Function<Integer, Integer> beta = (limit) -> 6;
private Function<Integer, Integer> increaseFunc = (limit) -> limit + 1;
private Function<Integer, Integer> decreaseFunc = (limit) -> limit - 1;
private Function<Double, Double> increaseFunc = (limit) -> limit + 1;
private Function<Double, Double> decreaseFunc = (limit) -> limit - 1;

public Builder alpha(int alpha) {
this.alpha = (ignore) -> alpha;
Expand All @@ -56,12 +56,12 @@ public Builder beta(Function<Integer, Integer> beta) {
return this;
}

public Builder increase(Function<Integer, Integer> increase) {
public Builder increase(Function<Double, Double> increase) {
this.increaseFunc = increase;
return this;
}

public Builder decrease(Function<Integer, Integer> decrease) {
public Builder decrease(Function<Double, Double> decrease) {
this.decreaseFunc = decrease;
return this;
}
Expand Down Expand Up @@ -128,8 +128,8 @@ public static VegasLimit newDefault() {
private final double smoothing;
private final Function<Integer, Integer> alphaFunc;
private final Function<Integer, Integer> betaFunc;
private final Function<Integer, Integer> increaseFunc;
private final Function<Integer, Integer> decreaseFunc;
private final Function<Double, Double> increaseFunc;
private final Function<Double, Double> decreaseFunc;

private VegasLimit(Builder builder) {
this.estimatedLimit = builder.initialLimit;
Expand All @@ -153,25 +153,25 @@ public synchronized void update(long rtt, int maxInFlight) {
double newLimit;
final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));
if (didDrop) {
newLimit = decreaseFunc.apply((int)estimatedLimit);
newLimit = decreaseFunc.apply(estimatedLimit);
didDrop = false;
} else if (maxInFlight < estimatedLimit) {
} else if (maxInFlight + queueSize < estimatedLimit) {
return;
} else {
int alpha = alphaFunc.apply((int)estimatedLimit);
int beta = betaFunc.apply((int)estimatedLimit);

if (queueSize < alpha) {
newLimit = increaseFunc.apply((int)estimatedLimit);
newLimit = increaseFunc.apply(estimatedLimit);
} else if (queueSize > beta) {
newLimit = decreaseFunc.apply((int)estimatedLimit);
newLimit = decreaseFunc.apply(estimatedLimit);
} else {
return;
}
}

newLimit = Math.max(1, Math.min(maxLimit, newLimit));
newLimit = (int) ((1 - smoothing) * estimatedLimit + smoothing * newLimit);
newLimit = (1 - smoothing) * estimatedLimit + smoothing * newLimit;
if ((int)newLimit != (int)estimatedLimit && LOG.isDebugEnabled()) {
LOG.debug("New limit={} minRtt={} μs winRtt={} μs queueSize={}",
estimatedLimit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,47 @@ public void noChangeIfWithinThresholds() {
limit.update(TimeUnit.MILLISECONDS.toNanos(14), 14);
Assert.assertEquals(11, limit.getLimit());
}

@Test
public void decreaseSmoothing() {
VegasLimit limit = VegasLimit.newBuilder()
.decrease(current -> current / 2)
.smoothing(0.5)
.initialLimit(100)
.maxConcurrency(200)
.build();

// Pick up first min-rtt
limit.update(TimeUnit.MILLISECONDS.toNanos(10), 100);
Assert.assertEquals(100, limit.getLimit());

// First decrease
limit.update(TimeUnit.MILLISECONDS.toNanos(20), 100);
Assert.assertEquals(75, limit.getLimit());

// Second decrease
limit.update(TimeUnit.MILLISECONDS.toNanos(20), 100);
Assert.assertEquals(56, limit.getLimit());
}

@Test
public void decreaseWithoutSmoothing() {
VegasLimit limit = VegasLimit.newBuilder()
.decrease(current -> current / 2)
.initialLimit(100)
.maxConcurrency(200)
.build();

// Pick up first min-rtt
limit.update(TimeUnit.MILLISECONDS.toNanos(10), 100);
Assert.assertEquals(101, limit.getLimit());

// First decrease
limit.update(TimeUnit.MILLISECONDS.toNanos(20), 100);
Assert.assertEquals(50, limit.getLimit());

// Second decrease
limit.update(TimeUnit.MILLISECONDS.toNanos(20), 100);
Assert.assertEquals(25, limit.getLimit());
}
}

0 comments on commit 39486b6

Please sign in to comment.