Skip to content

Commit

Permalink
[otlp] Add Trace Exporter to transmit custom serialized data. (#5969)
Browse files Browse the repository at this point in the history
Co-authored-by: Mikel Blanchard <[email protected]>
  • Loading branch information
rajkumar-rangaraj and CodeBlanch authored Nov 14, 2024
1 parent b201d70 commit 1e7397e
Show file tree
Hide file tree
Showing 16 changed files with 786 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ internal sealed class ExperimentalOptions

public const string OtlpDiskRetryDirectoryPathEnvVar = "OTEL_DOTNET_EXPERIMENTAL_OTLP_DISK_RETRY_DIRECTORY_PATH";

public const string OtlpUseCustomSerializer = "OTEL_DOTNET_EXPERIMENTAL_USE_CUSTOM_PROTOBUF_SERIALIZER";

public ExperimentalOptions()
: this(new ConfigurationBuilder().AddEnvironmentVariables().Build())
{
Expand All @@ -29,6 +31,11 @@ public ExperimentalOptions(IConfiguration configuration)
this.EmitLogEventAttributes = emitLogEventAttributes;
}

if (configuration.TryGetBoolValue(OpenTelemetryProtocolExporterEventSource.Log, OtlpUseCustomSerializer, out var useCustomSerializer))
{
this.UseCustomProtobufSerializer = useCustomSerializer;
}

if (configuration.TryGetStringValue(OtlpRetryEnvVar, out var retryPolicy) && retryPolicy != null)
{
if (retryPolicy.Equals("in_memory", StringComparison.OrdinalIgnoreCase))
Expand Down Expand Up @@ -78,4 +85,9 @@ public ExperimentalOptions(IConfiguration configuration)
/// Gets the path on disk where the telemetry will be stored for retries at a later point.
/// </summary>
public string? DiskRetryDirectoryPath { get; }

/// <summary>
/// Gets a value indicating whether custom serializer should be used for OTLP export.
/// </summary>
public bool UseCustomProtobufSerializer { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

/// <summary>Export client interface.</summary>
internal interface IProtobufExportClient
{
/// <summary>
/// Method for sending export request to the server.
/// </summary>
/// <param name="buffer">The request body to send to the server.</param>
/// <param name="contentLength">length of the content.</param>
/// <param name="deadlineUtc">The deadline time in utc for export request to finish.</param>
/// <param name="cancellationToken">An optional token for canceling the call.</param>
/// <returns><see cref="ExportClientResponse"/>.</returns>
ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default);

/// <summary>
/// Method for shutting down the export client.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> if shutdown succeeded; otherwise, <c>false</c>.
/// </returns>
bool Shutdown(int timeoutMilliseconds);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NETFRAMEWORK
using System.Net.Http;
#endif
using System.Net.Http.Headers;
using Grpc.Core;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

/// <summary>Base class for sending OTLP export request over gRPC.</summary>
internal sealed class ProtobufOtlpGrpcExportClient : IProtobufExportClient
{
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/grpc");
private static readonly Version Http2RequestVersion = new(2, 0);

public ProtobufOtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNull(httpClient);
Guard.ThrowIfNull(signalPath);
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);

Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;
}

internal HttpClient HttpClient { get; }

internal Uri Endpoint { get; set; }

internal IReadOnlyDictionary<string, string> Headers { get; }

internal int TimeoutMilliseconds { get; }

/// <inheritdoc/>
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
{
try
{
using var httpRequest = this.CreateHttpRequest(buffer, contentLength);

using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken);

try
{
httpResponse.EnsureSuccessStatusCode();
}
catch (HttpRequestException ex)
{
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: httpResponse, ex);
}

// TODO: Hande retries & failures.
return SuccessExportResponse;
}
catch (RpcException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: ex);
}
}

public HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength)
{
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);
request.Version = Http2RequestVersion;

#if NET6_0_OR_GREATER
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
#endif

foreach (var header in this.Headers)
{
request.Headers.Add(header.Key, header.Value);
}

// TODO: Support compression.

request.Content = new ByteArrayContent(buffer, 0, contentLength);
request.Content.Headers.ContentType = MediaHeaderValue;

return request;
}

public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
{
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
}

/// <inheritdoc/>
public bool Shutdown(int timeoutMilliseconds)
{
this.HttpClient.CancelPendingRequests();
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NETFRAMEWORK
using System.Net.Http;
#endif
using System.Net.Http.Headers;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

/// <summary>Class for sending OTLP trace export request over HTTP.</summary>
internal sealed class ProtobufOtlpHttpExportClient : IProtobufExportClient
{
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf");
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
#if NET
private readonly bool synchronousSendSupportedByCurrentPlatform;
#endif

internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNull(httpClient);
Guard.ThrowIfNull(signalPath);
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);

Uri exporterEndpoint = options.AppendSignalPathToEndpoint
? options.Endpoint.AppendPathIfNotPresent(signalPath)
: options.Endpoint;
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;

#if NET
// See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767
this.synchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid()
&& !OperatingSystem.IsIOS()
&& !OperatingSystem.IsTvOS()
&& !OperatingSystem.IsBrowser();
#endif
}

internal HttpClient HttpClient { get; }

internal Uri Endpoint { get; set; }

internal IReadOnlyDictionary<string, string> Headers { get; }

/// <inheritdoc/>
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
{
try
{
using var httpRequest = this.CreateHttpRequest(buffer, contentLength);

using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken);

try
{
httpResponse.EnsureSuccessStatusCode();
}
catch (HttpRequestException ex)
{
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: httpResponse, ex);
}

return SuccessExportResponse;
}
catch (HttpRequestException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);
return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: null, exception: ex);
}
}

/// <inheritdoc/>
public bool Shutdown(int timeoutMilliseconds)
{
this.HttpClient.CancelPendingRequests();
return true;
}

public HttpRequestMessage CreateHttpRequest(byte[] exportRequest, int contentLength)
{
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);

foreach (var header in this.Headers)
{
request.Headers.Add(header.Key, header.Value);
}

var content = new ByteArrayContent(exportRequest, 0, contentLength);
content.Headers.ContentType = MediaHeaderValue;
request.Content = content;

return request;
}

public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
{
#if NET
return this.synchronousSendSupportedByCurrentPlatform
? this.HttpClient.Send(request, cancellationToken)
: this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
#else
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer

internal static class ProtobufOtlpTraceFieldNumberConstants
{
// Resource spans
#pragma warning disable SA1310 // Field names should not contain underscore

// Traces data
internal const int TracesData_Resource_Spans = 1;

// Resource spans
internal const int ResourceSpans_Resource = 1;
internal const int ResourceSpans_Scope_Spans = 2;
internal const int ResourceSpans_Schema_Url = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ internal static class ProtobufOtlpTraceSerializer

internal static int WriteTraceData(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, Resources.Resource? resource, in Batch<Activity> batch)
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpTraceFieldNumberConstants.TracesData_Resource_Spans, ProtobufWireType.LEN);
int resourceSpansScopeSpansLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

foreach (var activity in batch)
{
var sourceName = activity.Source.Name;
if (!ScopeTracesList.TryGetValue(sourceName, out var activities))
{
activities = ActivityListPool.Count > 0 ? ActivityListPool.Pop() : new List<Activity>();
activities = ActivityListPool.Count > 0 ? ActivityListPool.Pop() : [];
ScopeTracesList[sourceName] = activities;
}

Expand All @@ -34,6 +38,7 @@ internal static int WriteTraceData(byte[] buffer, int writePosition, SdkLimitOpt

writePosition = WriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource, ScopeTracesList);
ReturnActivityListToPool();
ProtobufSerializer.WriteReservedLength(buffer, resourceSpansScopeSpansLengthPosition, writePosition - (resourceSpansScopeSpansLengthPosition + ReserveSizeForLength));

return writePosition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ internal bool InitiateAndWaitForRetryProcess(int timeOutMilliseconds)

protected override bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response)
{
if (RetryHelper.ShouldRetryRequest(request, response, OtlpRetry.InitialBackoffMilliseconds, out _))
if (RetryHelper.ShouldRetryRequest(response, OtlpRetry.InitialBackoffMilliseconds, out _))
{
byte[]? data = null;
if (request is ExportTraceServiceRequest traceRequest)
Expand Down Expand Up @@ -158,7 +158,8 @@ private void RetryStoredRequests()
{
var deadlineUtc = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds);
var request = this.requestFactory.Invoke(data);
if (this.TryRetryRequest(request, deadlineUtc, out var response) || !RetryHelper.ShouldRetryRequest(request, response, OtlpRetry.InitialBackoffMilliseconds, out var retryInfo))
if (this.TryRetryRequest(request, deadlineUtc, out var response)
|| !RetryHelper.ShouldRetryRequest(response, OtlpRetry.InitialBackoffMilliseconds, out var retryInfo))
{
blob.TryDelete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal OtlpExporterRetryTransmissionHandler(IExportClient<TRequest> exportClie
protected override bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response)
{
var nextRetryDelayMilliseconds = OtlpRetry.InitialBackoffMilliseconds;
while (RetryHelper.ShouldRetryRequest(request, response, nextRetryDelayMilliseconds, out var retryResult))
while (RetryHelper.ShouldRetryRequest(response, nextRetryDelayMilliseconds, out var retryResult))
{
// Note: This delay cannot exceed the configured timeout period for otlp exporter.
// If the backend responds with `RetryAfter` duration that would result in exceeding the configured timeout period
Expand Down
Loading

0 comments on commit 1e7397e

Please sign in to comment.