diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/TestSdk.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/TestSdk.java index 4f461d6fa63..1c8e760425d 100644 --- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/TestSdk.java +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/TestSdk.java @@ -38,7 +38,7 @@ Meter build() { .get("io.opentelemetry.sdk.metrics"); } }), - SDK( + SDK_CUMULATIVE( new SdkBuilder() { @Override Meter build() { @@ -50,6 +50,19 @@ Meter build() { .build() .get("io.opentelemetry.sdk.metrics"); } + }), + SDK_DELTA( + new SdkBuilder() { + @Override + Meter build() { + return SdkMeterProvider.builder() + .setClock(Clock.getDefault()) + .setResource(Resource.empty()) + // Must register reader for real SDK. + .registerMetricReader(InMemoryMetricReader.createDelta()) + .build() + .get("io.opentelemetry.sdk.metrics"); + } }); private final SdkBuilder sdkBuilder; diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 3a359d75d23..cf43245637f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -26,6 +26,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.StampedLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -46,7 +47,8 @@ public final class DefaultSynchronousMetricStorage aggregator; - private final ConcurrentHashMap> aggregatorHandles = + private final StampedLock sl = new StampedLock(); + private ConcurrentHashMap> aggregatorHandles = new ConcurrentHashMap<>(); private final AttributesProcessor attributesProcessor; @@ -83,8 +85,13 @@ Queue> getAggregatorHandlePool() { @Override public void recordLong(long value, Attributes attributes, Context context) { - AggregatorHandle handle = getAggregatorHandle(attributes, context); - handle.recordLong(value, attributes, context); + long stamp = sl.readLock(); + try { + AggregatorHandle handle = getAggregatorHandle(attributes, context); + handle.recordLong(value, attributes, context); + } finally { + sl.unlockRead(stamp); + } } @Override @@ -99,8 +106,13 @@ public void recordDouble(double value, Attributes attributes, Context context) { + ". Dropping measurement."); return; } - AggregatorHandle handle = getAggregatorHandle(attributes, context); - handle.recordDouble(value, attributes, context); + long stamp = sl.readLock(); + try { + AggregatorHandle handle = getAggregatorHandle(attributes, context); + handle.recordDouble(value, attributes, context); + } finally { + sl.unlockRead(stamp); + } } private AggregatorHandle getAggregatorHandle(Attributes attributes, Context context) { @@ -146,13 +158,25 @@ public MetricData collect( ? registeredReader.getLastCollectEpochNanos() : startEpochNanos; + ConcurrentHashMap> aggregatorHandles; + if (reset) { + long stamp = sl.writeLock(); + try { + aggregatorHandles = this.aggregatorHandles; + this.aggregatorHandles = new ConcurrentHashMap<>(); + } finally { + sl.unlockWrite(stamp); + } + } else { + aggregatorHandles = this.aggregatorHandles; + } + // Grab aggregated points. List points = new ArrayList<>(aggregatorHandles.size()); aggregatorHandles.forEach( (attributes, handle) -> { T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset); if (reset) { - aggregatorHandles.remove(attributes, handle); // Return the aggregator to the pool. aggregatorHandlePool.offer(handle); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index aa6fe1d3999..2d301eb6d88 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -12,6 +12,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.google.common.util.concurrent.AtomicDouble; +import com.google.common.util.concurrent.Uninterruptibles; import io.github.netmikey.logunit.api.LogCapturer; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -21,9 +23,11 @@ import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.InstrumentValueType; +import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; @@ -37,8 +41,17 @@ import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.function.BiConsumer; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.event.Level; @SuppressLogger(DefaultSynchronousMetricStorage.class) @@ -370,4 +383,79 @@ void recordAndCollect_DeltaAtLimit() { assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT); logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); } + + @ParameterizedTest + @MethodSource("concurrentStressTestArguments") + void recordAndCollect_concurrentStressTest( + DefaultSynchronousMetricStorage storage, BiConsumer collect) { + // Define record threads. Each records a value of 1.0, 2000 times + List threads = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(4); + for (int i = 0; i < 4; i++) { + Thread thread = + new Thread( + () -> { + for (int j = 0; j < 2000; j++) { + storage.recordDouble(1.0, Attributes.empty(), Context.current()); + Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1)); + } + latch.countDown(); + }); + threads.add(thread); + } + + // Define collect thread. Collect thread collects and aggregates the + AtomicDouble cumulativeSum = new AtomicDouble(); + Thread collectThread = + new Thread( + () -> { + do { + Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1)); + MetricData metricData = + storage.collect(Resource.empty(), InstrumentationScopeInfo.empty(), 0, 1); + if (metricData.isEmpty()) { + continue; + } + metricData.getDoubleSumData().getPoints().stream() + .findFirst() + .ifPresent(pointData -> collect.accept(pointData.getValue(), cumulativeSum)); + } while (latch.getCount() != 0); + }); + + // Start all the threads + collectThread.start(); + threads.forEach(Thread::start); + + // Wait for the collect thread to end, which collects until the record threads are done + Uninterruptibles.joinUninterruptibly(collectThread); + + assertThat(cumulativeSum.get()).isEqualTo(8000.0); + } + + private static Stream concurrentStressTestArguments() { + Aggregator aggregator = + ((AggregatorFactory) Aggregation.sum()) + .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff()); + return Stream.of( + Arguments.of( + // Delta + new DefaultSynchronousMetricStorage<>( + RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()), + METRIC_DESCRIPTOR, + aggregator, + AttributesProcessor.noop(), + CARDINALITY_LIMIT), + (BiConsumer) + (value, cumulativeCount) -> cumulativeCount.addAndGet(value)), + Arguments.of( + // Cumulative + new DefaultSynchronousMetricStorage<>( + RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create()), + METRIC_DESCRIPTOR, + aggregator, + AttributesProcessor.noop(), + CARDINALITY_LIMIT), + (BiConsumer) + (value, cumulativeCount) -> cumulativeCount.set(value))); + } }