From f2528defbbf080dcaf5567861faec1506bac0aa2 Mon Sep 17 00:00:00 2001 From: Patrick Strawderman Date: Sun, 15 Sep 2024 11:00:27 -0700 Subject: [PATCH] Add methods to SampleListener to handle primitives Add two methods to SampleListener, addLongSample and addDoubleSample, which accept primitives and avoid boxing; default implementations are provided for backwards compatibility. --- .../concurrency/limits/MetricRegistry.java | 8 ++++++++ .../limits/limit/Gradient2Limit.java | 6 +++--- .../limits/limit/GradientLimit.java | 6 +++--- .../concurrency/limits/limit/VegasLimit.java | 2 +- .../limiter/AbstractPartitionedLimiter.java | 4 ++-- .../limits/limiter/SimpleLimiter.java | 2 +- .../spectator/SpectatorMetricRegistry.java | 19 ++++++++++++++++--- 7 files changed, 34 insertions(+), 13 deletions(-) diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/MetricRegistry.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/MetricRegistry.java index bb68f64a..058da789 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/MetricRegistry.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/MetricRegistry.java @@ -27,6 +27,14 @@ public interface MetricRegistry { */ interface SampleListener { void addSample(Number value); + + default void addLongSample(long value) { + addSample(value); + } + + default void addDoubleSample(double value) { + addSample(value); + } } interface Counter { diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java index 4103a199..18a6fd71 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java @@ -268,9 +268,9 @@ public int _update(final long startTime, final long rtt, final int inflight, fin final double shortRtt = (double)rtt; final double longRtt = this.longRtt.add(rtt).doubleValue(); - shortRttSampleListener.addSample(shortRtt); - longRttSampleListener.addSample(longRtt); - queueSizeSampleListener.addSample(queueSize); + shortRttSampleListener.addDoubleSample(shortRtt); + longRttSampleListener.addDoubleSample(longRtt); + queueSizeSampleListener.addDoubleSample(queueSize); // If the long RTT is substantially larger than the short RTT then reduce the long RTT measurement. // This can happen when latency returns to normal after a prolonged prior of excessive load. Reducing the diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java index 7ad67ae5..3961dac0 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java @@ -264,10 +264,10 @@ private int nextProbeCountdown() { @Override public int _update(final long startTime, final long rtt, final int inflight, final boolean didDrop) { lastRtt = rtt; - minWindowRttSampleListener.addSample(rtt); + minWindowRttSampleListener.addLongSample(rtt); final double queueSize = this.queueSize.apply((int)this.estimatedLimit); - queueSizeSampleListener.addSample(queueSize); + queueSizeSampleListener.addDoubleSample(queueSize); // Reset or probe for a new noload RTT and a new estimatedLimit. It's necessary to cut the limit // in half to avoid having the limit drift upwards when the RTT is probed during heavy load. @@ -283,7 +283,7 @@ public int _update(final long startTime, final long rtt, final int inflight, fin } final long rttNoLoad = rttNoLoadMeasurement.add(rtt).longValue(); - minRttSampleListener.addSample(rttNoLoad); + minRttSampleListener.addLongSample(rttNoLoad); // Rtt could be higher than rtt_noload because of smoothing rtt noload updates // so set to 1.0 to indicate no queuing. Otherwise calculate the slope and don't diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java index 6ef716e2..b7b57fa9 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java @@ -218,7 +218,7 @@ protected int _update(long startTime, long rtt, int inflight, boolean didDrop) { return (int)estimatedLimit; } - rttSampleListener.addSample(rtt_noload); + rttSampleListener.addLongSample(rtt_noload); return updateEstimatedLimit(rtt, inflight, didDrop); } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java index 36bb6adc..53109512 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java @@ -138,7 +138,7 @@ boolean isLimitExceeded() { void acquire() { int nowBusy = busy.incrementAndGet(); - inflightDistribution.addSample(nowBusy); + inflightDistribution.addLongSample(nowBusy); } /** @@ -149,7 +149,7 @@ boolean tryAcquire() { int current = busy.get(); while (current < limit) { if (busy.compareAndSet(current, current + 1)) { - inflightDistribution.addSample(current + 1); + inflightDistribution.addLongSample(current + 1); return true; } current = busy.get(); diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java index afd8acdc..0b6cd382 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java @@ -59,7 +59,7 @@ else if (!semaphore.tryAcquire()) { else { listener = Optional.of(new Listener(createListener())); } - inflightDistribution.addSample(getInflight()); + inflightDistribution.addLongSample(getInflight()); return listener; } diff --git a/concurrency-limits-spectator/src/main/java/com/netflix/concurrency/limits/spectator/SpectatorMetricRegistry.java b/concurrency-limits-spectator/src/main/java/com/netflix/concurrency/limits/spectator/SpectatorMetricRegistry.java index dd974b1c..17a52097 100644 --- a/concurrency-limits-spectator/src/main/java/com/netflix/concurrency/limits/spectator/SpectatorMetricRegistry.java +++ b/concurrency-limits-spectator/src/main/java/com/netflix/concurrency/limits/spectator/SpectatorMetricRegistry.java @@ -31,11 +31,24 @@ public SpectatorMetricRegistry(Registry registry, Id baseId) { this.registry = registry; this.baseId = baseId; } - + @Override public SampleListener distribution(String id, String... tagNameValuePairs) { DistributionSummary summary = registry.distributionSummary(suffixBaseId(id).withTags(tagNameValuePairs)); - return value -> summary.record(value.longValue()); + return new SampleListener() { + @Override + public void addSample(Number value) { + summary.record(value.longValue()); + } + @Override + public void addLongSample(long value) { + summary.record(value); + } + @Override + public void addDoubleSample(double value) { + summary.record((long) value); + } + }; } @Override @@ -52,7 +65,7 @@ public void gauge(String id, Supplier supplier, String... tagNameValuePa public Counter counter(String id, String... tagNameValuePairs) { Id metricId = suffixBaseId(id).withTags(tagNameValuePairs); com.netflix.spectator.api.Counter spectatorCounter = registry.counter(metricId); - return () -> spectatorCounter.increment(); + return spectatorCounter::increment; } private Id suffixBaseId(String suffix) {