Skip to content

Commit

Permalink
[sdk-metrics] Refactor and improve interlocking code for doubles in M…
Browse files Browse the repository at this point in the history
…etricPoint (#5378)
  • Loading branch information
CodeBlanch authored Feb 22, 2024
1 parent 5bcc805 commit 09dd46f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 39 deletions.
46 changes: 46 additions & 0 deletions src/OpenTelemetry/Internal/InterlockedHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Runtime.CompilerServices;

namespace OpenTelemetry.Internal;

internal static class InterlockedHelper
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Add(ref double location, double value)
{
// Note: Not calling InterlockedHelper.Read here on purpose because it
// is too expensive for fast/happy-path. If the first attempt fails
// we'll end up in an Interlocked.CompareExchange loop anyway.
double currentValue = Volatile.Read(ref location);

var returnedValue = Interlocked.CompareExchange(ref location, currentValue + value, currentValue);
if (returnedValue != currentValue)
{
AddRare(ref location, value, returnedValue);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static double Read(ref double location)
=> Interlocked.CompareExchange(ref location, double.NaN, double.NaN);

[MethodImpl(MethodImplOptions.NoInlining)]
private static void AddRare(ref double location, double value, double currentValue)
{
var sw = default(SpinWait);
while (true)
{
sw.SpinOnce();

var returnedValue = Interlocked.CompareExchange(ref location, currentValue + value, currentValue);
if (returnedValue == currentValue)
{
break;
}

currentValue = returnedValue;
}
}
}
46 changes: 7 additions & 39 deletions src/OpenTelemetry/Metrics/MetricPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Diagnostics;
using System.Runtime.CompilerServices;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Metrics;

Expand Down Expand Up @@ -578,25 +579,7 @@ internal void Update(double number)
{
case AggregationType.DoubleSumIncomingDelta:
{
double initValue, newValue;
var sw = default(SpinWait);
while (true)
{
initValue = this.runningValue.AsDouble;

unchecked
{
newValue = initValue + number;
}

if (initValue == Interlocked.CompareExchange(ref this.runningValue.AsDouble, newValue, initValue))
{
break;
}

sw.SpinOnce();
}

InterlockedHelper.Add(ref this.runningValue.AsDouble, number);
break;
}

Expand Down Expand Up @@ -833,31 +816,21 @@ internal void TakeSnapshot(bool outputDelta)
{
if (outputDelta)
{
// TODO:
// Is this thread-safe way to read double?
// As long as the value is not -ve infinity,
// the exchange (to 0.0) will never occur,
// but we get the original value atomically.
double initValue = Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity);
double initValue = InterlockedHelper.Read(ref this.runningValue.AsDouble);
this.snapshotValue.AsDouble = initValue - this.deltaLastValue.AsDouble;
this.deltaLastValue.AsDouble = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (initValue != Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity))
if (initValue != InterlockedHelper.Read(ref this.runningValue.AsDouble))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
}
else
{
// TODO:
// Is this thread-safe way to read double?
// As long as the value is not -ve infinity,
// the exchange (to 0.0) will never occur,
// but we get the original value atomically.
this.snapshotValue.AsDouble = Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity);
this.snapshotValue.AsDouble = InterlockedHelper.Read(ref this.runningValue.AsDouble);
}

break;
Expand All @@ -880,17 +853,12 @@ internal void TakeSnapshot(bool outputDelta)

case AggregationType.DoubleGauge:
{
// TODO:
// Is this thread-safe way to read double?
// As long as the value is not -ve infinity,
// the exchange (to 0.0) will never occur,
// but we get the original value atomically.
this.snapshotValue.AsDouble = Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity);
this.snapshotValue.AsDouble = InterlockedHelper.Read(ref this.runningValue.AsDouble);
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (this.snapshotValue.AsDouble != Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity))
if (this.snapshotValue.AsDouble != InterlockedHelper.Read(ref this.runningValue.AsDouble))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
Expand Down

0 comments on commit 09dd46f

Please sign in to comment.