Skip to content

Commit

Permalink
[otlp] Add Log and Mertic Exporter to transmit custom serialized data (
Browse files Browse the repository at this point in the history
…#5977)

Co-authored-by: Mikel Blanchard <[email protected]>
  • Loading branch information
rajkumar-rangaraj and CodeBlanch authored Nov 15, 2024
1 parent 1e7397e commit 2ae01a7
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer

internal static class ProtobufOtlpLogFieldNumberConstants
{
// Resource Logs
#pragma warning disable SA1310 // Field names should not contain underscore

// Logs data
internal const int LogsData_Resource_Logs = 1;

// Resource Logs
internal const int ResourceLogs_Resource = 1;
internal const int ResourceLogs_Scope_Logs = 2;
internal const int ResourceLogs_Schema_Url = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ internal static class ProtobufOtlpLogSerializer

internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, in Batch<LogRecord> logRecordBatch)
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpLogFieldNumberConstants.LogsData_Resource_Logs, ProtobufWireType.LEN);
int logsDataLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

foreach (var logRecord in logRecordBatch)
{
var scopeName = logRecord.Logger.Name;
Expand All @@ -34,6 +38,7 @@ internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOpti
}

writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
ProtobufSerializer.WriteReservedLength(buffer, logsDataLengthPosition, writePosition - (logsDataLengthPosition + ReserveSizeForLength));
ReturnLogRecordListToPool();

return writePosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ internal static class ProtobufOtlpMetricSerializer

internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources.Resource? resource, in Batch<Metric> batch)
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.MetricsData_Resource_Metrics, ProtobufWireType.LEN);
int mericsDataLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

foreach (var metric in batch)
{
var metricName = metric.MeterName;
Expand All @@ -32,6 +36,7 @@ internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources
}

writePosition = WriteResourceMetrics(buffer, writePosition, resource, ScopeMetricsList);
ProtobufSerializer.WriteReservedLength(buffer, mericsDataLengthPosition, writePosition - (mericsDataLengthPosition + ReserveSizeForLength));
ReturnMetricListToPool();

return writePosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,16 @@ internal static BaseProcessor<LogRecord> BuildOtlpLogExporter(
* "OtlpLogExporter");
*/

BaseExporter<LogRecord> otlpExporter = new OtlpLogExporter(
exporterOptions!,
sdkLimitOptions!,
experimentalOptions!);
BaseExporter<LogRecord> otlpExporter;

if (experimentalOptions != null && experimentalOptions.UseCustomProtobufSerializer)
{
otlpExporter = new ProtobufOtlpLogExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!);
}
else
{
otlpExporter = new OtlpLogExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!);
}

if (configureExporterInstance != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,16 @@ internal static MetricReader BuildOtlpExporterMetricReader(

exporterOptions!.TryEnableIHttpClientFactoryIntegration(serviceProvider!, "OtlpMetricExporter");

BaseExporter<Metric> metricExporter = new OtlpMetricExporter(exporterOptions!, experimentalOptions!);
BaseExporter<Metric> metricExporter;

if (experimentalOptions != null && experimentalOptions.UseCustomProtobufSerializer)
{
metricExporter = new ProtobufOtlpMetricExporter(exporterOptions!, experimentalOptions!);
}
else
{
metricExporter = new OtlpMetricExporter(exporterOptions!, experimentalOptions!);
}

if (configureExporterInstance != null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Buffers.Binary;
using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Logs;
using OpenTelemetry.Resources;

namespace OpenTelemetry.Exporter;

/// <summary>
/// Exporter consuming <see cref="LogRecord"/> and exporting the data using
/// the OpenTelemetry protocol (OTLP).
/// </summary>
internal sealed class ProtobufOtlpLogExporter : BaseExporter<LogRecord>
{
private readonly SdkLimitOptions sdkLimitOptions;
private readonly ExperimentalOptions experimentalOptions;
private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;

private Resource? resource;

// Initial buffer size set to ~732KB.
// This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB,
// by the 7th doubling to maintain efficient allocation without frequent resizing.
private byte[] buffer = new byte[750000];

/// <summary>
/// Initializes a new instance of the <see cref="ProtobufOtlpLogExporter"/> class.
/// </summary>
/// <param name="options">Configuration options for the exporter.</param>
public ProtobufOtlpLogExporter(OtlpExporterOptions options)
: this(options, sdkLimitOptions: new(), experimentalOptions: new(), transmissionHandler: null)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="ProtobufOtlpLogExporter"/> class.
/// </summary>
/// <param name="exporterOptions"><see cref="OtlpExporterOptions"/>.</param>
/// <param name="sdkLimitOptions"><see cref="SdkLimitOptions"/>.</param>
/// <param name="experimentalOptions"><see cref="ExperimentalOptions"/>.</param>
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
internal ProtobufOtlpLogExporter(
OtlpExporterOptions exporterOptions,
SdkLimitOptions sdkLimitOptions,
ExperimentalOptions experimentalOptions,
ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null)
{
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");
Debug.Assert(experimentalOptions != null, "experimentalOptions was null");

this.experimentalOptions = experimentalOptions!;
this.sdkLimitOptions = sdkLimitOptions!;
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();

/// <inheritdoc/>
public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
{
// Prevents the exporter's gRPC and HTTP operations from being instrumented.
using var scope = SuppressInstrumentationScope.Begin();

try
{
int writePosition = ProtobufOtlpLogSerializer.WriteLogsData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.experimentalOptions, this.Resource, logRecordBatch);

if (this.startWritePosition == 5)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition))
{
return ExportResult.Failure;
}
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
{
throw;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
return ExportResult.Failure;
}

return ExportResult.Success;
}

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler?.Shutdown(timeoutMilliseconds) ?? true;

// TODO: Consider moving this to a shared utility class.
private bool IncreaseBufferSize()
{
var newBufferSize = this.buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;

return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Buffers.Binary;
using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;

namespace OpenTelemetry.Exporter;

/// <summary>
/// Exporter consuming <see cref="Metric"/> and exporting the data using
/// the OpenTelemetry protocol (OTLP).
/// </summary>
internal sealed class ProtobufOtlpMetricExporter : BaseExporter<Metric>
{
private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;

private Resource? resource;

// Initial buffer size set to ~732KB.
// This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB,
// by the 7th doubling to maintain efficient allocation without frequent resizing.
private byte[] buffer = new byte[750000];

/// <summary>
/// Initializes a new instance of the <see cref="ProtobufOtlpMetricExporter"/> class.
/// </summary>
/// <param name="options">Configuration options for the exporter.</param>
public ProtobufOtlpMetricExporter(OtlpExporterOptions options)
: this(options, experimentalOptions: new(), transmissionHandler: null)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="ProtobufOtlpMetricExporter"/> class.
/// </summary>
/// <param name="exporterOptions"><see cref="OtlpExporterOptions"/>.</param>
/// <param name="experimentalOptions"><see cref="ExperimentalOptions"/>.</param>
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
internal ProtobufOtlpMetricExporter(
OtlpExporterOptions exporterOptions,
ExperimentalOptions experimentalOptions,
ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null)
{
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(experimentalOptions != null, "experimentalOptions was null");

this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();

/// <inheritdoc />
public override ExportResult Export(in Batch<Metric> metrics)
{
// Prevents the exporter's gRPC and HTTP operations from being instrumented.
using var scope = SuppressInstrumentationScope.Begin();

try
{
int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(this.buffer, this.startWritePosition, this.Resource, metrics);

if (this.startWritePosition == 5)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition))
{
return ExportResult.Failure;
}
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
{
throw;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
return ExportResult.Failure;
}

return ExportResult.Success;
}

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler.Shutdown(timeoutMilliseconds);

// TODO: Consider moving this to a shared utility class.
private bool IncreaseBufferSize()
{
var newBufferSize = this.buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1888,9 +1888,9 @@ private static OtlpCollector.ExportLogsServiceRequest CreateLogsExportRequest(Sd
var buffer = new byte[4096];
var writePosition = ProtobufOtlpLogSerializer.WriteLogsData(buffer, 0, sdkOptions, experimentalOptions, resource, batch);
using var stream = new MemoryStream(buffer, 0, writePosition);
var logsData = OtlpLogs.ResourceLogs.Parser.ParseFrom(stream);
var logsData = OtlpLogs.LogsData.Parser.ParseFrom(stream);
var request = new OtlpCollector.ExportLogsServiceRequest();
request.ResourceLogs.Add(logsData);
request.ResourceLogs.Add(logsData.ResourceLogs);
return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1070,10 +1070,10 @@ private static OtlpCollector.ExportMetricsServiceRequest CreateMetricExportReque
var writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(buffer, 0, resource, in batch);
using var stream = new MemoryStream(buffer, 0, writePosition);

var metricsData = OtlpMetrics.ResourceMetrics.Parser.ParseFrom(stream);
var metricsData = OtlpMetrics.MetricsData.Parser.ParseFrom(stream);

var request = new OtlpCollector.ExportMetricsServiceRequest();
request.ResourceMetrics.Add(metricsData);
request.ResourceMetrics.Add(metricsData.ResourceMetrics);
return request;
}
}

0 comments on commit 2ae01a7

Please sign in to comment.