Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[otlp] Fix TODOs, Refactor Buffer Size Handling, and Cleanup Environment Variables #6009

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ internal static IOpenTelemetryBuilder UseOtlpExporter(
/// <para><see cref="IConfiguration"/> to bind onto <see cref="OtlpExporterBuilderOptions"/>.</para>
/// <para>Notes:
/// <list type="bullet">
/// <item docLink="true">See [TODO:Add doc link] for details on the configuration
/// schema.</item>
/// <item docLink="true"><see href="https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md"/>
/// for details on the configuration schema.</item>
/// <item>The <see cref="OtlpExporterBuilderOptions"/> instance will be
/// named "otlp" by default when calling this method.</item>
/// </list>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ internal sealed class ExperimentalOptions

public const string OtlpDiskRetryDirectoryPathEnvVar = "OTEL_DOTNET_EXPERIMENTAL_OTLP_DISK_RETRY_DIRECTORY_PATH";

public const string OtlpUseCustomSerializer = "OTEL_DOTNET_EXPERIMENTAL_USE_CUSTOM_PROTOBUF_SERIALIZER";

public ExperimentalOptions()
: this(new ConfigurationBuilder().AddEnvironmentVariables().Build())
{
Expand All @@ -31,11 +29,6 @@ public ExperimentalOptions(IConfiguration configuration)
this.EmitLogEventAttributes = emitLogEventAttributes;
}

if (configuration.TryGetBoolValue(OpenTelemetryProtocolExporterEventSource.Log, OtlpUseCustomSerializer, out var useCustomSerializer))
{
this.UseCustomProtobufSerializer = useCustomSerializer;
}

if (configuration.TryGetStringValue(OtlpRetryEnvVar, out var retryPolicy) && retryPolicy != null)
{
if (retryPolicy.Equals("in_memory", StringComparison.OrdinalIgnoreCase))
Expand Down Expand Up @@ -85,9 +78,4 @@ public ExperimentalOptions(IConfiguration configuration)
/// Gets the path on disk where the telemetry will be stored for retries at a later point.
/// </summary>
public string? DiskRetryDirectoryPath { get; }

/// <summary>
/// Gets a value indicating whether custom serializer should be used for OTLP export.
/// </summary>
public bool UseCustomProtobufSerializer { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ internal static class GrpcStatusDeserializer
TimeSpan.FromTicks(retryInfo.Value.RetryDelay.Value.Nanos / 100); // Convert nanos to ticks
}
}
catch
catch (Exception ex)
{
// TODO: Log exception to event source.
OpenTelemetryProtocolExporterEventSource.Log.GrpcRetryDelayParsingFailed(grpcStatusDetailsHeader, ex);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ public void RequestTimedOut(Uri endpoint, Exception ex)
}
}

[NonEvent]
public void GrpcRetryDelayParsingFailed(string? grpcStatusDetailsHeader, Exception ex)
{
if (Log.IsEnabled(EventLevel.Warning, EventKeywords.All))
{
this.GrpcRetryDelayParsingFailed(grpcStatusDetailsHeader ?? "null", ex.ToInvariantString());
}
}

[Event(2, Message = "Exporter failed send data to collector to {0} endpoint. Data will not be sent. Exception: {1}", Level = EventLevel.Error)]
public void FailedToReachCollector(string rawCollectorUri, string ex)
{
Expand Down Expand Up @@ -205,6 +214,12 @@ public void ExportFailure(string endpoint, string message)
this.WriteEvent(23, endpoint, message);
}

[Event(24, Message = "Failed to parse gRPC retry delay from header grpcStatusDetailsHeader: '{0}'. Exception: {1}", Level = EventLevel.Warning)]
public void GrpcRetryDelayParsingFailed(string grpcStatusDetailsHeader, string exception)
{
this.WriteEvent(24, grpcStatusDetailsHeader, exception);
}

void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value)
{
this.InvalidConfigurationValue(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,36 @@ internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOpti
logRecords.Add(logRecord);
}

writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
writePosition = TryWriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
ProtobufSerializer.WriteReservedLength(buffer, logsDataLengthPosition, writePosition - (logsDataLengthPosition + ReserveSizeForLength));
ReturnLogRecordListToPool();

return writePosition;
}

internal static int TryWriteResourceLogs(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, Dictionary<string, List<LogRecord>> scopeLogs)
{
try
{
writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, scopeLogs);
}
catch (IndexOutOfRangeException)
{
if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Logs))
{
throw;
}

return TryWriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, scopeLogs);
rajkumar-rangaraj marked this conversation as resolved.
Show resolved Hide resolved
}
catch
{
throw;
}
rajkumar-rangaraj marked this conversation as resolved.
Show resolved Hide resolved

return writePosition;
}

internal static void ReturnLogRecordListToPool()
{
if (ScopeLogsList.Count != 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,36 @@ internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources
metrics.Add(metric);
}

writePosition = WriteResourceMetrics(buffer, writePosition, resource, ScopeMetricsList);
writePosition = TryWriteResourceMetrics(buffer, writePosition, resource, ScopeMetricsList);
ProtobufSerializer.WriteReservedLength(buffer, mericsDataLengthPosition, writePosition - (mericsDataLengthPosition + ReserveSizeForLength));
ReturnMetricListToPool();

return writePosition;
}

internal static int TryWriteResourceMetrics(byte[] buffer, int writePosition, Resources.Resource? resource, Dictionary<string, List<Metric>> scopeMetrics)
{
try
{
writePosition = WriteResourceMetrics(buffer, writePosition, resource, scopeMetrics);
}
catch (IndexOutOfRangeException)
{
if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Metrics))
{
throw;
}

return TryWriteResourceMetrics(buffer, writePosition, resource, scopeMetrics);
}
catch
{
throw;
}

return writePosition;
}

private static void ReturnMetricListToPool()
{
if (ScopeMetricsList.Count != 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public static THeaders GetHeaders<THeaders>(this OtlpExporterOptions options, Ac
return headers;
}

public static OtlpExporterTransmissionHandler GetProtobufExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType)
public static OtlpExporterTransmissionHandler GetExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType)
{
var exportClient = GetProtobufExportClient(options, otlpSignalType);
var exportClient = GetExportClient(options, otlpSignalType);

// `HttpClient.Timeout.TotalMilliseconds` would be populated with the correct timeout value for both the exporter configuration cases:
// 1. User provides their own HttpClient. This case is straightforward as the user wants to use their `HttpClient` and thereby the same client's timeout value.
Expand Down Expand Up @@ -88,7 +88,7 @@ public static OtlpExporterTransmissionHandler GetProtobufExportTransmissionHandl
}
}

public static IExportClient GetProtobufExportClient(this OtlpExporterOptions options, OtlpSignalType otlpSignalType)
public static IExportClient GetExportClient(this OtlpExporterOptions options, OtlpSignalType otlpSignalType)
{
var httpClient = options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public sealed class OtlpLogExporter : BaseExporter<LogRecord>
{
private const int GrpcStartWritePosition = 5;
private readonly SdkLimitOptions sdkLimitOptions;
private readonly ExperimentalOptions experimentalOptions;
private readonly OtlpExporterTransmissionHandler transmissionHandler;
Expand Down Expand Up @@ -57,8 +58,8 @@ internal OtlpLogExporter(

this.experimentalOptions = experimentalOptions!;
this.sdkLimitOptions = sdkLimitOptions!;
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Logs);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? GrpcStartWritePosition : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Logs);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand All @@ -73,14 +74,14 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
{
int writePosition = ProtobufOtlpLogSerializer.WriteLogsData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.experimentalOptions, this.Resource, logRecordBatch);

if (this.startWritePosition == 5)
if (this.startWritePosition == GrpcStartWritePosition)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
var dataLength = writePosition - GrpcStartWritePosition;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

Expand All @@ -89,13 +90,6 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
return ExportResult.Failure;
}
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
{
throw;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
Expand All @@ -107,21 +101,4 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler?.Shutdown(timeoutMilliseconds) ?? true;

// TODO: Consider moving this to a shared utility class.
private bool IncreaseBufferSize()
{
var newBufferSize = this.buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public class OtlpMetricExporter : BaseExporter<Metric>
{
private const int GrpcStartWritePosition = 5;
private readonly OtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;

Expand Down Expand Up @@ -50,8 +51,8 @@ internal OtlpMetricExporter(
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(experimentalOptions != null, "experimentalOptions was null");

this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Metrics);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? GrpcStartWritePosition : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Metrics);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand All @@ -66,14 +67,14 @@ public override ExportResult Export(in Batch<Metric> metrics)
{
int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(this.buffer, this.startWritePosition, this.Resource, metrics);

if (this.startWritePosition == 5)
if (this.startWritePosition == GrpcStartWritePosition)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
var dataLength = writePosition - GrpcStartWritePosition;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

Expand All @@ -82,13 +83,6 @@ public override ExportResult Export(in Batch<Metric> metrics)
return ExportResult.Failure;
}
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
{
throw;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
Expand All @@ -100,21 +94,4 @@ public override ExportResult Export(in Batch<Metric> metrics)

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler.Shutdown(timeoutMilliseconds);

// TODO: Consider moving this to a shared utility class.
private bool IncreaseBufferSize()
{
var newBufferSize = this.buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public class OtlpTraceExporter : BaseExporter<Activity>
{
private const int GrpcStartWritePosition = 5;
private readonly SdkLimitOptions sdkLimitOptions;
private readonly OtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;
Expand Down Expand Up @@ -53,8 +54,8 @@ internal OtlpTraceExporter(
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");

this.sdkLimitOptions = sdkLimitOptions!;
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? GrpcStartWritePosition : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand All @@ -69,14 +70,14 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
{
int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch);

if (this.startWritePosition == 5)
if (this.startWritePosition == GrpcStartWritePosition)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
var dataLength = writePosition - GrpcStartWritePosition;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

Expand Down
Loading
Loading