From 2ae01a73fea4cbcc69f67cbb5aa4be5cfe875e23 Mon Sep 17 00:00:00 2001 From: Rajkumar Rangaraj Date: Fri, 15 Nov 2024 09:05:52 -0800 Subject: [PATCH] [otlp] Add Log and Mertic Exporter to transmit custom serialized data (#5977) Co-authored-by: Mikel Blanchard --- .../ProtobufOtlpLogFieldNumberConstants.cs | 6 +- .../Serializer/ProtobufOtlpLogSerializer.cs | 5 + .../ProtobufOtlpMetricSerializer.cs | 5 + .../OtlpLogExporterHelperExtensions.cs | 14 +- .../OtlpMetricExporterExtensions.cs | 11 +- .../ProtobufOtlpLogExporter.cs | 127 ++++++++++++++++++ .../ProtobufOtlpMetricExporter.cs | 120 +++++++++++++++++ .../OtlpLogExporterTests.cs | 4 +- .../OtlpMetricsExporterTests.cs | 4 +- 9 files changed, 286 insertions(+), 10 deletions(-) create mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpLogExporter.cs create mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpMetricExporter.cs diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogFieldNumberConstants.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogFieldNumberConstants.cs index 3361a8551c..1ff402d6b1 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogFieldNumberConstants.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogFieldNumberConstants.cs @@ -5,8 +5,12 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer internal static class ProtobufOtlpLogFieldNumberConstants { - // Resource Logs #pragma warning disable SA1310 // Field names should not contain underscore + + // Logs data + internal const int LogsData_Resource_Logs = 1; + + // Resource Logs internal const int ResourceLogs_Resource = 1; internal const int ResourceLogs_Scope_Logs = 2; internal const int ResourceLogs_Schema_Url = 3; diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogSerializer.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogSerializer.cs index 4c2f9c95ac..f814c929e4 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogSerializer.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogSerializer.cs @@ -21,6 +21,10 @@ internal static class ProtobufOtlpLogSerializer internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, in Batch logRecordBatch) { + writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpLogFieldNumberConstants.LogsData_Resource_Logs, ProtobufWireType.LEN); + int logsDataLengthPosition = writePosition; + writePosition += ReserveSizeForLength; + foreach (var logRecord in logRecordBatch) { var scopeName = logRecord.Logger.Name; @@ -34,6 +38,7 @@ internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOpti } writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList); + ProtobufSerializer.WriteReservedLength(buffer, logsDataLengthPosition, writePosition - (logsDataLengthPosition + ReserveSizeForLength)); ReturnLogRecordListToPool(); return writePosition; diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpMetricSerializer.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpMetricSerializer.cs index 77f5952fb4..e729539d80 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpMetricSerializer.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpMetricSerializer.cs @@ -19,6 +19,10 @@ internal static class ProtobufOtlpMetricSerializer internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources.Resource? resource, in Batch batch) { + writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.MetricsData_Resource_Metrics, ProtobufWireType.LEN); + int mericsDataLengthPosition = writePosition; + writePosition += ReserveSizeForLength; + foreach (var metric in batch) { var metricName = metric.MeterName; @@ -32,6 +36,7 @@ internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources } writePosition = WriteResourceMetrics(buffer, writePosition, resource, ScopeMetricsList); + ProtobufSerializer.WriteReservedLength(buffer, mericsDataLengthPosition, writePosition - (mericsDataLengthPosition + ReserveSizeForLength)); ReturnMetricListToPool(); return writePosition; diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpLogExporterHelperExtensions.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpLogExporterHelperExtensions.cs index 42faf42535..0929fe0cc7 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpLogExporterHelperExtensions.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpLogExporterHelperExtensions.cs @@ -305,10 +305,16 @@ internal static BaseProcessor BuildOtlpLogExporter( * "OtlpLogExporter"); */ - BaseExporter otlpExporter = new OtlpLogExporter( - exporterOptions!, - sdkLimitOptions!, - experimentalOptions!); + BaseExporter otlpExporter; + + if (experimentalOptions != null && experimentalOptions.UseCustomProtobufSerializer) + { + otlpExporter = new ProtobufOtlpLogExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!); + } + else + { + otlpExporter = new OtlpLogExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!); + } if (configureExporterInstance != null) { diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpMetricExporterExtensions.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpMetricExporterExtensions.cs index 1a8ef2f1b4..ab42b02547 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpMetricExporterExtensions.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpMetricExporterExtensions.cs @@ -175,7 +175,16 @@ internal static MetricReader BuildOtlpExporterMetricReader( exporterOptions!.TryEnableIHttpClientFactoryIntegration(serviceProvider!, "OtlpMetricExporter"); - BaseExporter metricExporter = new OtlpMetricExporter(exporterOptions!, experimentalOptions!); + BaseExporter metricExporter; + + if (experimentalOptions != null && experimentalOptions.UseCustomProtobufSerializer) + { + metricExporter = new ProtobufOtlpMetricExporter(exporterOptions!, experimentalOptions!); + } + else + { + metricExporter = new OtlpMetricExporter(exporterOptions!, experimentalOptions!); + } if (configureExporterInstance != null) { diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpLogExporter.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpLogExporter.cs new file mode 100644 index 0000000000..5e5f2db1b3 --- /dev/null +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpLogExporter.cs @@ -0,0 +1,127 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Buffers.Binary; +using System.Diagnostics; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; +using OpenTelemetry.Logs; +using OpenTelemetry.Resources; + +namespace OpenTelemetry.Exporter; + +/// +/// Exporter consuming and exporting the data using +/// the OpenTelemetry protocol (OTLP). +/// +internal sealed class ProtobufOtlpLogExporter : BaseExporter +{ + private readonly SdkLimitOptions sdkLimitOptions; + private readonly ExperimentalOptions experimentalOptions; + private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler; + private readonly int startWritePosition; + + private Resource? resource; + + // Initial buffer size set to ~732KB. + // This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB, + // by the 7th doubling to maintain efficient allocation without frequent resizing. + private byte[] buffer = new byte[750000]; + + /// + /// Initializes a new instance of the class. + /// + /// Configuration options for the exporter. + public ProtobufOtlpLogExporter(OtlpExporterOptions options) + : this(options, sdkLimitOptions: new(), experimentalOptions: new(), transmissionHandler: null) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// . + /// . + /// . + /// . + internal ProtobufOtlpLogExporter( + OtlpExporterOptions exporterOptions, + SdkLimitOptions sdkLimitOptions, + ExperimentalOptions experimentalOptions, + ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null) + { + Debug.Assert(exporterOptions != null, "exporterOptions was null"); + Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null"); + Debug.Assert(experimentalOptions != null, "experimentalOptions was null"); + + this.experimentalOptions = experimentalOptions!; + this.sdkLimitOptions = sdkLimitOptions!; + this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0; + this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!); + } + + internal Resource Resource => this.resource ??= this.ParentProvider.GetResource(); + + /// + public override ExportResult Export(in Batch logRecordBatch) + { + // Prevents the exporter's gRPC and HTTP operations from being instrumented. + using var scope = SuppressInstrumentationScope.Begin(); + + try + { + int writePosition = ProtobufOtlpLogSerializer.WriteLogsData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.experimentalOptions, this.Resource, logRecordBatch); + + if (this.startWritePosition == 5) + { + // Grpc payload consists of 3 parts + // byte 0 - Specifying if the payload is compressed. + // 1-4 byte - Specifies the length of payload in big endian format. + // 5 and above - Protobuf serialized data. + Span data = new Span(this.buffer, 1, 4); + var dataLength = writePosition - 5; + BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength); + } + + if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition)) + { + return ExportResult.Failure; + } + } + catch (IndexOutOfRangeException) + { + if (!this.IncreaseBufferSize()) + { + throw; + } + } + catch (Exception ex) + { + OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex); + return ExportResult.Failure; + } + + return ExportResult.Success; + } + + /// + protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler?.Shutdown(timeoutMilliseconds) ?? true; + + // TODO: Consider moving this to a shared utility class. + private bool IncreaseBufferSize() + { + var newBufferSize = this.buffer.Length * 2; + + if (newBufferSize > 100 * 1024 * 1024) + { + return false; + } + + var newBuffer = new byte[newBufferSize]; + this.buffer.CopyTo(newBuffer, 0); + this.buffer = newBuffer; + + return true; + } +} diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpMetricExporter.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpMetricExporter.cs new file mode 100644 index 0000000000..073932cd97 --- /dev/null +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpMetricExporter.cs @@ -0,0 +1,120 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Buffers.Binary; +using System.Diagnostics; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; +using OpenTelemetry.Metrics; +using OpenTelemetry.Resources; + +namespace OpenTelemetry.Exporter; + +/// +/// Exporter consuming and exporting the data using +/// the OpenTelemetry protocol (OTLP). +/// +internal sealed class ProtobufOtlpMetricExporter : BaseExporter +{ + private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler; + private readonly int startWritePosition; + + private Resource? resource; + + // Initial buffer size set to ~732KB. + // This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB, + // by the 7th doubling to maintain efficient allocation without frequent resizing. + private byte[] buffer = new byte[750000]; + + /// + /// Initializes a new instance of the class. + /// + /// Configuration options for the exporter. + public ProtobufOtlpMetricExporter(OtlpExporterOptions options) + : this(options, experimentalOptions: new(), transmissionHandler: null) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// . + /// . + /// . + internal ProtobufOtlpMetricExporter( + OtlpExporterOptions exporterOptions, + ExperimentalOptions experimentalOptions, + ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null) + { + Debug.Assert(exporterOptions != null, "exporterOptions was null"); + Debug.Assert(experimentalOptions != null, "experimentalOptions was null"); + + this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0; + this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!); + } + + internal Resource Resource => this.resource ??= this.ParentProvider.GetResource(); + + /// + public override ExportResult Export(in Batch metrics) + { + // Prevents the exporter's gRPC and HTTP operations from being instrumented. + using var scope = SuppressInstrumentationScope.Begin(); + + try + { + int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(this.buffer, this.startWritePosition, this.Resource, metrics); + + if (this.startWritePosition == 5) + { + // Grpc payload consists of 3 parts + // byte 0 - Specifying if the payload is compressed. + // 1-4 byte - Specifies the length of payload in big endian format. + // 5 and above - Protobuf serialized data. + Span data = new Span(this.buffer, 1, 4); + var dataLength = writePosition - 5; + BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength); + } + + if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition)) + { + return ExportResult.Failure; + } + } + catch (IndexOutOfRangeException) + { + if (!this.IncreaseBufferSize()) + { + throw; + } + } + catch (Exception ex) + { + OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex); + return ExportResult.Failure; + } + + return ExportResult.Success; + } + + /// + protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler.Shutdown(timeoutMilliseconds); + + // TODO: Consider moving this to a shared utility class. + private bool IncreaseBufferSize() + { + var newBufferSize = this.buffer.Length * 2; + + if (newBufferSize > 100 * 1024 * 1024) + { + return false; + } + + var newBuffer = new byte[newBufferSize]; + this.buffer.CopyTo(newBuffer, 0); + this.buffer = newBuffer; + + return true; + } +} diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpLogExporterTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpLogExporterTests.cs index 4cb11ec782..032e88252e 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpLogExporterTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpLogExporterTests.cs @@ -1888,9 +1888,9 @@ private static OtlpCollector.ExportLogsServiceRequest CreateLogsExportRequest(Sd var buffer = new byte[4096]; var writePosition = ProtobufOtlpLogSerializer.WriteLogsData(buffer, 0, sdkOptions, experimentalOptions, resource, batch); using var stream = new MemoryStream(buffer, 0, writePosition); - var logsData = OtlpLogs.ResourceLogs.Parser.ParseFrom(stream); + var logsData = OtlpLogs.LogsData.Parser.ParseFrom(stream); var request = new OtlpCollector.ExportLogsServiceRequest(); - request.ResourceLogs.Add(logsData); + request.ResourceLogs.Add(logsData.ResourceLogs); return request; } diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpMetricsExporterTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpMetricsExporterTests.cs index 74f9d70e36..f34bc57212 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpMetricsExporterTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpMetricsExporterTests.cs @@ -1070,10 +1070,10 @@ private static OtlpCollector.ExportMetricsServiceRequest CreateMetricExportReque var writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(buffer, 0, resource, in batch); using var stream = new MemoryStream(buffer, 0, writePosition); - var metricsData = OtlpMetrics.ResourceMetrics.Parser.ParseFrom(stream); + var metricsData = OtlpMetrics.MetricsData.Parser.ParseFrom(stream); var request = new OtlpCollector.ExportMetricsServiceRequest(); - request.ResourceMetrics.Add(metricsData); + request.ResourceMetrics.Add(metricsData.ResourceMetrics); return request; } }