Skip to content

Commit

Permalink
Merge pull request #208 from kilink/sample-listener-primitive
Browse files Browse the repository at this point in the history
Add methods to SampleListener to handle primitives
  • Loading branch information
kilink authored Sep 18, 2024
2 parents 450aa7a + f2528de commit 7989055
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ public interface MetricRegistry {
*/
interface SampleListener {
void addSample(Number value);

default void addLongSample(long value) {
addSample(value);
}

default void addDoubleSample(double value) {
addSample(value);
}
}

interface Counter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ public int _update(final long startTime, final long rtt, final int inflight, fin
final double shortRtt = (double)rtt;
final double longRtt = this.longRtt.add(rtt).doubleValue();

shortRttSampleListener.addSample(shortRtt);
longRttSampleListener.addSample(longRtt);
queueSizeSampleListener.addSample(queueSize);
shortRttSampleListener.addDoubleSample(shortRtt);
longRttSampleListener.addDoubleSample(longRtt);
queueSizeSampleListener.addDoubleSample(queueSize);

// If the long RTT is substantially larger than the short RTT then reduce the long RTT measurement.
// This can happen when latency returns to normal after a prolonged prior of excessive load. Reducing the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,10 @@ private int nextProbeCountdown() {
@Override
public int _update(final long startTime, final long rtt, final int inflight, final boolean didDrop) {
lastRtt = rtt;
minWindowRttSampleListener.addSample(rtt);
minWindowRttSampleListener.addLongSample(rtt);

final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
queueSizeSampleListener.addSample(queueSize);
queueSizeSampleListener.addDoubleSample(queueSize);

// Reset or probe for a new noload RTT and a new estimatedLimit. It's necessary to cut the limit
// in half to avoid having the limit drift upwards when the RTT is probed during heavy load.
Expand All @@ -283,7 +283,7 @@ public int _update(final long startTime, final long rtt, final int inflight, fin
}

final long rttNoLoad = rttNoLoadMeasurement.add(rtt).longValue();
minRttSampleListener.addSample(rttNoLoad);
minRttSampleListener.addLongSample(rttNoLoad);

// Rtt could be higher than rtt_noload because of smoothing rtt noload updates
// so set to 1.0 to indicate no queuing. Otherwise calculate the slope and don't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ protected int _update(long startTime, long rtt, int inflight, boolean didDrop) {
return (int)estimatedLimit;
}

rttSampleListener.addSample(rtt_noload);
rttSampleListener.addLongSample(rtt_noload);

return updateEstimatedLimit(rtt, inflight, didDrop);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ boolean isLimitExceeded() {

void acquire() {
int nowBusy = busy.incrementAndGet();
inflightDistribution.addSample(nowBusy);
inflightDistribution.addLongSample(nowBusy);
}

/**
Expand All @@ -149,7 +149,7 @@ boolean tryAcquire() {
int current = busy.get();
while (current < limit) {
if (busy.compareAndSet(current, current + 1)) {
inflightDistribution.addSample(current + 1);
inflightDistribution.addLongSample(current + 1);
return true;
}
current = busy.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ else if (!semaphore.tryAcquire()) {
else {
listener = Optional.of(new Listener(createListener()));
}
inflightDistribution.addSample(getInflight());
inflightDistribution.addLongSample(getInflight());
return listener;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,24 @@ public SpectatorMetricRegistry(Registry registry, Id baseId) {
this.registry = registry;
this.baseId = baseId;
}

@Override
public SampleListener distribution(String id, String... tagNameValuePairs) {
DistributionSummary summary = registry.distributionSummary(suffixBaseId(id).withTags(tagNameValuePairs));
return value -> summary.record(value.longValue());
return new SampleListener() {
@Override
public void addSample(Number value) {
summary.record(value.longValue());
}
@Override
public void addLongSample(long value) {
summary.record(value);
}
@Override
public void addDoubleSample(double value) {
summary.record((long) value);
}
};
}

@Override
Expand All @@ -52,7 +65,7 @@ public void gauge(String id, Supplier<Number> supplier, String... tagNameValuePa
public Counter counter(String id, String... tagNameValuePairs) {
Id metricId = suffixBaseId(id).withTags(tagNameValuePairs);
com.netflix.spectator.api.Counter spectatorCounter = registry.counter(metricId);
return () -> spectatorCounter.increment();
return spectatorCounter::increment;
}

private Id suffixBaseId(String suffix) {
Expand Down

0 comments on commit 7989055

Please sign in to comment.