Skip to content

Commit

Permalink
Fix long task timer
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerwowen committed Jul 24, 2024
1 parent cddbb72 commit e874301
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -1,59 +1,71 @@
/**
* Copyright (c) 2024 Pinterest, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.teletraan.universal.metrics.micrometer;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
import java.lang.reflect.Field;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.internal.CumulativeHistogramLongTaskTimer;

/**
* This is a custom implementation of {@link LongTaskTimer}
* to support user supplied start time.
*/
public class PinStatsLongTaskTimer extends CumulativeHistogramLongTaskTimer {
private static final Logger LOG = LoggerFactory.getLogger(PinStatsLongTaskTimer.class);
/** This is a custom implementation of {@link LongTaskTimer} to support user supplied start time. */
public class PinStatsLongTaskTimer extends DefaultLongTaskTimer {
private static final Logger LOG = LoggerFactory.getLogger(PinStatsLongTaskTimer.class);

private final Clock clock;
private final Clock clock;

public PinStatsLongTaskTimer(Id id, Clock clock, TimeUnit baseTimeUnit,
DistributionStatisticConfig distributionStatisticConfig) {
super(id, clock, baseTimeUnit, distributionStatisticConfig);
this.clock = clock;
}
public PinStatsLongTaskTimer(
Id id,
Clock clock,
TimeUnit baseTimeUnit,
DistributionStatisticConfig distributionStatisticConfig) {
super(id, clock, baseTimeUnit, distributionStatisticConfig, false);
this.clock = clock;
}

/**
* Start the timer with user supplied start time.
*
* This method can only provide an approximation of the start time. Therefore it
* is not suitable for high precision use cases. You shouldn't use a
* {@link LongTaskTimer} for tracking high precision durations anyways.
*
* If for any reason the start time cannot be set, the current time will be
* used.
*
* @param startTime start time
* @return the sample with specified start time
*/
public Sample start(Instant startTime) {
Sample sample = start();
try {
long timeLapsed = clock.wallTime() - startTime.toEpochMilli();
long monotonicStartTime = clock.monotonicTime() - timeLapsed * 1000000;
// The class `SampleImpl` is not visible, so we have to use reflection to set
// the start time.
Class<?> sampleImplClass = sample.getClass();
Field field = sampleImplClass.getDeclaredField("startTime");
field.setAccessible(true);
field.set(sample, monotonicStartTime);
} catch (Exception e) {
LOG.error("Failed to set start time, use current time instead", e);
/**
* Start the timer with user supplied start time.
*
* <p>This method can only provide an approximation of the start time. Therefore it is not
* suitable for high precision use cases. You shouldn't use a {@link LongTaskTimer} for tracking
* high precision durations anyways.
*
* <p>If for any reason the start time cannot be set, the current time will be used.
*
* @param startTime start time
* @return the sample with specified start time
*/
public Sample start(Instant startTime) {
Sample sample = start();
try {
long timeLapsed = clock.wallTime() - startTime.toEpochMilli();
long monotonicStartTime = clock.monotonicTime() - timeLapsed * 1000000;
// The class `SampleImpl` is not visible, so we have to use reflection to set
// the start time.
Class<?> sampleImplClass = sample.getClass();
Field field = sampleImplClass.getDeclaredField("startTime");
field.setAccessible(true);
field.set(sample, monotonicStartTime);
} catch (Exception e) {
LOG.error("Failed to set start time, use current time instead", e);
}
return sample;
}
return sample;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -47,7 +48,11 @@ protected LongTaskTimer newLongTaskTimer(
id, clock, getBaseTimeUnit(), distributionStatisticConfig);
}
};
sut = (PinStatsLongTaskTimer) LongTaskTimer.builder("my.ltt").register(registry);
sut =
(PinStatsLongTaskTimer)
LongTaskTimer.builder("my.ltt")
.serviceLevelObjectives(Duration.ofSeconds(10))
.register(registry);
}

@Test
Expand All @@ -71,4 +76,26 @@ void withInput_start_inputAsStartTime() {
clock.addSeconds(5);
assertEquals(10 * 1000, sample.duration(TimeUnit.MILLISECONDS), 0.1);
}

@Test
void testGaugeHistogram() {
Sample sample = sut.start();
sut.start();
clock.addSeconds(1);

// both active and within SLO
assertEquals(2, sut.activeTasks());
assertEquals(2, sut.takeSnapshot().histogramCounts()[0].count());

// 1 remains active
sample.stop();
assertEquals(1, sut.activeTasks());
assertEquals(1, sut.takeSnapshot().histogramCounts()[0].count());

// remaining exceeds SLO
clock.addSeconds(10);

assertEquals(1, sut.activeTasks());
assertEquals(0, sut.takeSnapshot().histogramCounts()[0].count());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.LongTaskTimer.Sample;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Meter.Type;
import io.micrometer.core.instrument.MockClock;
Expand All @@ -32,7 +33,9 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -98,49 +101,88 @@ void histogramBucketsHaveCorrectBaseUnit() {
clock.add(config.step());

publisher.writeTimer(timer).forEach(LOG::debug);
Supplier<Stream<String>> stream = () -> publisher.writeTimer(timer);

assertTrue(
publisher
.writeTimer(timer)
stream.get()
.anyMatch(
t ->
t.equals(
"put mm.timers.my_timer_duration_seconds.bucket 60001 1 le=1.0\n")));
assertTrue(
publisher
.writeTimer(timer)
stream.get()
.anyMatch(
t ->
t.equals(
"put mm.timers.my_timer_duration_seconds.bucket 60001 0 le=0.9\n")));
}

@Test
void longTaskTimer() {
LongTaskTimer timer = LongTaskTimer.builder("my.timer").tags(tags).register(meterRegistry);
void writeLongTaskTimer() {
LongTaskTimer timer =
LongTaskTimer.builder("my.timer")
.tags(tags)
.serviceLevelObjectives(Duration.ofSeconds(5))
.register(meterRegistry);
Sample s1 = timer.start();
timer.start();

clock.addSeconds(5);
s1.stop();
publisher.writeLongTaskTimer(timer).forEach(LOG::debug);
Supplier<Stream<String>> stream = () -> publisher.writeLongTaskTimer(timer);

assertTrue(
stream.get()
.anyMatch(
t ->
t.equals(
"put mm.my_timer_duration_seconds.bucket 5001 1 le=5.0 tag=value\n")));
assertTrue(
stream.get()
.anyMatch(
t ->
t.equals(
"put mm.my_timer_duration_seconds.bucket 5001 1 le=+Inf tag=value\n")));

clock.addSeconds(1);

assertTrue(
publisher
.writeLongTaskTimer(timer)
stream.get()
.anyMatch(
t ->
t.equals(
"put mm.my_timer_duration_seconds.active_count 6001 1 tag=value\n")));
assertTrue(
stream.get()
.anyMatch(
t ->
t.equals(
"put mm.my_timer_duration_seconds.duration_sum 6001 6 tag=value\n")));
assertTrue(
stream.get()
.anyMatch(
t ->
t.equals(
"put mm.my_timer_duration_seconds.max 6001 6 tag=value\n")));
assertTrue(
stream.get()
.anyMatch(
t ->
t.equals(
"put mm.my_timer_duration_seconds.active_count 1 0 tag=value\n")));
"put mm.my_timer_duration_seconds.max 6001 6 tag=value\n")));
assertTrue(
publisher
.writeLongTaskTimer(timer)
stream.get()
.anyMatch(
t ->
t.equals(
"put mm.my_timer_duration_seconds.duration_sum 1 0 tag=value\n")));
"put mm.my_timer_duration_seconds.bucket 6001 0 le=5.0 tag=value\n")));
assertTrue(
publisher
.writeLongTaskTimer(timer)
stream.get()
.anyMatch(
t ->
t.equals(
"put mm.my_timer_duration_seconds.max 1 0 tag=value\n")));
"put mm.my_timer_duration_seconds.bucket 6001 1 le=+Inf tag=value\n")));
}

@Test
Expand Down

0 comments on commit e874301

Please sign in to comment.