Skip to content

Commit

Permalink
[sdk-metrics] Remove some reflection used by tests (#5338)
Browse files Browse the repository at this point in the history
Co-authored-by: Utkarsh Umesan Pillai <[email protected]>
  • Loading branch information
CodeBlanch and utpilla authored Feb 9, 2024
1 parent 8325786 commit f1a1835
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 43 deletions.
67 changes: 33 additions & 34 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal sealed class AggregatorStore
internal readonly bool OutputDeltaWithUnusedMetricPointReclaimEnabled;
internal readonly int CardinalityLimit;
internal readonly bool EmitOverflowAttribute;
internal readonly ConcurrentDictionary<Tags, LookupData>? TagsToMetricPointIndexDictionaryDelta;
internal long DroppedMeasurements = 0;

private static readonly string MetricPointCapHitFixMessage = "Consider opting in for the experimental SDK feature to emit all the throttled metrics under the overflow attribute by setting env variable OTEL_DOTNET_EXPERIMENTAL_METRICS_EMIT_OVERFLOW_ATTRIBUTE = true. You could also modify instrumentation to reduce the number of unique key/value pair combinations. Or use Views to drop unwanted tags. Or use MeterProviderBuilder.SetMaxMetricPointsPerMetricStream to set higher limit.";
Expand All @@ -32,8 +33,6 @@ internal sealed class AggregatorStore
private readonly ConcurrentDictionary<Tags, int> tagsToMetricPointIndexDictionary =
new();

private readonly ConcurrentDictionary<Tags, LookupData>? tagsToMetricPointIndexDictionaryDelta;

private readonly string name;
private readonly string metricPointCapHitMessage;
private readonly MetricPoint[] metricPoints;
Expand Down Expand Up @@ -110,7 +109,7 @@ internal AggregatorStore(
// There is no overload which only takes capacity as the parameter
// Using the DefaultConcurrencyLevel defined in the ConcurrentDictionary class: https://github.com/dotnet/runtime/blob/v7.0.5/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentDictionary.cs#L2020
// We expect at the most (maxMetricPoints - reservedMetricPointsCount) * 2 entries- one for sorted and one for unsorted input
this.tagsToMetricPointIndexDictionaryDelta =
this.TagsToMetricPointIndexDictionaryDelta =
new ConcurrentDictionary<Tags, LookupData>(concurrencyLevel: Environment.ProcessorCount, capacity: (cardinalityLimit - reservedMetricPointsCount) * 2);

// Add all the indices except for the reserved ones to the queue so that threads have
Expand Down Expand Up @@ -266,28 +265,28 @@ internal void SnapshotDeltaWithMetricPointReclaim()
// Snapshot method can use this to skip trying to reclaim indices which have already been reclaimed and added to the queue.
metricPoint.LookupData = null;

Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");
Debug.Assert(this.TagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");

lock (this.tagsToMetricPointIndexDictionaryDelta!)
lock (this.TagsToMetricPointIndexDictionaryDelta!)
{
LookupData? dictionaryValue;
if (lookupData.SortedTags != Tags.EmptyTags)
{
// Check if no other thread added a new entry for the same Tags.
// If no, then remove the existing entries.
if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.SortedTags, out dictionaryValue) &&
if (this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.SortedTags, out dictionaryValue) &&
dictionaryValue == lookupData)
{
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.SortedTags, out var _);
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out var _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.SortedTags, out var _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out var _);
}
}
else
{
if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.GivenTags, out dictionaryValue) &&
if (this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.GivenTags, out dictionaryValue) &&
dictionaryValue == lookupData)
{
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out var _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out var _);
}
}

Expand Down Expand Up @@ -550,11 +549,11 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object
int index;
var givenTags = new Tags(tagKeysAndValues);

Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");
Debug.Assert(this.TagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");

bool newMetricPointCreated = false;

if (!this.tagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out var lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out var lookupData))
{
if (length > 1)
{
Expand All @@ -567,7 +566,7 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object

var sortedTags = new Tags(tempSortedTagKeysAndValues);

if (!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
{
// Note: We are using storage from ThreadStatic (for up to MaxTagCacheSize tags) for both the input order of tags and the sorted order of tags,
// so we need to make a deep copy for Dictionary storage.
Expand All @@ -585,10 +584,10 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object

Debug.Assert(this.availableMetricPoints != null, "this.availableMetricPoints was null");

lock (this.tagsToMetricPointIndexDictionaryDelta)
lock (this.TagsToMetricPointIndexDictionaryDelta)
{
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
{
// Check for an available MetricPoint
if (this.availableMetricPoints!.Count > 0)
Expand All @@ -612,8 +611,8 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object
// MetricPoint, if dictionary entry found.

// Add the sorted order along with the given order of tags
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(sortedTags, lookupData);
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(sortedTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
}
}
}
Expand All @@ -631,10 +630,10 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object

Debug.Assert(this.availableMetricPoints != null, "this.availableMetricPoints was null");

lock (this.tagsToMetricPointIndexDictionaryDelta)
lock (this.TagsToMetricPointIndexDictionaryDelta)
{
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(givenTags, out lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(givenTags, out lookupData))
{
// Check for an available MetricPoint
if (this.availableMetricPoints!.Count > 0)
Expand All @@ -658,7 +657,7 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object
// MetricPoint, if dictionary entry found.

// givenTags will always be sorted when tags length == 1
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
}
}
}
Expand Down Expand Up @@ -735,7 +734,7 @@ private bool TryGetAvailableMetricPointRare(
out LookupData? lookupData,
out bool newMetricPointCreated)
{
Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");
Debug.Assert(this.TagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");
Debug.Assert(this.availableMetricPoints != null, "this.availableMetricPoints was null");

int index;
Expand All @@ -744,8 +743,8 @@ private bool TryGetAvailableMetricPointRare(
if (length > 1)
{
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out lookupData) &&
!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out lookupData) &&
!this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
{
// Check for an available MetricPoint
if (this.availableMetricPoints!.Count > 0)
Expand All @@ -769,14 +768,14 @@ private bool TryGetAvailableMetricPointRare(
// MetricPoint, if dictionary entry found.

// Add the sorted order along with the given order of tags
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(sortedTags, lookupData);
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(sortedTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
}
}
else
{
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out lookupData))
{
// Check for an available MetricPoint
if (this.availableMetricPoints!.Count > 0)
Expand All @@ -800,7 +799,7 @@ private bool TryGetAvailableMetricPointRare(
// MetricPoint, if dictionary entry found.

// givenTags will always be sorted when tags length == 1
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
}
}

Expand All @@ -823,23 +822,23 @@ private int RemoveStaleEntriesAndGetAvailableMetricPointRare(LookupData lookupDa
// If self-claimed, then add a fresh entry to the dictionary
// If an available MetricPoint is found, then only increment the ReferenceCount

Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");
Debug.Assert(this.TagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");

// Delete the entry for these Tags and get another MetricPoint.
lock (this.tagsToMetricPointIndexDictionaryDelta!)
lock (this.TagsToMetricPointIndexDictionaryDelta!)
{
LookupData? dictionaryValue;
if (lookupData.SortedTags != Tags.EmptyTags)
{
// Check if no other thread added a new entry for the same Tags in the meantime.
// If no, then remove the existing entries.
if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.SortedTags, out dictionaryValue))
if (this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.SortedTags, out dictionaryValue))
{
if (dictionaryValue == lookupData)
{
// No other thread added a new entry for the same Tags.
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.SortedTags, out _);
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.SortedTags, out _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out _);
}
else
{
Expand All @@ -851,12 +850,12 @@ private int RemoveStaleEntriesAndGetAvailableMetricPointRare(LookupData lookupDa
}
else
{
if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.GivenTags, out dictionaryValue))
if (this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.GivenTags, out dictionaryValue))
{
if (dictionaryValue == lookupData)
{
// No other thread added a new entry for the same Tags.
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out _);
}
else
{
Expand Down
10 changes: 1 addition & 9 deletions test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTestsBase.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Concurrent;
using System.Diagnostics.Metrics;
using System.Reflection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry.Tests;
Expand Down Expand Up @@ -286,23 +284,17 @@ private sealed class CustomExporter : BaseExporter<Metric>

private readonly bool assertNoDroppedMeasurements;

private readonly FieldInfo metricPointLookupDictionaryFieldInfo;

public CustomExporter(bool assertNoDroppedMeasurements)
{
this.assertNoDroppedMeasurements = assertNoDroppedMeasurements;

var aggregatorStoreFields = typeof(AggregatorStore).GetFields(BindingFlags.NonPublic | BindingFlags.Instance);
this.metricPointLookupDictionaryFieldInfo = aggregatorStoreFields!.FirstOrDefault(field => field.Name == "tagsToMetricPointIndexDictionaryDelta");
}

public override ExportResult Export(in Batch<Metric> batch)
{
foreach (var metric in batch)
{
var aggStore = metric.AggregatorStore;
var metricPointLookupDictionary = this.metricPointLookupDictionaryFieldInfo.GetValue(aggStore) as ConcurrentDictionary<Tags, LookupData>;

var metricPointLookupDictionary = aggStore.TagsToMetricPointIndexDictionaryDelta;
var droppedMeasurements = aggStore.DroppedMeasurements;

if (this.assertNoDroppedMeasurements)
Expand Down

0 comments on commit f1a1835

Please sign in to comment.