Skip to content

Commit

Permalink
Disallow aggregations for incompatible instruments (open-telemetry#4338)
Browse files Browse the repository at this point in the history
* Disallow aggregations for incompatible instruments

* Improve test coverage

* Ignore View if aggregation is incompatible, using default view if no others are configured
  • Loading branch information
jack-berg authored Apr 7, 2022
1 parent 7a8cd86 commit 6692683
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@ public interface AggregatorFactory {
*/
<T> Aggregator<T> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter);

/**
* Determine if the {@link Aggregator} produced by {@link #createAggregator(InstrumentDescriptor,
* ExemplarFilter)} is compatible with the {@code instrumentDescriptor}.
*/
boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescriptor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public <T> Aggregator<T> createAggregator(
.createAggregator(instrumentDescriptor, exemplarFilter);
}

@Override
public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescriptor) {
// This should always return true
return ((AggregatorFactory) resolve(instrumentDescriptor))
.isCompatibleWithInstrument(instrumentDescriptor);
}

@Override
public String toString() {
return "DefaultAggregation";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public <T> Aggregator<T> createAggregator(
return (Aggregator<T>) Aggregator.drop();
}

@Override
public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescriptor) {
return true;
}

@Override
public String toString() {
return "DropAggregation";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ public <T> Aggregator<T> createAggregator(
Clock.getDefault(), bucketBoundaries)));
}

@Override
public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescriptor) {
switch (instrumentDescriptor.getType()) {
case COUNTER:
case HISTOGRAM:
return true;
default:
return false;
}
}

@Override
public String toString() {
return "ExplicitBucketHistogramAggregation(" + bucketBoundaries.toString() + ")";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.sdk.metrics.internal.view;

import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleLastValueAggregator;
Expand Down Expand Up @@ -45,6 +46,11 @@ public <T> Aggregator<T> createAggregator(
throw new IllegalArgumentException("Invalid instrument value type");
}

@Override
public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescriptor) {
return instrumentDescriptor.getType() == InstrumentType.OBSERVABLE_GAUGE;
}

@Override
public String toString() {
return "LastValueAggregation";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ public <T> Aggregator<T> createAggregator(
throw new IllegalArgumentException("Invalid instrument value type");
}

@Override
public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescriptor) {
switch (instrumentDescriptor.getType()) {
case COUNTER:
case OBSERVABLE_COUNTER:
case UP_DOWN_COUNTER:
case OBSERVABLE_UP_DOWN_COUNTER:
case HISTOGRAM:
return true;
default:
return false;
}
}

@Override
public String toString() {
return "SumAggregation";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregationUtil;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.annotation.concurrent.Immutable;

Expand All @@ -34,6 +38,7 @@ public final class ViewRegistry {
DEFAULT_VIEW,
AttributesProcessor.NOOP,
SourceInfo.noSourceInfo());
private static final Logger logger = Logger.getLogger(ViewRegistry.class.getName());

private final List<RegisteredView> reverseRegistration;

Expand All @@ -58,9 +63,23 @@ public List<RegisteredView> findViews(
List<RegisteredView> result = new ArrayList<>();
for (RegisteredView entry : reverseRegistration) {
if (matchesSelector(entry.getInstrumentSelector(), descriptor, meterScope)) {
result.add(entry);
AggregatorFactory viewAggregatorFactory =
(AggregatorFactory) entry.getView().getAggregation();
if (viewAggregatorFactory.isCompatibleWithInstrument(descriptor)) {
result.add(entry);
} else {
logger.log(
Level.WARNING,
"View aggregation "
+ AggregationUtil.aggregationName(entry.getView().getAggregation())
+ " is incompatible with instrument "
+ descriptor.getName()
+ " of type "
+ descriptor.getType());
}
}
}

if (result.isEmpty()) {
return Collections.singletonList(DEFAULT_REGISTERED_VIEW);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import java.util.Arrays;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -34,4 +36,61 @@ void histogramUsesExplicitBucket() {
.asString()
.contains("ExplicitBucketHistogram");
}

@Test
void aggregationIsCompatible() {
InstrumentDescriptor counter = descriptorForType(InstrumentType.COUNTER);
InstrumentDescriptor observableCounter = descriptorForType(InstrumentType.OBSERVABLE_COUNTER);
InstrumentDescriptor upDownCounter = descriptorForType(InstrumentType.UP_DOWN_COUNTER);
InstrumentDescriptor observableUpDownCounter =
descriptorForType(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER);
InstrumentDescriptor observableGauge = descriptorForType(InstrumentType.OBSERVABLE_GAUGE);
InstrumentDescriptor histogram = descriptorForType(InstrumentType.HISTOGRAM);

AggregatorFactory defaultAggregation = ((AggregatorFactory) Aggregation.defaultAggregation());
assertThat(defaultAggregation.isCompatibleWithInstrument(counter)).isTrue();
assertThat(defaultAggregation.isCompatibleWithInstrument(observableCounter)).isTrue();
assertThat(defaultAggregation.isCompatibleWithInstrument(upDownCounter)).isTrue();
assertThat(defaultAggregation.isCompatibleWithInstrument(observableUpDownCounter)).isTrue();
assertThat(defaultAggregation.isCompatibleWithInstrument(observableGauge)).isTrue();
assertThat(defaultAggregation.isCompatibleWithInstrument(histogram)).isTrue();

AggregatorFactory drop = ((AggregatorFactory) Aggregation.drop());
assertThat(drop.isCompatibleWithInstrument(counter)).isTrue();
assertThat(drop.isCompatibleWithInstrument(observableCounter)).isTrue();
assertThat(drop.isCompatibleWithInstrument(upDownCounter)).isTrue();
assertThat(drop.isCompatibleWithInstrument(observableUpDownCounter)).isTrue();
assertThat(drop.isCompatibleWithInstrument(observableGauge)).isTrue();
assertThat(drop.isCompatibleWithInstrument(histogram)).isTrue();

AggregatorFactory sum = ((AggregatorFactory) Aggregation.sum());
assertThat(sum.isCompatibleWithInstrument(counter)).isTrue();
assertThat(sum.isCompatibleWithInstrument(observableCounter)).isTrue();
assertThat(sum.isCompatibleWithInstrument(upDownCounter)).isTrue();
assertThat(sum.isCompatibleWithInstrument(observableUpDownCounter)).isTrue();
assertThat(sum.isCompatibleWithInstrument(observableGauge)).isFalse();
assertThat(sum.isCompatibleWithInstrument(histogram)).isTrue();

AggregatorFactory explicitHistogram =
((AggregatorFactory) Aggregation.explicitBucketHistogram());
assertThat(explicitHistogram.isCompatibleWithInstrument(counter)).isTrue();
assertThat(explicitHistogram.isCompatibleWithInstrument(observableCounter)).isFalse();
assertThat(explicitHistogram.isCompatibleWithInstrument(upDownCounter)).isFalse();
assertThat(explicitHistogram.isCompatibleWithInstrument(observableUpDownCounter)).isFalse();
assertThat(explicitHistogram.isCompatibleWithInstrument(observableGauge)).isFalse();
assertThat(explicitHistogram.isCompatibleWithInstrument(histogram)).isTrue();

AggregatorFactory lastValue = ((AggregatorFactory) Aggregation.lastValue());
assertThat(lastValue.isCompatibleWithInstrument(counter)).isFalse();
assertThat(lastValue.isCompatibleWithInstrument(observableCounter)).isFalse();
assertThat(lastValue.isCompatibleWithInstrument(upDownCounter)).isFalse();
assertThat(lastValue.isCompatibleWithInstrument(observableUpDownCounter)).isFalse();
assertThat(lastValue.isCompatibleWithInstrument(observableGauge)).isTrue();
assertThat(lastValue.isCompatibleWithInstrument(histogram)).isFalse();
}

private static InstrumentDescriptor descriptorForType(InstrumentType instrumentType) {
return InstrumentDescriptor.create(
"name", "description", "unit", instrumentType, InstrumentValueType.DOUBLE);
}
}
Loading

0 comments on commit 6692683

Please sign in to comment.