Skip to content

Commit

Permalink
Merge pull request #207 from kilink/gradient2-limit-int-unary-operator
Browse files Browse the repository at this point in the history
Use IntUnaryOperator in Gradient2Limit for queueSize
  • Loading branch information
kilink authored Sep 19, 2024
2 parents 7989055 + 26f23a0 commit 74bb308
Showing 1 changed file with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;

/**
* Concurrency limit algorithm that adjusts the limit based on the gradient of change of the current average RTT and
Expand Down Expand Up @@ -76,7 +77,7 @@ public static class Builder {
private int maxConcurrency = 200;

private double smoothing = 0.2;
private Function<Integer, Integer> queueSize = concurrency -> 4;
private IntUnaryOperator queueSize = concurrency -> 4;
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
private int longWindow = 600;
private double rttTolerance = 1.5;
Expand Down Expand Up @@ -127,10 +128,23 @@ public Builder queueSize(int queueSize) {
/**
* Function to dynamically determine the amount the estimated limit can grow while
* latencies remain low as a function of the current limit.
* @param queueSize
* @param queueSize the queue size function
* @return Chainable builder
* @deprecated use {@link #queueSizeFunction(IntUnaryOperator)}
*/
@Deprecated
public Builder queueSize(Function<Integer, Integer> queueSize) {
this.queueSize = queueSize::apply;
return this;
}

/**
* Function to dynamically determine the amount the estimated limit can grow while
* latencies remain low as a function of the current limit.
* @param queueSize the queue size function
* @return Chainable builder
*/
public Builder queueSizeFunction(IntUnaryOperator queueSize) {
this.queueSize = queueSize;
return this;
}
Expand Down Expand Up @@ -231,7 +245,7 @@ public static Gradient2Limit newDefault() {

private final int minLimit;

private final Function<Integer, Integer> queueSize;
private final IntUnaryOperator queueSize;

private final double smoothing;

Expand Down Expand Up @@ -262,10 +276,11 @@ private Gradient2Limit(Builder builder) {

@Override
public int _update(final long startTime, final long rtt, final int inflight, final boolean didDrop) {
final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
double estimatedLimit = this.estimatedLimit;
final double queueSize = this.queueSize.applyAsInt((int) estimatedLimit);

this.lastRtt = rtt;
final double shortRtt = (double)rtt;
final double shortRtt = (double) rtt;
final double longRtt = this.longRtt.add(rtt).doubleValue();

shortRttSampleListener.addDoubleSample(shortRtt);
Expand Down Expand Up @@ -293,7 +308,7 @@ public int _update(final long startTime, final long rtt, final int inflight, fin
newLimit = estimatedLimit * (1 - smoothing) + newLimit * smoothing;
newLimit = Math.max(minLimit, Math.min(maxLimit, newLimit));

if ((int)estimatedLimit != newLimit) {
if ((int) estimatedLimit != newLimit && LOG.isDebugEnabled()) {
LOG.debug("New limit={} shortRtt={} ms longRtt={} ms queueSize={} gradient={}",
(int)newLimit,
getLastRtt(TimeUnit.MICROSECONDS) / 1000.0,
Expand All @@ -302,9 +317,9 @@ public int _update(final long startTime, final long rtt, final int inflight, fin
gradient);
}

estimatedLimit = newLimit;
this.estimatedLimit = newLimit;

return (int)estimatedLimit;
return (int) newLimit;
}

public long getLastRtt(TimeUnit units) {
Expand Down

0 comments on commit 74bb308

Please sign in to comment.