Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prometheus] Fixed a race condition happening in simultaneous OpenMetrics and PlainText requests #5517

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c1af823
Reset cached target info cursor when OpenMetricsRequested is false
hez2010 Apr 8, 2024
1c843d0
Add tests and fix race condition
hez2010 Apr 9, 2024
224d1bd
Add CHANGELOG
hez2010 Apr 9, 2024
677b121
Oops
hez2010 Apr 9, 2024
90c348c
Merge branch 'main' into fix/prometheus_cursor
vishweshbankwar Apr 9, 2024
6e5549f
Apply code suggestion
hez2010 Apr 9, 2024
286d23d
Fix the race condition and add parallel tests
hez2010 Apr 11, 2024
7df9d9c
Update CHANGELOG.md
hez2010 Apr 11, 2024
c2e10df
Merge branch 'main' into fix/prometheus_cursor
hez2010 Apr 11, 2024
0e2551a
Unsubscribe from OnCollect in dtor
hez2010 Apr 11, 2024
00149ef
more fixes
hez2010 Apr 11, 2024
3558e9f
Return two views to simplify the implementation
hez2010 Apr 11, 2024
cb95766
Revert changes to PrometheusExporter
hez2010 Apr 11, 2024
19ae570
Reuse HttpClient in tests
hez2010 Apr 11, 2024
63f32d6
Fix incorrect CompareExchange
hez2010 Apr 11, 2024
7ee3712
Make test faster and add timeout
hez2010 Apr 11, 2024
580257e
Fix linefeeds and add volatile
hez2010 Apr 11, 2024
fb54fc8
Fix markdown
hez2010 Apr 11, 2024
7b3b31e
Merge branch 'main' into fix/prometheus_cursor
CodeBlanch Apr 12, 2024
033148c
Merge branch 'main' into fix/prometheus_cursor
vishweshbankwar Apr 17, 2024
aaf7e43
Adding TODO and CHANGELOG
hez2010 Apr 18, 2024
017dd39
Merge branch 'main' into fix/prometheus_cursor
CodeBlanch Apr 18, 2024
2804a7d
fix lf
hez2010 Apr 19, 2024
fd01eb2
Merge branch 'fix/prometheus_cursor' of github.com:hez2010/openteleme…
hez2010 Apr 19, 2024
665e390
Merge branch 'main' into fix/prometheus_cursor
reyang Apr 19, 2024
45dadb2
Rework the metrics collection
hez2010 Apr 19, 2024
3ee6067
Use CompareExchange
hez2010 Apr 19, 2024
d930f97
Fix invalid assert
hez2010 Apr 20, 2024
0269b93
Fix fromCache
hez2010 Apr 20, 2024
3e003f8
Add an assert
hez2010 Apr 20, 2024
aed502c
typo
hez2010 Apr 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

* Fixed a race condition happening in simultaneous OpenMetrics and PlainText requests which could cause malformed response

Check failure on line 5 in src/OpenTelemetry.Exporter.Prometheus.AspNetCore/CHANGELOG.md

View workflow job for this annotation

GitHub Actions / lint-md / run-markdownlint

Line length

src/OpenTelemetry.Exporter.Prometheus.AspNetCore/CHANGELOG.md:5:81 MD013/line-length Line length [Expected: 80; Actual: 122] https://github.com/DavidAnson/markdownlint/blob/v0.34.0/doc/md013.md
([#5517](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5517))

## 1.8.0-rc.1

Released 2024-Mar-27
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ public async Task InvokeAsync(HttpContext httpContext)
try
{
var openMetricsRequested = AcceptsOpenMetrics(httpContext.Request);
var collectionResponse = await this.exporter.CollectionManager.EnterCollect(openMetricsRequested).ConfigureAwait(false);
var collectionResponse = await this.exporter.CollectionManager.EnterCollect().ConfigureAwait(false);

try
{
if (collectionResponse.View.Count > 0)
var dataView = openMetricsRequested ? collectionResponse.OpenMetricsView : collectionResponse.PlainTextView;

if (dataView.Count > 0)
{
response.StatusCode = 200;
#if NET8_0_OR_GREATER
Expand All @@ -69,7 +71,7 @@ public async Task InvokeAsync(HttpContext httpContext)
? "application/openmetrics-text; version=1.0.0; charset=utf-8"
: "text/plain; charset=utf-8; version=0.0.4";

await response.Body.WriteAsync(collectionResponse.View.Array, 0, collectionResponse.View.Count).ConfigureAwait(false);
await response.Body.WriteAsync(dataView.Array, 0, dataView.Count).ConfigureAwait(false);
}
else
{
Expand All @@ -91,8 +93,6 @@ public async Task InvokeAsync(HttpContext httpContext)
response.StatusCode = 500;
}
}

this.exporter.OnExport = null;
}

private static bool AcceptsOpenMetrics(HttpRequest request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ internal sealed class PrometheusCollectionManager
private readonly Dictionary<Metric, PrometheusMetric> metricsCache;
private readonly HashSet<string> scopes;
private int metricsCacheCount;
private byte[] buffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private byte[] plainTextBuffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
hez2010 marked this conversation as resolved.
Show resolved Hide resolved
private byte[] openMetricsBuffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private int targetInfoBufferLength = -1; // zero or positive when target_info has been written for the first time
private int globalLockState;
private ArraySegment<byte> previousDataView;
private ArraySegment<byte> previousPlainTextDataView;
private ArraySegment<byte> previousOpenMetricsDataView;
private DateTime? previousDataViewGeneratedAtUtc;
private int readerCount;
private bool collectionRunning;
Expand All @@ -35,9 +37,9 @@ public PrometheusCollectionManager(PrometheusExporter exporter)
}

#if NET6_0_OR_GREATER
public ValueTask<CollectionResponse> EnterCollect(bool openMetricsRequested)
public ValueTask<CollectionResponse> EnterCollect()
#else
public Task<CollectionResponse> EnterCollect(bool openMetricsRequested)
public Task<CollectionResponse> EnterCollect()
#endif
{
this.EnterGlobalLock();
Expand All @@ -51,9 +53,9 @@ public Task<CollectionResponse> EnterCollect(bool openMetricsRequested)
Interlocked.Increment(ref this.readerCount);
this.ExitGlobalLock();
#if NET6_0_OR_GREATER
return new ValueTask<CollectionResponse>(new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: true));
return new ValueTask<CollectionResponse>(new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: true));
#else
return Task.FromResult(new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: true));
return Task.FromResult(new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: true));
#endif
}

Expand All @@ -65,12 +67,13 @@ public Task<CollectionResponse> EnterCollect(bool openMetricsRequested)
this.collectionTcs = new TaskCompletionSource<CollectionResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
}

var task = this.collectionTcs.Task;
Interlocked.Increment(ref this.readerCount);
this.ExitGlobalLock();
#if NET6_0_OR_GREATER
return new ValueTask<CollectionResponse>(this.collectionTcs.Task);
return new ValueTask<CollectionResponse>(task);
#else
return this.collectionTcs.Task;
return task;
#endif
}

Expand All @@ -83,11 +86,11 @@ public Task<CollectionResponse> EnterCollect(bool openMetricsRequested)
this.ExitGlobalLock();

CollectionResponse response;
var result = this.ExecuteCollect(openMetricsRequested);
var result = this.ExecuteCollect();
if (result)
{
this.previousDataViewGeneratedAtUtc = DateTime.UtcNow;
response = new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: false);
response = new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: false);
}
else
{
Expand Down Expand Up @@ -147,7 +150,7 @@ private void WaitForReadersToComplete()
SpinWait readWait = default;
while (true)
{
if (Interlocked.CompareExchange(ref this.readerCount, 0, this.readerCount) != 0)
if (Interlocked.CompareExchange(ref this.readerCount, 0, 0) != 0)
{
readWait.SpinOnce();
continue;
Expand All @@ -158,56 +161,51 @@ private void WaitForReadersToComplete()
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool ExecuteCollect(bool openMetricsRequested)
private bool ExecuteCollect()
{
this.exporter.OnExport = this.onCollectRef;
this.exporter.OpenMetricsRequested = openMetricsRequested;
var result = this.exporter.Collect(Timeout.Infinite);
this.exporter.OnExport = null;
return result;
}

private ExportResult OnCollect(Batch<Metric> metrics)
{
var cursor = 0;

try
{
if (this.exporter.OpenMetricsRequested)
{
cursor = this.WriteTargetInfo();
var openMetricsCursor = this.WriteTargetInfo();
var plainTextCursor = 0;

this.scopes.Clear();
this.scopes.Clear();

foreach (var metric in metrics)
foreach (var metric in metrics)
{
if (!PrometheusSerializer.CanWriteMetric(metric))
{
if (!PrometheusSerializer.CanWriteMetric(metric))
{
continue;
}
continue;
}

if (this.scopes.Add(metric.MeterName))
if (this.scopes.Add(metric.MeterName))
{
while (true)
{
while (true)
try
{
try
{
cursor = PrometheusSerializer.WriteScopeInfo(this.buffer, cursor, metric.MeterName);
openMetricsCursor = PrometheusSerializer.WriteScopeInfo(this.openMetricsBuffer, openMetricsCursor, metric.MeterName);

break;
}
catch (IndexOutOfRangeException)
break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize(ref this.openMetricsBuffer))
{
if (!this.IncreaseBufferSize())
{
// there are two cases we might run into the following condition:
// 1. we have many metrics to be exported - in this case we probably want
// to put some upper limit and allow the user to configure it.
// 2. we got an IndexOutOfRangeException which was triggered by some other
// code instead of the buffer[cursor++] - in this case we should give up
// at certain point rather than allocating like crazy.
throw;
}
// there are two cases we might run into the following condition:
// 1. we have many metrics to be exported - in this case we probably want
// to put some upper limit and allow the user to configure it.
// 2. we got an IndexOutOfRangeException which was triggered by some other
// code instead of the buffer[cursor++] - in this case we should give up
// at certain point rather than allocating like crazy.
throw;
}
}
}
Expand All @@ -221,22 +219,46 @@ private ExportResult OnCollect(Batch<Metric> metrics)
continue;
}

var prometheusMetric = this.GetPrometheusMetric(metric);

while (true)
{
try
{
openMetricsCursor = PrometheusSerializer.WriteMetric(
this.openMetricsBuffer,
openMetricsCursor,
metric,
prometheusMetric,
true);

break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize(ref this.openMetricsBuffer))
{
throw;
}
}
}

while (true)
{
try
{
cursor = PrometheusSerializer.WriteMetric(
this.buffer,
cursor,
plainTextCursor = PrometheusSerializer.WriteMetric(
this.plainTextBuffer,
plainTextCursor,
metric,
this.GetPrometheusMetric(metric),
this.exporter.OpenMetricsRequested);
prometheusMetric,
false);

break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref this.plainTextBuffer))
{
throw;
}
Expand All @@ -248,24 +270,41 @@ private ExportResult OnCollect(Batch<Metric> metrics)
{
try
{
cursor = PrometheusSerializer.WriteEof(this.buffer, cursor);
openMetricsCursor = PrometheusSerializer.WriteEof(this.openMetricsBuffer, openMetricsCursor);
break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref this.openMetricsBuffer))
{
throw;
}
}
}

this.previousDataView = new ArraySegment<byte>(this.buffer, 0, cursor);
while (true)
{
try
{
plainTextCursor = PrometheusSerializer.WriteEof(this.plainTextBuffer, plainTextCursor);
break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize(ref this.plainTextBuffer))
{
throw;
}
}
}

this.previousOpenMetricsDataView = new ArraySegment<byte>(this.openMetricsBuffer, 0, openMetricsCursor);
this.previousPlainTextDataView = new ArraySegment<byte>(this.plainTextBuffer, 0, plainTextCursor);
return ExportResult.Success;
}
catch (Exception)
{
this.previousDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
this.previousOpenMetricsDataView = this.previousPlainTextDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
return ExportResult.Failure;
}
}
Expand All @@ -278,13 +317,13 @@ private int WriteTargetInfo()
{
try
{
this.targetInfoBufferLength = PrometheusSerializer.WriteTargetInfo(this.buffer, 0, this.exporter.Resource);
this.targetInfoBufferLength = PrometheusSerializer.WriteTargetInfo(this.openMetricsBuffer, 0, this.exporter.Resource);

break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref this.openMetricsBuffer))
{
throw;
}
Expand All @@ -295,18 +334,18 @@ private int WriteTargetInfo()
return this.targetInfoBufferLength;
}

private bool IncreaseBufferSize()
private bool IncreaseBufferSize(ref byte[] buffer)
{
var newBufferSize = this.buffer.Length * 2;
var newBufferSize = buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;
buffer.CopyTo(newBuffer, 0);
buffer = newBuffer;

return true;
}
Expand All @@ -331,14 +370,17 @@ private PrometheusMetric GetPrometheusMetric(Metric metric)

public readonly struct CollectionResponse
{
public CollectionResponse(ArraySegment<byte> view, DateTime generatedAtUtc, bool fromCache)
public CollectionResponse(ArraySegment<byte> openMetricsView, ArraySegment<byte> plainTextView, DateTime generatedAtUtc, bool fromCache)
{
this.View = view;
this.OpenMetricsView = openMetricsView;
this.PlainTextView = plainTextView;
this.GeneratedAtUtc = generatedAtUtc;
this.FromCache = fromCache;
}

public ArraySegment<byte> View { get; }
public ArraySegment<byte> OpenMetricsView { get; }

public ArraySegment<byte> PlainTextView { get; }

public DateTime GeneratedAtUtc { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,21 @@ private async Task ProcessRequestAsync(HttpListenerContext context)
try
{
var openMetricsRequested = AcceptsOpenMetrics(context.Request);
var collectionResponse = await this.exporter.CollectionManager.EnterCollect(openMetricsRequested).ConfigureAwait(false);
var collectionResponse = await this.exporter.CollectionManager.EnterCollect().ConfigureAwait(false);

try
{
context.Response.Headers.Add("Server", string.Empty);
if (collectionResponse.View.Count > 0)
var dataView = openMetricsRequested ? collectionResponse.OpenMetricsView : collectionResponse.PlainTextView;
if (dataView.Count > 0)
{
context.Response.StatusCode = 200;
context.Response.Headers.Add("Last-Modified", collectionResponse.GeneratedAtUtc.ToString("R"));
context.Response.ContentType = openMetricsRequested
? "application/openmetrics-text; version=1.0.0; charset=utf-8"
: "text/plain; charset=utf-8; version=0.0.4";

await context.Response.OutputStream.WriteAsync(collectionResponse.View.Array, 0, collectionResponse.View.Count).ConfigureAwait(false);
await context.Response.OutputStream.WriteAsync(dataView.Array, 0, dataView.Count).ConfigureAwait(false);
}
else
{
Expand Down
Loading
Loading