Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prometheus] Fix issue with corrupted buffers when reading both OpenMetrics and plain text formats #5623

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

* Fixed an issue with corrupted buffers when reading both OpenMetrics and
plain text formats from Prometheus exporters.
([#5623](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5623))

## 1.8.0-rc.1

Released 2024-Mar-27
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public async Task InvokeAsync(HttpContext httpContext)

try
{
if (collectionResponse.View.Count > 0)
var dataView = openMetricsRequested ? collectionResponse.OpenMetricsView : collectionResponse.PlainTextView;

if (dataView.Count > 0)
{
response.StatusCode = 200;
#if NET8_0_OR_GREATER
Expand All @@ -69,7 +71,7 @@ public async Task InvokeAsync(HttpContext httpContext)
? "application/openmetrics-text; version=1.0.0; charset=utf-8"
: "text/plain; charset=utf-8; version=0.0.4";

await response.Body.WriteAsync(collectionResponse.View.Array, 0, collectionResponse.View.Count).ConfigureAwait(false);
await response.Body.WriteAsync(dataView.Array, 0, dataView.Count).ConfigureAwait(false);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

* Fixed an issue with corrupted buffers when reading both OpenMetrics and
plain text formats from Prometheus exporters.
([#5623](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5623))

## 1.8.0-rc.1

Released 2024-Mar-27
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ internal sealed class PrometheusCollectionManager
private readonly Dictionary<Metric, PrometheusMetric> metricsCache;
private readonly HashSet<string> scopes;
private int metricsCacheCount;
private byte[] buffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private byte[] plainTextBuffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private byte[] openMetricsBuffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private int targetInfoBufferLength = -1; // zero or positive when target_info has been written for the first time
private ArraySegment<byte> previousPlainTextDataView;
private ArraySegment<byte> previousOpenMetricsDataView;
private int globalLockState;
private ArraySegment<byte> previousDataView;
private DateTime? previousDataViewGeneratedAtUtc;
private DateTime? previousPlainTextDataViewGeneratedAtUtc;
private DateTime? previousOpenMetricsDataViewGeneratedAtUtc;
private int readerCount;
private bool collectionRunning;
private TaskCompletionSource<CollectionResponse> collectionTcs;
Expand All @@ -44,16 +47,20 @@ public Task<CollectionResponse> EnterCollect(bool openMetricsRequested)

// If we are within {ScrapeResponseCacheDurationMilliseconds} of the
// last successful collect, return the previous view.
if (this.previousDataViewGeneratedAtUtc.HasValue
var previousDataViewGeneratedAtUtc = openMetricsRequested
? this.previousOpenMetricsDataViewGeneratedAtUtc
: this.previousPlainTextDataViewGeneratedAtUtc;

if (previousDataViewGeneratedAtUtc.HasValue
&& this.scrapeResponseCacheDurationMilliseconds > 0
&& this.previousDataViewGeneratedAtUtc.Value.AddMilliseconds(this.scrapeResponseCacheDurationMilliseconds) >= DateTime.UtcNow)
&& previousDataViewGeneratedAtUtc.Value.AddMilliseconds(this.scrapeResponseCacheDurationMilliseconds) >= DateTime.UtcNow)
{
Interlocked.Increment(ref this.readerCount);
this.ExitGlobalLock();
#if NET6_0_OR_GREATER
return new ValueTask<CollectionResponse>(new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: true));
return new ValueTask<CollectionResponse>(new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, previousDataViewGeneratedAtUtc.Value, fromCache: true));
#else
return Task.FromResult(new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: true));
return Task.FromResult(new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, previousDataViewGeneratedAtUtc.Value, fromCache: true));
#endif
}

Expand All @@ -78,16 +85,37 @@ public Task<CollectionResponse> EnterCollect(bool openMetricsRequested)

// Start a collection on the current thread.
this.collectionRunning = true;
this.previousDataViewGeneratedAtUtc = null;

if (openMetricsRequested)
{
this.previousOpenMetricsDataViewGeneratedAtUtc = null;
}
else
{
this.previousPlainTextDataViewGeneratedAtUtc = null;
}

Interlocked.Increment(ref this.readerCount);
this.ExitGlobalLock();

CollectionResponse response;
var result = this.ExecuteCollect(openMetricsRequested);
if (result)
{
this.previousDataViewGeneratedAtUtc = DateTime.UtcNow;
response = new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: false);
if (openMetricsRequested)
{
this.previousOpenMetricsDataViewGeneratedAtUtc = DateTime.UtcNow;
}
else
{
this.previousPlainTextDataViewGeneratedAtUtc = DateTime.UtcNow;
}

previousDataViewGeneratedAtUtc = openMetricsRequested
? this.previousOpenMetricsDataViewGeneratedAtUtc
: this.previousPlainTextDataViewGeneratedAtUtc;

response = new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, previousDataViewGeneratedAtUtc.Value, fromCache: false);
}
else
{
Expand Down Expand Up @@ -170,6 +198,7 @@ private bool ExecuteCollect(bool openMetricsRequested)
private ExportResult OnCollect(Batch<Metric> metrics)
{
var cursor = 0;
var buffer = this.exporter.OpenMetricsRequested ? this.openMetricsBuffer : this.plainTextBuffer;

try
{
Expand All @@ -192,13 +221,13 @@ private ExportResult OnCollect(Batch<Metric> metrics)
{
try
{
cursor = PrometheusSerializer.WriteScopeInfo(this.buffer, cursor, metric.MeterName);
cursor = PrometheusSerializer.WriteScopeInfo(buffer, cursor, metric.MeterName);

break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref buffer))
{
// there are two cases we might run into the following condition:
// 1. we have many metrics to be exported - in this case we probably want
Expand Down Expand Up @@ -226,7 +255,7 @@ private ExportResult OnCollect(Batch<Metric> metrics)
try
{
cursor = PrometheusSerializer.WriteMetric(
this.buffer,
buffer,
cursor,
metric,
this.GetPrometheusMetric(metric),
Expand All @@ -236,7 +265,7 @@ private ExportResult OnCollect(Batch<Metric> metrics)
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref buffer))
{
throw;
}
Expand All @@ -248,24 +277,40 @@ private ExportResult OnCollect(Batch<Metric> metrics)
{
try
{
cursor = PrometheusSerializer.WriteEof(this.buffer, cursor);
cursor = PrometheusSerializer.WriteEof(buffer, cursor);
break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref buffer))
{
throw;
}
}
}

this.previousDataView = new ArraySegment<byte>(this.buffer, 0, cursor);
if (this.exporter.OpenMetricsRequested)
{
this.previousOpenMetricsDataView = new ArraySegment<byte>(this.openMetricsBuffer, 0, cursor);
}
else
{
this.previousPlainTextDataView = new ArraySegment<byte>(this.plainTextBuffer, 0, cursor);
}

return ExportResult.Success;
}
catch (Exception)
{
this.previousDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
if (this.exporter.OpenMetricsRequested)
{
this.previousOpenMetricsDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
}
else
{
this.previousPlainTextDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
}

return ExportResult.Failure;
}
}
Expand All @@ -278,13 +323,13 @@ private int WriteTargetInfo()
{
try
{
this.targetInfoBufferLength = PrometheusSerializer.WriteTargetInfo(this.buffer, 0, this.exporter.Resource);
this.targetInfoBufferLength = PrometheusSerializer.WriteTargetInfo(this.openMetricsBuffer, 0, this.exporter.Resource);

break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref this.openMetricsBuffer))
{
throw;
}
Expand All @@ -295,18 +340,18 @@ private int WriteTargetInfo()
return this.targetInfoBufferLength;
}

private bool IncreaseBufferSize()
private bool IncreaseBufferSize(ref byte[] buffer)
{
var newBufferSize = this.buffer.Length * 2;
var newBufferSize = buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;
buffer.CopyTo(newBuffer, 0);
buffer = newBuffer;

return true;
}
Expand All @@ -331,14 +376,17 @@ private PrometheusMetric GetPrometheusMetric(Metric metric)

public readonly struct CollectionResponse
{
public CollectionResponse(ArraySegment<byte> view, DateTime generatedAtUtc, bool fromCache)
public CollectionResponse(ArraySegment<byte> openMetricsView, ArraySegment<byte> plainTextView, DateTime generatedAtUtc, bool fromCache)
{
this.View = view;
this.OpenMetricsView = openMetricsView;
this.PlainTextView = plainTextView;
this.GeneratedAtUtc = generatedAtUtc;
this.FromCache = fromCache;
}

public ArraySegment<byte> View { get; }
public ArraySegment<byte> OpenMetricsView { get; }

public ArraySegment<byte> PlainTextView { get; }

public DateTime GeneratedAtUtc { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,18 @@ private async Task ProcessRequestAsync(HttpListenerContext context)
try
{
context.Response.Headers.Add("Server", string.Empty);
if (collectionResponse.View.Count > 0)

var dataView = openMetricsRequested ? collectionResponse.OpenMetricsView : collectionResponse.PlainTextView;

if (dataView.Count > 0)
{
context.Response.StatusCode = 200;
context.Response.Headers.Add("Last-Modified", collectionResponse.GeneratedAtUtc.ToString("R"));
context.Response.ContentType = openMetricsRequested
? "application/openmetrics-text; version=1.0.0; charset=utf-8"
: "text/plain; charset=utf-8; version=0.0.4";

await context.Response.OutputStream.WriteAsync(collectionResponse.View.Array, 0, collectionResponse.View.Count).ConfigureAwait(false);
await context.Response.OutputStream.WriteAsync(dataView.Array, 0, dataView.Count).ConfigureAwait(false);
}
else
{
Expand Down
Loading